-- |
-- Module : Simulation.Aivika.Server
--
-- Defines the 'Server' type together with its constructors, the processor
-- built from a server, read-only properties exposing the collected timing
-- statistics, and the signals that report when those properties change.
module Simulation.Aivika.Server
(
-- * Server type and constructors
Server,
ServerInterruption(..),
newServer,
newStateServer,
newInterruptibleServer,
newInterruptibleStateServer,
-- * Processing
serverProcessor,
-- * Server properties
serverInitState,
serverState,
serverTotalInputWaitTime,
serverTotalProcessingTime,
serverTotalOutputWaitTime,
serverInputWaitTime,
serverProcessingTime,
serverOutputWaitTime,
serverInputWaitFactor,
serverProcessingFactor,
serverOutputWaitFactor,
-- * Summary
serverSummary,
-- * Signals derived from the properties
serverStateChanged,
serverStateChanged_,
serverTotalInputWaitTimeChanged,
serverTotalInputWaitTimeChanged_,
serverTotalProcessingTimeChanged,
serverTotalProcessingTimeChanged_,
serverTotalOutputWaitTimeChanged,
serverTotalOutputWaitTimeChanged_,
serverInputWaitTimeChanged,
serverInputWaitTimeChanged_,
serverProcessingTimeChanged,
serverProcessingTimeChanged_,
serverOutputWaitTimeChanged,
serverOutputWaitTimeChanged_,
serverInputWaitFactorChanged,
serverInputWaitFactorChanged_,
serverProcessingFactorChanged,
serverProcessingFactorChanged_,
serverOutputWaitFactorChanged,
serverOutputWaitFactorChanged_,
-- * Basic signals
serverInputReceived,
serverTaskInterrupted,
serverTaskProcessed,
serverOutputProvided,
-- * Overall signal
serverChanged_) where
import Data.IORef
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Control.Arrow
import Simulation.Aivika.Simulation
import Simulation.Aivika.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource
import Simulation.Aivika.Cont
import Simulation.Aivika.Process
import Simulation.Aivika.Processor
import Simulation.Aivika.Stream
import Simulation.Aivika.Statistics
-- | A server that turns inputs of type @a@ into outputs of type @b@ while
-- threading a state of type @s@ and accumulating timing statistics.
data Server s a b = Server
  { serverInitState :: s
    -- ^ The state the server starts with.
  , serverStateRef :: IORef s
    -- ^ The current state; overwritten after every processed task.
  , serverProcess :: s -> a -> Process (s, b)
    -- ^ Handles one input given the current state, yielding the next
    -- state and the output.
  , serverProcessInterruptible :: Bool
    -- ^ Whether processing goes through the interruption-aware wrapper.
  , serverTotalInputWaitTimeRef :: IORef Double
    -- ^ Running total of the time spent awaiting input.
  , serverTotalProcessingTimeRef :: IORef Double
    -- ^ Running total of the time spent processing tasks.
  , serverTotalOutputWaitTimeRef :: IORef Double
    -- ^ Running total of the time spent delivering output.
  , serverInputWaitTimeRef :: IORef (SamplingStats Double)
    -- ^ Statistics over the individual input wait durations.
  , serverProcessingTimeRef :: IORef (SamplingStats Double)
    -- ^ Statistics over the individual processing durations.
  , serverOutputWaitTimeRef :: IORef (SamplingStats Double)
    -- ^ Statistics over the individual output wait durations.
  , serverInputReceivedSource :: SignalSource a
    -- ^ Triggered with the input once it has been received.
  , serverTaskInterruptedSource :: SignalSource (ServerInterruption a)
    -- ^ Triggered when the processing of a task was cancelled.
  , serverTaskProcessedSource :: SignalSource (a, b)
    -- ^ Triggered with the input and output once a task is processed.
  , serverOutputProvidedSource :: SignalSource (a, b)
    -- ^ Triggered with the input and output once the output is delivered.
  }
-- | Describes a task whose processing was interrupted.
data ServerInterruption a = ServerInterruption
  { serverInterruptedInput :: a
    -- ^ The input that was being processed.
  , serverStartProcessingTime :: Double
    -- ^ The simulation time at which processing of that input started.
  , serverInterruptionTime :: Double
    -- ^ The simulation time at which the interruption was observed.
  }
-- | Create a stateless server from a function that produces an output for
-- each input. The server is created with the interruptible flag set to 'False'.
newServer :: (a -> Process b) -> Simulation (Server () a b)
newServer provide = newInterruptibleServer False provide
-- | Create a server whose handler receives the current state along with the
-- input and returns the next state with the output. The second argument is
-- the initial state. The interruptible flag is set to 'False'.
newStateServer :: (s -> a -> Process (s, b)) -> s -> Simulation (Server s a b)
newStateServer provide state =
  newInterruptibleStateServer False provide state
-- | Create a stateless server, choosing explicitly whether processing goes
-- through the interruption-aware wrapper (first argument).
--
-- The handler is adapted to the stateful interface by passing the unit
-- state through unchanged.
newInterruptibleServer :: Bool -> (a -> Process b) -> Simulation (Server () a b)
newInterruptibleServer interruptible provide =
  newInterruptibleStateServer interruptible withUnitState ()
  where
    -- Run the stateless handler and hand the incoming state back untouched.
    withUnitState unit input =
      provide input >>= \output -> return (unit, output)
-- | Create a stateful server, choosing explicitly whether processing goes
-- through the interruption-aware wrapper.
--
-- Arguments: the interruptible flag, the handler mapping state and input to
-- the next state and output, and the initial state. All time totals start at
-- zero, all statistics start empty, and a fresh signal source is allocated
-- for each of the four basic signals.
newInterruptibleStateServer
  :: Bool
  -> (s -> a -> Process (s, b))
  -> s
  -> Simulation (Server s a b)
newInterruptibleStateServer interruptible provide state =
  do stateRef            <- liftIO $ newIORef state
     totalInputWaitRef   <- liftIO $ newIORef 0
     totalProcessingRef  <- liftIO $ newIORef 0
     totalOutputWaitRef  <- liftIO $ newIORef 0
     inputWaitStatsRef   <- liftIO $ newIORef emptySamplingStats
     processingStatsRef  <- liftIO $ newIORef emptySamplingStats
     outputWaitStatsRef  <- liftIO $ newIORef emptySamplingStats
     inputReceivedSrc    <- newSignalSource
     taskInterruptedSrc  <- newSignalSource
     taskProcessedSrc    <- newSignalSource
     outputProvidedSrc   <- newSignalSource
     return Server
       { serverInitState = state
       , serverStateRef = stateRef
       , serverProcess = provide
       , serverProcessInterruptible = interruptible
       , serverTotalInputWaitTimeRef = totalInputWaitRef
       , serverTotalProcessingTimeRef = totalProcessingRef
       , serverTotalOutputWaitTimeRef = totalOutputWaitRef
       , serverInputWaitTimeRef = inputWaitStatsRef
       , serverProcessingTimeRef = processingStatsRef
       , serverOutputWaitTimeRef = outputWaitStatsRef
       , serverInputReceivedSource = inputReceivedSrc
       , serverTaskInterruptedSource = taskInterruptedSrc
       , serverTaskProcessedSource = taskProcessedSrc
       , serverOutputProvidedSource = outputProvidedSrc
       }
-- | Return a processor that runs the server over an input stream.
--
-- Each step of the resulting stream measures three consecutive intervals
-- and records each one both in its running total and in its sampling
-- statistics:
--
-- * output wait: from the completion of the previous task until this step
--   is demanded (skipped on the very first step, when there is no previous
--   task);
--
-- * input wait: from the start of this step until the input stream yields
--   an item;
--
-- * processing: from receiving the input until the handler returns.
--
-- The processor starts from 'serverInitState' and writes every new state to
-- the server's state reference. All bookkeeping goes to the references held
-- by the server itself, so every application of the returned processor
-- updates the same counters.
serverProcessor :: Server s a b -> Processor a b
serverProcessor server =
  Processor $ \xs -> loop (serverInitState server) Nothing xs
  where
    -- s: current state; r: completion time, input and output of the
    -- previous task (if any); xs: remaining input stream.
    loop s r xs =
      Cons $
      do t0 <- liftDynamics time
         -- Account for the time the previous output waited to be consumed.
         liftEvent $
           case r of
             Nothing -> return ()
             Just (t', a', b') ->
               do liftIO $
                    do modifyIORef' (serverTotalOutputWaitTimeRef server) (+ (t0 - t'))
                       modifyIORef' (serverOutputWaitTimeRef server) $
                         addSamplingStats (t0 - t')
                  triggerSignal (serverOutputProvidedSource server) (a', b')
         -- Await the next input.
         (a, xs') <- runStream xs
         t1 <- liftDynamics time
         liftEvent $
           do liftIO $
                do modifyIORef' (serverTotalInputWaitTimeRef server) (+ (t1 - t0))
                   modifyIORef' (serverInputWaitTimeRef server) $
                     addSamplingStats (t1 - t0)
              triggerSignal (serverInputReceivedSource server) a
         -- Provide the service.
         (s', b) <-
           if serverProcessInterruptible server
           then serverProcessInterrupting server s a
           else serverProcess server s a
         t2 <- liftDynamics time
         liftEvent $
           do liftIO $
                do -- Force the new state so no thunk chain builds up in the reference.
                   writeIORef (serverStateRef server) $! s'
                   modifyIORef' (serverTotalProcessingTimeRef server) (+ (t2 - t1))
                   modifyIORef' (serverProcessingTimeRef server) $
                     addSamplingStats (t2 - t1)
              triggerSignal (serverTaskProcessedSource server) (a, b)
         return (b, loop s' (Just (t2, a, b)) xs')
-- | Process the input like 'serverProcess', but additionally account for
-- the case where the enclosing process is cancelled while the task is
-- running.
--
-- A finalizer is attached to the handler. If, when the finalizer runs, the
-- current process is found to be cancelled, the time elapsed since
-- processing started is added to the processing totals and statistics and
-- the task-interrupted signal is triggered with the input, the start time
-- and the interruption time. If the process was not cancelled the finalizer
-- does nothing.
serverProcessInterrupting :: Server s a b -> s -> a -> Process (s, b)
serverProcessInterrupting server s a =
  do pid <- processId
     t1 <- liftDynamics time
     finallyProcess
       (serverProcess server s a)
       (liftEvent $
        do cancelled <- processCancelled pid
           when cancelled $
             do t2 <- liftDynamics time
                liftIO $
                  do modifyIORef' (serverTotalProcessingTimeRef server) (+ (t2 - t1))
                     modifyIORef' (serverProcessingTimeRef server) $
                       addSamplingStats (t2 - t1)
                let x = ServerInterruption a t1 t2
                triggerSignal (serverTaskInterruptedSource server) x)
-- | Return the current state of the server, as last stored after a
-- processed task (or the initial state if nothing was processed yet).
serverState :: Server s a b -> Event s
serverState server =
  Event (const (readIORef (serverStateRef server)))
-- | Signal carrying the current server state, emitted every time
-- 'serverStateChanged_' fires.
serverStateChanged :: Server s a b -> Signal s
serverStateChanged server =
  mapSignalM (\_ -> serverState server) $ serverStateChanged_ server
-- | Payload-free signal that fires whenever a task has been processed,
-- which is when the server state is rewritten.
serverStateChanged_ :: Server s a b -> Signal ()
serverStateChanged_ server =
  mapSignal (\_ -> ()) $ serverTaskProcessed server
-- | Return the accumulated time the server has spent awaiting input.
serverTotalInputWaitTime :: Server s a b -> Event Double
serverTotalInputWaitTime server =
  Event (const (readIORef (serverTotalInputWaitTimeRef server)))
-- | Signal carrying the current total input wait time, emitted every time
-- 'serverTotalInputWaitTimeChanged_' fires.
serverTotalInputWaitTimeChanged :: Server s a b -> Signal Double
serverTotalInputWaitTimeChanged server =
  mapSignalM (\_ -> serverTotalInputWaitTime server) $
  serverTotalInputWaitTimeChanged_ server
-- | Payload-free signal that fires whenever an input has been received,
-- which is when the total input wait time is updated.
serverTotalInputWaitTimeChanged_ :: Server s a b -> Signal ()
serverTotalInputWaitTimeChanged_ server =
  mapSignal (\_ -> ()) $ serverInputReceived server
-- | Return the accumulated time the server has spent processing tasks.
serverTotalProcessingTime :: Server s a b -> Event Double
serverTotalProcessingTime server =
  Event (const (readIORef (serverTotalProcessingTimeRef server)))
-- | Signal carrying the current total processing time, emitted every time
-- 'serverTotalProcessingTimeChanged_' fires.
serverTotalProcessingTimeChanged :: Server s a b -> Signal Double
serverTotalProcessingTimeChanged server =
  mapSignalM (\_ -> serverTotalProcessingTime server) $
  serverTotalProcessingTimeChanged_ server
-- | Payload-free signal that fires whenever the total processing time is
-- updated.
--
-- The total is modified both when a task completes and when an
-- interruptible task is cancelled, so both the task-processed and the
-- task-interrupted signals are merged here.
serverTotalProcessingTimeChanged_ :: Server s a b -> Signal ()
serverTotalProcessingTimeChanged_ server =
  mapSignal (const ()) (serverTaskProcessed server) <>
  mapSignal (const ()) (serverTaskInterrupted server)
-- | Return the accumulated time the server has spent waiting for its
-- output to be taken.
serverTotalOutputWaitTime :: Server s a b -> Event Double
serverTotalOutputWaitTime server =
  Event (const (readIORef (serverTotalOutputWaitTimeRef server)))
-- | Signal carrying the current total output wait time, emitted every time
-- 'serverTotalOutputWaitTimeChanged_' fires.
serverTotalOutputWaitTimeChanged :: Server s a b -> Signal Double
serverTotalOutputWaitTimeChanged server =
  mapSignalM (\_ -> serverTotalOutputWaitTime server) $
  serverTotalOutputWaitTimeChanged_ server
-- | Payload-free signal that fires whenever an output has been provided,
-- which is when the total output wait time is updated.
serverTotalOutputWaitTimeChanged_ :: Server s a b -> Signal ()
serverTotalOutputWaitTimeChanged_ server =
  mapSignal (\_ -> ()) $ serverOutputProvided server
-- | Return the statistics collected over the individual input wait
-- durations.
serverInputWaitTime :: Server s a b -> Event (SamplingStats Double)
serverInputWaitTime server =
  Event (const (readIORef (serverInputWaitTimeRef server)))
-- | Signal carrying the current input wait statistics, emitted every time
-- 'serverInputWaitTimeChanged_' fires.
serverInputWaitTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverInputWaitTimeChanged server =
  mapSignalM (\_ -> serverInputWaitTime server) $
  serverInputWaitTimeChanged_ server
-- | Payload-free signal that fires whenever an input has been received,
-- which is when the input wait statistics are updated.
serverInputWaitTimeChanged_ :: Server s a b -> Signal ()
serverInputWaitTimeChanged_ server =
  mapSignal (\_ -> ()) $ serverInputReceived server
-- | Return the statistics collected over the individual processing
-- durations.
serverProcessingTime :: Server s a b -> Event (SamplingStats Double)
serverProcessingTime server =
  Event (const (readIORef (serverProcessingTimeRef server)))
-- | Signal carrying the current processing time statistics, emitted every
-- time 'serverProcessingTimeChanged_' fires.
serverProcessingTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverProcessingTimeChanged server =
  mapSignalM (\_ -> serverProcessingTime server) $
  serverProcessingTimeChanged_ server
-- | Payload-free signal that fires whenever the processing time
-- statistics are updated.
--
-- A sample is added both when a task completes and when an interruptible
-- task is cancelled, so both the task-processed and the task-interrupted
-- signals are merged here.
serverProcessingTimeChanged_ :: Server s a b -> Signal ()
serverProcessingTimeChanged_ server =
  mapSignal (const ()) (serverTaskProcessed server) <>
  mapSignal (const ()) (serverTaskInterrupted server)
-- | Return the statistics collected over the individual output wait
-- durations.
serverOutputWaitTime :: Server s a b -> Event (SamplingStats Double)
serverOutputWaitTime server =
  Event (const (readIORef (serverOutputWaitTimeRef server)))
-- | Signal carrying the current output wait statistics, emitted every time
-- 'serverOutputWaitTimeChanged_' fires.
serverOutputWaitTimeChanged :: Server s a b -> Signal (SamplingStats Double)
serverOutputWaitTimeChanged server =
  mapSignalM (\_ -> serverOutputWaitTime server) $
  serverOutputWaitTimeChanged_ server
-- | Payload-free signal that fires whenever an output has been provided,
-- which is when the output wait statistics are updated.
serverOutputWaitTimeChanged_ :: Server s a b -> Signal ()
serverOutputWaitTimeChanged_ server =
  mapSignal (\_ -> ()) $ serverOutputProvided server
-- | Return the share of the total recorded time (input wait + processing +
-- output wait) that was spent awaiting input.
--
-- While all three totals are still zero the division is @0 / 0@, which for
-- 'Double' yields NaN.
serverInputWaitFactor :: Server s a b -> Event Double
serverInputWaitFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     let total = inputWait + processing + outputWait
     return (inputWait / total)
-- | Signal carrying the current input wait factor, emitted every time
-- 'serverInputWaitFactorChanged_' fires.
serverInputWaitFactorChanged :: Server s a b -> Signal Double
serverInputWaitFactorChanged server =
  mapSignalM (\_ -> serverInputWaitFactor server) $
  serverInputWaitFactorChanged_ server
-- | Payload-free signal that fires whenever any of the three time totals
-- behind the input wait factor is updated.
--
-- The totals change when an input is received, when a task is processed,
-- when an interruptible task is cancelled (the elapsed time is added to the
-- processing total) and when an output is provided, so all four signals
-- are merged.
serverInputWaitFactorChanged_ :: Server s a b -> Signal ()
serverInputWaitFactorChanged_ server =
  mapSignal (const ()) (serverInputReceived server) <>
  mapSignal (const ()) (serverTaskInterrupted server) <>
  mapSignal (const ()) (serverTaskProcessed server) <>
  mapSignal (const ()) (serverOutputProvided server)
-- | Return the share of the total recorded time (input wait + processing +
-- output wait) that was spent processing tasks.
--
-- While all three totals are still zero the division is @0 / 0@, which for
-- 'Double' yields NaN.
serverProcessingFactor :: Server s a b -> Event Double
serverProcessingFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     let total = inputWait + processing + outputWait
     return (processing / total)
-- | Signal carrying the current processing factor, emitted every time
-- 'serverProcessingFactorChanged_' fires.
serverProcessingFactorChanged :: Server s a b -> Signal Double
serverProcessingFactorChanged server =
  mapSignalM (\_ -> serverProcessingFactor server) $
  serverProcessingFactorChanged_ server
-- | Payload-free signal that fires whenever any of the three time totals
-- behind the processing factor is updated.
--
-- The totals change when an input is received, when a task is processed,
-- when an interruptible task is cancelled (the elapsed time is added to the
-- processing total) and when an output is provided, so all four signals
-- are merged.
serverProcessingFactorChanged_ :: Server s a b -> Signal ()
serverProcessingFactorChanged_ server =
  mapSignal (const ()) (serverInputReceived server) <>
  mapSignal (const ()) (serverTaskInterrupted server) <>
  mapSignal (const ()) (serverTaskProcessed server) <>
  mapSignal (const ()) (serverOutputProvided server)
-- | Return the share of the total recorded time (input wait + processing +
-- output wait) that was spent waiting for the output to be taken.
--
-- While all three totals are still zero the division is @0 / 0@, which for
-- 'Double' yields NaN.
serverOutputWaitFactor :: Server s a b -> Event Double
serverOutputWaitFactor server =
  Event $ \_ ->
  do inputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     processing <- readIORef (serverTotalProcessingTimeRef server)
     outputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     let total = inputWait + processing + outputWait
     return (outputWait / total)
-- | Signal carrying the current output wait factor, emitted every time
-- 'serverOutputWaitFactorChanged_' fires.
serverOutputWaitFactorChanged :: Server s a b -> Signal Double
serverOutputWaitFactorChanged server =
  mapSignalM (\_ -> serverOutputWaitFactor server) $
  serverOutputWaitFactorChanged_ server
-- | Payload-free signal that fires whenever any of the three time totals
-- behind the output wait factor is updated.
--
-- The totals change when an input is received, when a task is processed,
-- when an interruptible task is cancelled (the elapsed time is added to the
-- processing total) and when an output is provided, so all four signals
-- are merged.
serverOutputWaitFactorChanged_ :: Server s a b -> Signal ()
serverOutputWaitFactorChanged_ server =
  mapSignal (const ()) (serverInputReceived server) <>
  mapSignal (const ()) (serverTaskInterrupted server) <>
  mapSignal (const ()) (serverTaskProcessed server) <>
  mapSignal (const ()) (serverOutputProvided server)
-- | Signal raised with the input once the server has received it.
serverInputReceived :: Server s a b -> Signal a
serverInputReceived server =
  publishSignal (serverInputReceivedSource server)
-- | Signal raised with a 'ServerInterruption' record when the processing
-- of a task by an interruptible server was cancelled.
serverTaskInterrupted :: Server s a b -> Signal (ServerInterruption a)
serverTaskInterrupted server =
  publishSignal (serverTaskInterruptedSource server)
-- | Signal raised with the input and the resulting output once the server
-- has finished processing a task.
serverTaskProcessed :: Server s a b -> Signal (a, b)
serverTaskProcessed server =
  publishSignal (serverTaskProcessedSource server)
-- | Signal raised with the input and its output once the output wait for
-- that task has been accounted for, i.e. when the next step of the
-- processor is demanded.
serverOutputProvided :: Server s a b -> Signal (a, b)
serverOutputProvided server =
  publishSignal (serverOutputProvidedSource server)
-- | Payload-free signal that fires on any of the four basic server events:
-- input received, task interrupted, task processed, output provided.
serverChanged_ :: Server s a b -> Signal ()
serverChanged_ server =
  inputReceived <> taskInterrupted <> taskProcessed <> outputProvided
  where
    inputReceived   = mapSignal (\_ -> ()) $ serverInputReceived server
    taskInterrupted = mapSignal (\_ -> ()) $ serverTaskInterrupted server
    taskProcessed   = mapSignal (\_ -> ()) $ serverTaskProcessed server
    outputProvided  = mapSignal (\_ -> ()) $ serverOutputProvided server
-- | Return a textual summary of the server's timing data.
--
-- The second argument is the number of spaces each heading line is
-- indented by; the embedded statistics summaries are indented by two more.
-- The summary lists the three time totals, the three factors derived from
-- them (each total divided by the sum of all three, so NaN while all totals
-- are zero), and the sampling statistics for each kind of duration.
serverSummary :: Server s a b -> Int -> Event ShowS
serverSummary server indent =
  Event $ \_ ->
  do totalInputWait  <- readIORef (serverTotalInputWaitTimeRef server)
     totalProcessing <- readIORef (serverTotalProcessingTimeRef server)
     totalOutputWait <- readIORef (serverTotalOutputWaitTimeRef server)
     let total            = totalInputWait + totalProcessing + totalOutputWait
         inputWaitFactor  = totalInputWait / total
         processingFactor = totalProcessing / total
         outputWaitFactor = totalOutputWait / total
     inputWaitStats  <- readIORef (serverInputWaitTimeRef server)
     processingStats <- readIORef (serverProcessingTimeRef server)
     outputWaitStats <- readIORef (serverOutputWaitTimeRef server)
     let pad     = showString (replicate indent ' ')
         newline = showString "\n"
         blank   = showString "\n\n"
         -- One indented "label value" line.
         entry label value = pad . showString label . shows value
         -- One indented heading followed by a nested statistics summary.
         section title stats =
           pad . showString title . samplingStatsSummary stats (2 + indent)
     return $
       entry "total input wait time (locked while awaiting the input) = " totalInputWait .
       newline .
       entry "total processing time = " totalProcessing .
       newline .
       entry "total output wait time (locked while delivering the output) = " totalOutputWait .
       blank .
       entry "input wait factor (from 0 to 1) = " inputWaitFactor .
       newline .
       entry "processing factor (from 0 to 1) = " processingFactor .
       newline .
       entry "output wait factor (from 0 to 1) = " outputWaitFactor .
       blank .
       section "input wait time (locked while awaiting the input):\n\n" inputWaitStats .
       blank .
       section "processing time:\n\n" processingStats .
       blank .
       section "output wait time (locked while delivering the output):\n\n" outputWaitStats