module Simulation.Aivika.Queue
(
FCFSQueue,
LCFSQueue,
SIROQueue,
PriorityQueue,
Queue,
newFCFSQueue,
newLCFSQueue,
newSIROQueue,
newPriorityQueue,
newQueue,
enqueueStrategy,
enqueueStoringStrategy,
dequeueStrategy,
queueNull,
queueFull,
queueMaxCount,
queueCount,
queueCountStats,
enqueueCount,
enqueueLostCount,
enqueueStoreCount,
dequeueCount,
dequeueExtractCount,
queueLoadFactor,
enqueueRate,
enqueueStoreRate,
dequeueRate,
dequeueExtractRate,
queueWaitTime,
queueTotalWaitTime,
enqueueWaitTime,
dequeueWaitTime,
queueRate,
dequeue,
dequeueWithOutputPriority,
tryDequeue,
enqueue,
enqueueWithInputPriority,
enqueueWithStoringPriority,
enqueueWithInputStoringPriorities,
tryEnqueue,
tryEnqueueWithStoringPriority,
enqueueOrLost,
enqueueOrLost_,
enqueueWithStoringPriorityOrLost,
enqueueWithStoringPriorityOrLost_,
waitWhileFullQueue,
queueSummary,
queueNullChanged,
queueNullChanged_,
queueFullChanged,
queueFullChanged_,
queueCountChanged,
queueCountChanged_,
enqueueCountChanged,
enqueueCountChanged_,
enqueueLostCountChanged,
enqueueLostCountChanged_,
enqueueStoreCountChanged,
enqueueStoreCountChanged_,
dequeueCountChanged,
dequeueCountChanged_,
dequeueExtractCountChanged,
dequeueExtractCountChanged_,
queueLoadFactorChanged,
queueLoadFactorChanged_,
queueWaitTimeChanged,
queueWaitTimeChanged_,
queueTotalWaitTimeChanged,
queueTotalWaitTimeChanged_,
enqueueWaitTimeChanged,
enqueueWaitTimeChanged_,
dequeueWaitTimeChanged,
dequeueWaitTimeChanged_,
queueRateChanged,
queueRateChanged_,
enqueueInitiated,
enqueueStored,
enqueueLost,
dequeueRequested,
dequeueExtracted,
queueChanged_) where
import Data.IORef
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Internal.Signal
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Statistics
import qualified Simulation.Aivika.DoubleLinkedList as DLL
import qualified Simulation.Aivika.Vector as V
import qualified Simulation.Aivika.PriorityQueue as PQ
-- | A 'Queue' that uses the 'FCFS' strategy, backed by a
-- 'DLL.DoubleLinkedList', for enqueueing, storing and dequeueing alike.
type FCFSQueue a =
  Queue FCFS DLL.DoubleLinkedList
        FCFS DLL.DoubleLinkedList
        FCFS DLL.DoubleLinkedList
        a
-- | A 'Queue' whose storing strategy is 'LCFS'; the enqueueing and dequeueing
-- strategies are 'FCFS'. All three are backed by a 'DLL.DoubleLinkedList'.
type LCFSQueue a =
  Queue FCFS DLL.DoubleLinkedList
        LCFS DLL.DoubleLinkedList
        FCFS DLL.DoubleLinkedList
        a
-- | A 'Queue' whose storing strategy is 'SIRO' backed by a 'V.Vector'; the
-- enqueueing and dequeueing strategies are 'FCFS'.
type SIROQueue a =
  Queue FCFS DLL.DoubleLinkedList
        SIRO V.Vector
        FCFS DLL.DoubleLinkedList
        a
-- | A 'Queue' whose storing strategy is 'StaticPriorities' backed by a
-- 'PQ.PriorityQueue'; the enqueueing and dequeueing strategies are 'FCFS'.
type PriorityQueue a =
  Queue FCFS DLL.DoubleLinkedList
        StaticPriorities PQ.PriorityQueue
        FCFS DLL.DoubleLinkedList
        a
-- | A bounded queue holding items of type @a@.
--
-- The type parameters come in (strategy, container) pairs: @si@/@qi@ for
-- the enqueueing side, @sm@/@qm@ for the item store and @so@/@qo@ for the
-- dequeueing side.
data Queue si qi sm qm so qo a = Queue
  { queueMaxCount :: Int
    -- ^ The capacity the queue was created with.
  , enqueueStrategy :: si
    -- ^ The strategy given for the enqueueing side.
  , enqueueStoringStrategy :: sm
    -- ^ The strategy used to put items into and take them out of the store.
  , dequeueStrategy :: so
    -- ^ The strategy given for the dequeueing side.
  , enqueueRes :: Resource si qi
    -- ^ Resource acquired before an item is stored and released when an
    -- item is extracted.
  , queueStore :: qm (QueueItem a)
    -- ^ The container with the stored items.
  , dequeueRes :: Resource so qo
    -- ^ Resource acquired before an item is extracted and released when an
    -- item is stored.
  , queueCountRef :: IORef Int
    -- ^ Current number of stored items.
  , queueCountStatsRef :: IORef (TimingStats Int)
    -- ^ Timing statistics of the number of stored items.
  , enqueueCountRef :: IORef Int
    -- ^ Number of initiated enqueue operations.
  , enqueueLostCountRef :: IORef Int
    -- ^ Number of denied (lost) items.
  , enqueueStoreCountRef :: IORef Int
    -- ^ Number of items that were stored.
  , dequeueCountRef :: IORef Int
    -- ^ Number of dequeue requests.
  , dequeueExtractCountRef :: IORef Int
    -- ^ Number of extracted items.
  , queueWaitTimeRef :: IORef (SamplingStats Double)
    -- ^ Samples of (extraction time - storing time).
  , queueTotalWaitTimeRef :: IORef (SamplingStats Double)
    -- ^ Samples of (extraction time - enqueue initiation time).
  , enqueueWaitTimeRef :: IORef (SamplingStats Double)
    -- ^ Samples of (storing time - enqueue initiation time).
  , dequeueWaitTimeRef :: IORef (SamplingStats Double)
    -- ^ Samples of (extraction time - dequeue request time).
  , enqueueInitiatedSource :: SignalSource a
    -- ^ Triggered when an enqueue operation is initiated.
  , enqueueLostSource :: SignalSource a
    -- ^ Triggered when an item is denied.
  , enqueueStoredSource :: SignalSource a
    -- ^ Triggered when an item has been stored.
  , dequeueRequestedSource :: SignalSource ()
    -- ^ Triggered when a dequeue is requested.
  , dequeueExtractedSource :: SignalSource a
    -- ^ Triggered when an item has been extracted.
  }
-- | An item together with the time stamps needed for the wait statistics.
data QueueItem a = QueueItem
  { itemValue :: a
    -- ^ The payload.
  , itemInputTime :: Double
    -- ^ The time at which the enqueue operation was initiated.
  , itemStoringTime :: Double
    -- ^ The time at which the item was put into the store.
  }
-- | Create a new 'FCFSQueue' with the given capacity.
newFCFSQueue :: Int -> Event (FCFSQueue a)
newFCFSQueue capacity = newQueue FCFS FCFS FCFS capacity
-- | Create a new 'LCFSQueue' with the given capacity.
newLCFSQueue :: Int -> Event (LCFSQueue a)
newLCFSQueue capacity = newQueue FCFS LCFS FCFS capacity
-- | Create a new 'SIROQueue' with the given capacity.
newSIROQueue :: Int -> Event (SIROQueue a)
newSIROQueue capacity = newQueue FCFS SIRO FCFS capacity
-- | Create a new 'PriorityQueue' with the given capacity.
newPriorityQueue :: Int -> Event (PriorityQueue a)
newPriorityQueue capacity = newQueue FCFS StaticPriorities FCFS capacity
-- | Create a new queue from the enqueueing, storing and dequeueing
-- strategies and the capacity.
--
-- The enqueue resource starts with @count@ available units and the dequeue
-- resource with none; both are capped at @count@. All counters start at
-- zero and the size statistics are seeded with size 0 at the current time.
newQueue :: (QueueStrategy si qi,
             QueueStrategy sm qm,
             QueueStrategy so qo) =>
            si
            -> sm
            -> so
            -> Int
            -> Event (Queue si qi sm qm so qo a)
newQueue si sm so count =
  do startTime     <- liftDynamics time
     sizeRef       <- newCounter
     sizeStatsRef  <- liftIO $ newIORef (returnTimingStats startTime 0)
     enqueuedRef   <- newCounter
     lostRef       <- newCounter
     storedRef     <- newCounter
     requestedRef  <- newCounter
     extractedRef  <- newCounter
     inputRes      <- liftSimulation $ newResourceWithMaxCount si count (Just count)
     store         <- liftSimulation $ newStrategyQueue sm
     outputRes     <- liftSimulation $ newResourceWithMaxCount so 0 (Just count)
     waitRef       <- newStats
     totalWaitRef  <- newStats
     inputWaitRef  <- newStats
     outputWaitRef <- newStats
     initiatedSrc  <- liftSimulation newSignalSource
     lostSrc       <- liftSimulation newSignalSource
     storedSrc     <- liftSimulation newSignalSource
     requestedSrc  <- liftSimulation newSignalSource
     extractedSrc  <- liftSimulation newSignalSource
     return Queue { queueMaxCount = count,
                    enqueueStrategy = si,
                    enqueueStoringStrategy = sm,
                    dequeueStrategy = so,
                    enqueueRes = inputRes,
                    queueStore = store,
                    dequeueRes = outputRes,
                    queueCountRef = sizeRef,
                    queueCountStatsRef = sizeStatsRef,
                    enqueueCountRef = enqueuedRef,
                    enqueueLostCountRef = lostRef,
                    enqueueStoreCountRef = storedRef,
                    dequeueCountRef = requestedRef,
                    dequeueExtractCountRef = extractedRef,
                    queueWaitTimeRef = waitRef,
                    queueTotalWaitTimeRef = totalWaitRef,
                    enqueueWaitTimeRef = inputWaitRef,
                    dequeueWaitTimeRef = outputWaitRef,
                    enqueueInitiatedSource = initiatedSrc,
                    enqueueLostSource = lostSrc,
                    enqueueStoredSource = storedSrc,
                    dequeueRequestedSource = requestedSrc,
                    dequeueExtractedSource = extractedSrc }
  where
    -- A fresh integer counter starting at zero.
    newCounter :: Event (IORef Int)
    newCounter = liftIO $ newIORef 0
    -- A fresh, empty sample accumulator.
    newStats :: Event (IORef (SamplingStats Double))
    newStats = liftIO $ newIORef mempty
-- | Test whether the current number of stored items is zero.
queueNull :: Queue si qi sm qm so qo a -> Event Bool
queueNull q = fmap (== 0) $ liftIO $ readIORef (queueCountRef q)
-- | Signal carrying the value of 'queueNull', re-read whenever
-- 'queueNullChanged_' fires.
queueNullChanged :: Queue si qi sm qm so qo a -> Signal Bool
queueNullChanged q = mapSignalM reread (queueNullChanged_ q)
  where reread _ = queueNull q
-- | Signal that fires when the emptiness of the queue may have changed;
-- it is the same signal as 'queueCountChanged_'.
queueNullChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueNullChanged_ q = queueCountChanged_ q
-- | Test whether the current number of stored items equals 'queueMaxCount'.
queueFull :: Queue si qi sm qm so qo a -> Event Bool
queueFull q = fmap (== queueMaxCount q) $ liftIO $ readIORef (queueCountRef q)
-- | Signal carrying the value of 'queueFull', re-read whenever
-- 'queueFullChanged_' fires.
queueFullChanged :: Queue si qi sm qm so qo a -> Signal Bool
queueFullChanged q = mapSignalM reread (queueFullChanged_ q)
  where reread _ = queueFull q
-- | Signal that fires when the fullness of the queue may have changed;
-- it is the same signal as 'queueCountChanged_'.
queueFullChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueFullChanged_ q = queueCountChanged_ q
-- | Return the current number of stored items.
queueCount :: Queue si qi sm qm so qo a -> Event Int
queueCount = liftIO . readIORef . queueCountRef
-- | Return the timing statistics accumulated for the queue size.
queueCountStats :: Queue si qi sm qm so qo a -> Event (TimingStats Int)
queueCountStats = liftIO . readIORef . queueCountStatsRef
-- | Signal carrying the value of 'queueCount', re-read whenever
-- 'queueCountChanged_' fires.
queueCountChanged :: Queue si qi sm qm so qo a -> Signal Int
queueCountChanged q = mapSignalM reread (queueCountChanged_ q)
  where reread _ = queueCount q
-- | Signal that fires whenever an item is stored or extracted.
queueCountChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueCountChanged_ q = stored `mappend` extracted
  where stored    = mapSignal (\_ -> ()) (enqueueStored q)
        extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Return the number of initiated enqueue operations.
enqueueCount :: Queue si qi sm qm so qo a -> Event Int
enqueueCount = liftIO . readIORef . enqueueCountRef
-- | Signal carrying the value of 'enqueueCount', re-read whenever
-- 'enqueueCountChanged_' fires.
enqueueCountChanged :: Queue si qi sm qm so qo a -> Signal Int
enqueueCountChanged q = mapSignalM reread (enqueueCountChanged_ q)
  where reread _ = enqueueCount q
-- | Signal that fires whenever an enqueue operation is initiated.
enqueueCountChanged_ :: Queue si qi sm qm so qo a -> Signal ()
enqueueCountChanged_ q = mapSignal (\_ -> ()) $ enqueueInitiated q
-- | Return the number of items that were denied (lost).
enqueueLostCount :: Queue si qi sm qm so qo a -> Event Int
enqueueLostCount = liftIO . readIORef . enqueueLostCountRef
-- | Signal carrying the value of 'enqueueLostCount', re-read whenever
-- 'enqueueLostCountChanged_' fires.
enqueueLostCountChanged :: Queue si qi sm qm so qo a -> Signal Int
enqueueLostCountChanged q = mapSignalM reread (enqueueLostCountChanged_ q)
  where reread _ = enqueueLostCount q
-- | Signal that fires whenever an item is lost.
enqueueLostCountChanged_ :: Queue si qi sm qm so qo a -> Signal ()
enqueueLostCountChanged_ q = mapSignal (\_ -> ()) $ enqueueLost q
-- | Return the number of items that were stored.
enqueueStoreCount :: Queue si qi sm qm so qo a -> Event Int
enqueueStoreCount = liftIO . readIORef . enqueueStoreCountRef
-- | Signal carrying the value of 'enqueueStoreCount', re-read whenever
-- 'enqueueStoreCountChanged_' fires.
enqueueStoreCountChanged :: Queue si qi sm qm so qo a -> Signal Int
enqueueStoreCountChanged q = mapSignalM reread (enqueueStoreCountChanged_ q)
  where reread _ = enqueueStoreCount q
-- | Signal that fires whenever an item is stored.
enqueueStoreCountChanged_ :: Queue si qi sm qm so qo a -> Signal ()
enqueueStoreCountChanged_ q = mapSignal (\_ -> ()) $ enqueueStored q
-- | Return the number of dequeue requests.
dequeueCount :: Queue si qi sm qm so qo a -> Event Int
dequeueCount = liftIO . readIORef . dequeueCountRef
-- | Signal carrying the value of 'dequeueCount', re-read whenever
-- 'dequeueCountChanged_' fires.
dequeueCountChanged :: Queue si qi sm qm so qo a -> Signal Int
dequeueCountChanged q = mapSignalM reread (dequeueCountChanged_ q)
  where reread _ = dequeueCount q
-- | Signal that fires whenever a dequeue is requested.
dequeueCountChanged_ :: Queue si qi sm qm so qo a -> Signal ()
dequeueCountChanged_ q = mapSignal (\_ -> ()) $ dequeueRequested q
-- | Return the number of items that were extracted.
dequeueExtractCount :: Queue si qi sm qm so qo a -> Event Int
dequeueExtractCount = liftIO . readIORef . dequeueExtractCountRef
-- | Signal carrying the value of 'dequeueExtractCount', re-read whenever
-- 'dequeueExtractCountChanged_' fires.
dequeueExtractCountChanged :: Queue si qi sm qm so qo a -> Signal Int
dequeueExtractCountChanged q = mapSignalM reread (dequeueExtractCountChanged_ q)
  where reread _ = dequeueExtractCount q
-- | Signal that fires whenever an item is extracted.
dequeueExtractCountChanged_ :: Queue si qi sm qm so qo a -> Signal ()
dequeueExtractCountChanged_ q = mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the current queue size divided by 'queueMaxCount'.
queueLoadFactor :: Queue si qi sm qm so qo a -> Event Double
queueLoadFactor q =
  do size <- liftIO $ readIORef (queueCountRef q)
     return (fromIntegral size / fromIntegral (queueMaxCount q))
-- | Signal carrying the value of 'queueLoadFactor', re-read whenever
-- 'queueLoadFactorChanged_' fires.
queueLoadFactorChanged :: Queue si qi sm qm so qo a -> Signal Double
queueLoadFactorChanged q = mapSignalM reread (queueLoadFactorChanged_ q)
  where reread _ = queueLoadFactor q
-- | Signal that fires whenever an item is stored or extracted.
queueLoadFactorChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueLoadFactorChanged_ q = stored `mappend` extracted
  where stored    = mapSignal (\_ -> ()) (enqueueStored q)
        extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Return the number of initiated enqueue operations per unit of elapsed
-- time, where the elapsed time is the current point time minus the start
-- time taken from the run specs ('spcStartTime').
--
-- When evaluated exactly at the start time the denominator is zero, so the
-- floating-point result is not a finite number.
enqueueRate :: Queue si qi sm qm so qo a -> Event Double
enqueueRate q =
  Event $ \p ->
  do x <- readIORef (enqueueCountRef q)
     let t0 = spcStartTime $ pointSpecs p
         t  = pointTime p
     -- elapsed time is a difference, not an application of t to t0
     return (fromIntegral x / (t - t0))
-- | Return the number of stored items per unit of elapsed time, where the
-- elapsed time is the current point time minus the start time taken from
-- the run specs ('spcStartTime').
--
-- When evaluated exactly at the start time the denominator is zero, so the
-- floating-point result is not a finite number.
enqueueStoreRate :: Queue si qi sm qm so qo a -> Event Double
enqueueStoreRate q =
  Event $ \p ->
  do x <- readIORef (enqueueStoreCountRef q)
     let t0 = spcStartTime $ pointSpecs p
         t  = pointTime p
     -- elapsed time is a difference, not an application of t to t0
     return (fromIntegral x / (t - t0))
-- | Return the number of dequeue requests per unit of elapsed time, where
-- the elapsed time is the current point time minus the start time taken
-- from the run specs ('spcStartTime').
--
-- When evaluated exactly at the start time the denominator is zero, so the
-- floating-point result is not a finite number.
dequeueRate :: Queue si qi sm qm so qo a -> Event Double
dequeueRate q =
  Event $ \p ->
  do x <- readIORef (dequeueCountRef q)
     let t0 = spcStartTime $ pointSpecs p
         t  = pointTime p
     -- elapsed time is a difference, not an application of t to t0
     return (fromIntegral x / (t - t0))
-- | Return the number of extracted items per unit of elapsed time, where
-- the elapsed time is the current point time minus the start time taken
-- from the run specs ('spcStartTime').
--
-- When evaluated exactly at the start time the denominator is zero, so the
-- floating-point result is not a finite number.
dequeueExtractRate :: Queue si qi sm qm so qo a -> Event Double
dequeueExtractRate q =
  Event $ \p ->
  do x <- readIORef (dequeueExtractCountRef q)
     let t0 = spcStartTime $ pointSpecs p
         t  = pointTime p
     -- elapsed time is a difference, not an application of t to t0
     return (fromIntegral x / (t - t0))
-- | Return the statistics of the time items spent in the store
-- (from being stored until being extracted).
queueWaitTime :: Queue si qi sm qm so qo a -> Event (SamplingStats Double)
queueWaitTime = liftIO . readIORef . queueWaitTimeRef
-- | Signal carrying the value of 'queueWaitTime', re-read whenever
-- 'queueWaitTimeChanged_' fires.
queueWaitTimeChanged :: Queue si qi sm qm so qo a -> Signal (SamplingStats Double)
queueWaitTimeChanged q = mapSignalM reread (queueWaitTimeChanged_ q)
  where reread _ = queueWaitTime q
-- | Signal that fires whenever an item is extracted.
queueWaitTimeChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueWaitTimeChanged_ q = mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the statistics of the total time items spent in the queue
-- (from the enqueue initiation until being extracted).
queueTotalWaitTime :: Queue si qi sm qm so qo a -> Event (SamplingStats Double)
queueTotalWaitTime = liftIO . readIORef . queueTotalWaitTimeRef
-- | Signal carrying the value of 'queueTotalWaitTime', re-read whenever
-- 'queueTotalWaitTimeChanged_' fires.
queueTotalWaitTimeChanged :: Queue si qi sm qm so qo a -> Signal (SamplingStats Double)
queueTotalWaitTimeChanged q = mapSignalM reread (queueTotalWaitTimeChanged_ q)
  where reread _ = queueTotalWaitTime q
-- | Signal that fires whenever an item is extracted.
queueTotalWaitTimeChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueTotalWaitTimeChanged_ q = mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the statistics of the time between the enqueue initiation and
-- the moment the item was stored.
enqueueWaitTime :: Queue si qi sm qm so qo a -> Event (SamplingStats Double)
enqueueWaitTime = liftIO . readIORef . enqueueWaitTimeRef
-- | Signal carrying the value of 'enqueueWaitTime', re-read whenever
-- 'enqueueWaitTimeChanged_' fires.
enqueueWaitTimeChanged :: Queue si qi sm qm so qo a -> Signal (SamplingStats Double)
enqueueWaitTimeChanged q = mapSignalM reread (enqueueWaitTimeChanged_ q)
  where reread _ = enqueueWaitTime q
-- | Signal that fires whenever an item is stored.
enqueueWaitTimeChanged_ :: Queue si qi sm qm so qo a -> Signal ()
enqueueWaitTimeChanged_ q = mapSignal (\_ -> ()) $ enqueueStored q
-- | Return the statistics of the time between a dequeue request and the
-- moment the item was extracted.
dequeueWaitTime :: Queue si qi sm qm so qo a -> Event (SamplingStats Double)
dequeueWaitTime = liftIO . readIORef . dequeueWaitTimeRef
-- | Signal carrying the value of 'dequeueWaitTime', re-read whenever
-- 'dequeueWaitTimeChanged_' fires.
dequeueWaitTimeChanged :: Queue si qi sm qm so qo a -> Signal (SamplingStats Double)
dequeueWaitTimeChanged q = mapSignalM reread (dequeueWaitTimeChanged_ q)
  where reread _ = dequeueWaitTime q
-- | Signal that fires whenever an item is extracted.
dequeueWaitTimeChanged_ :: Queue si qi sm qm so qo a -> Signal ()
dequeueWaitTimeChanged_ q = mapSignal (\_ -> ()) $ dequeueExtracted q
-- | Return the mean queue size divided by the mean time items spent in
-- the store.
queueRate :: Queue si qi sm qm so qo a -> Event Double
queueRate q =
  do sizeStats <- liftIO $ readIORef (queueCountStatsRef q)
     waitStats <- liftIO $ readIORef (queueWaitTimeRef q)
     return (timingStatsMean sizeStats / samplingStatsMean waitStats)
-- | Signal carrying the value of 'queueRate', re-read whenever
-- 'queueRateChanged_' fires.
queueRateChanged :: Queue si qi sm qm so qo a -> Signal Double
queueRateChanged q = mapSignalM reread (queueRateChanged_ q)
  where reread _ = queueRate q
-- | Signal that fires whenever an item is stored or extracted.
queueRateChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueRateChanged_ q = stored `mappend` extracted
  where stored    = mapSignal (\_ -> ()) (enqueueStored q)
        extracted = mapSignal (\_ -> ()) (dequeueExtracted q)
-- | Dequeue an item: register the request, request the dequeue resource
-- and then extract an item from the store.
dequeue :: (DequeueStrategy si qi,
            DequeueStrategy sm qm,
            EnqueueStrategy so qo)
           => Queue si qi sm qm so qo a
           -> Process a
dequeue q =
  do requestedAt <- liftEvent (dequeueRequest q)
     requestResource (dequeueRes q)
     liftEvent (dequeueExtract q requestedAt)
-- | Like 'dequeue', but the dequeue resource is requested with the given
-- output priority.
dequeueWithOutputPriority :: (DequeueStrategy si qi,
                              DequeueStrategy sm qm,
                              PriorityQueueStrategy so qo po)
                             => Queue si qi sm qm so qo a
                             -> po
                             -> Process a
dequeueWithOutputPriority q outputPriority =
  do requestedAt <- liftEvent (dequeueRequest q)
     requestResourceWithPriority (dequeueRes q) outputPriority
     liftEvent (dequeueExtract q requestedAt)
-- | Try to dequeue an item immediately. Returns 'Nothing' when the dequeue
-- resource cannot be acquired within the current event; otherwise the
-- request is registered and the extracted item is returned in 'Just'.
tryDequeue :: (DequeueStrategy si qi,
               DequeueStrategy sm qm)
              => Queue si qi sm qm so qo a
              -> Event (Maybe a)
tryDequeue q =
  do acquired <- tryRequestResourceWithinEvent (dequeueRes q)
     if not acquired
       then return Nothing
       else do requestedAt <- dequeueRequest q
               item <- dequeueExtract q requestedAt
               return (Just item)
-- | Enqueue an item: initiate the operation, request the enqueue resource
-- and then put the item into the store.
enqueue :: (EnqueueStrategy si qi,
            EnqueueStrategy sm qm,
            DequeueStrategy so qo)
           => Queue si qi sm qm so qo a
           -> a
           -> Process ()
enqueue q a =
  do item <- liftEvent (enqueueInitiate q a)
     requestResource (enqueueRes q)
     liftEvent (enqueueStore q item)
-- | Like 'enqueue', but the enqueue resource is requested with the given
-- input priority.
enqueueWithInputPriority :: (PriorityQueueStrategy si qi pi,
                             EnqueueStrategy sm qm,
                             DequeueStrategy so qo)
                            => Queue si qi sm qm so qo a
                            -> pi
                            -> a
                            -> Process ()
enqueueWithInputPriority q inputPriority a =
  do item <- liftEvent (enqueueInitiate q a)
     requestResourceWithPriority (enqueueRes q) inputPriority
     liftEvent (enqueueStore q item)
-- | Like 'enqueue', but the item is put into the store with the given
-- storing priority.
enqueueWithStoringPriority :: (EnqueueStrategy si qi,
                               PriorityQueueStrategy sm qm pm,
                               DequeueStrategy so qo)
                              => Queue si qi sm qm so qo a
                              -> pm
                              -> a
                              -> Process ()
enqueueWithStoringPriority q storingPriority a =
  do item <- liftEvent (enqueueInitiate q a)
     requestResource (enqueueRes q)
     liftEvent (enqueueStoreWithPriority q storingPriority item)
-- | Like 'enqueue', but the enqueue resource is requested with the given
-- input priority and the item is stored with the given storing priority.
enqueueWithInputStoringPriorities :: (PriorityQueueStrategy si qi pi,
                                      PriorityQueueStrategy sm qm pm,
                                      DequeueStrategy so qo)
                                     => Queue si qi sm qm so qo a
                                     -> pi
                                     -> pm
                                     -> a
                                     -> Process ()
enqueueWithInputStoringPriorities q inputPriority storingPriority a =
  do item <- liftEvent (enqueueInitiate q a)
     requestResourceWithPriority (enqueueRes q) inputPriority
     liftEvent (enqueueStoreWithPriority q storingPriority item)
-- | Try to enqueue an item immediately. Returns 'True' when the enqueue
-- resource was acquired within the current event and the item was stored;
-- returns 'False' (doing nothing else) otherwise.
tryEnqueue :: (EnqueueStrategy sm qm,
               DequeueStrategy so qo)
              => Queue si qi sm qm so qo a
              -> a
              -> Event Bool
tryEnqueue q a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     when acquired $
       do item <- enqueueInitiate q a
          enqueueStore q item
     return acquired
-- | Like 'tryEnqueue', but the item is stored with the given storing
-- priority.
tryEnqueueWithStoringPriority :: (PriorityQueueStrategy sm qm pm,
                                  DequeueStrategy so qo)
                                 => Queue si qi sm qm so qo a
                                 -> pm
                                 -> a
                                 -> Event Bool
tryEnqueueWithStoringPriority q storingPriority a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     when acquired $
       do item <- enqueueInitiate q a
          enqueueStoreWithPriority q storingPriority item
     return acquired
-- | Try to enqueue an item immediately; when the enqueue resource cannot
-- be acquired the item is counted as lost instead. The result tells
-- whether the item was stored.
enqueueOrLost :: (EnqueueStrategy sm qm,
                  DequeueStrategy so qo)
                 => Queue si qi sm qm so qo a
                 -> a
                 -> Event Bool
enqueueOrLost q a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     if acquired
       then enqueueInitiate q a >>= enqueueStore q
       else enqueueDeny q a
     return acquired
-- | Like 'enqueueOrLost', but the item is stored with the given storing
-- priority.
enqueueWithStoringPriorityOrLost :: (PriorityQueueStrategy sm qm pm,
                                     DequeueStrategy so qo)
                                    => Queue si qi sm qm so qo a
                                    -> pm
                                    -> a
                                    -> Event Bool
enqueueWithStoringPriorityOrLost q storingPriority a =
  do acquired <- tryRequestResourceWithinEvent (enqueueRes q)
     if acquired
       then enqueueInitiate q a >>= enqueueStoreWithPriority q storingPriority
       else enqueueDeny q a
     return acquired
-- | Same as 'enqueueOrLost', discarding the result.
enqueueOrLost_ :: (EnqueueStrategy sm qm,
                   DequeueStrategy so qo)
                  => Queue si qi sm qm so qo a
                  -> a
                  -> Event ()
enqueueOrLost_ q a = enqueueOrLost q a >> return ()
-- | Same as 'enqueueWithStoringPriorityOrLost', discarding the result.
enqueueWithStoringPriorityOrLost_ :: (PriorityQueueStrategy sm qm pm,
                                      DequeueStrategy so qo)
                                     => Queue si qi sm qm so qo a
                                     -> pm
                                     -> a
                                     -> Event ()
enqueueWithStoringPriorityOrLost_ q storingPriority a =
  enqueueWithStoringPriorityOrLost q storingPriority a >> return ()
-- | Signal triggered with the item when an enqueue operation is initiated.
enqueueInitiated :: Queue si qi sm qm so qo a -> Signal a
enqueueInitiated = publishSignal . enqueueInitiatedSource
-- | Signal triggered with the item when it has been stored.
enqueueStored :: Queue si qi sm qm so qo a -> Signal a
enqueueStored = publishSignal . enqueueStoredSource
-- | Signal triggered with the item when it has been denied (lost).
enqueueLost :: Queue si qi sm qm so qo a -> Signal a
enqueueLost = publishSignal . enqueueLostSource
-- | Signal triggered when a dequeue is requested.
dequeueRequested :: Queue si qi sm qm so qo a -> Signal ()
dequeueRequested = publishSignal . dequeueRequestedSource
-- | Signal triggered with the item when it has been extracted.
dequeueExtracted :: Queue si qi sm qm so qo a -> Signal a
dequeueExtracted = publishSignal . dequeueExtractedSource
-- | Register the start of an enqueue operation: bump the enqueue counter,
-- trigger the initiation signal and wrap the value into a 'QueueItem'
-- whose input and storing times are both the current time.
enqueueInitiate :: Queue si qi sm qm so qo a
                   -> a
                   -> Event (QueueItem a)
enqueueInitiate q a =
  Event $ \p ->
  do let now = pointTime p
     modifyIORef' (enqueueCountRef q) (+ 1)
     invokeEvent p $ triggerSignal (enqueueInitiatedSource q) a
     return QueueItem { itemValue       = a
                      , itemInputTime   = now
                      , itemStoringTime = now }
-- | Put the item into the store. The item is stamped with the current
-- time as its storing time, the size, size statistics and store counter
-- are updated, the enqueue wait time is recorded, one unit of the dequeue
-- resource is released and finally the stored signal is triggered.
enqueueStore :: (EnqueueStrategy sm qm,
                 DequeueStrategy so qo)
                => Queue si qi sm qm so qo a
                -> QueueItem a
                -> Event ()
enqueueStore q i =
  Event $ \p ->
  do let now    = pointTime p
         stored = i { itemStoringTime = now }
     invokeEvent p $
       strategyEnqueue (enqueueStoringStrategy q) (queueStore q) stored
     size <- fmap (+ 1) $ readIORef (queueCountRef q)
     -- force the new size before writing it so no thunk ends up in the ref
     size `seq` writeIORef (queueCountRef q) size
     modifyIORef' (queueCountStatsRef q) (addTimingStats now size)
     modifyIORef' (enqueueStoreCountRef q) (+ 1)
     invokeEvent p $ enqueueStat q stored
     invokeEvent p $ releaseResourceWithinEvent (dequeueRes q)
     invokeEvent p $ triggerSignal (enqueueStoredSource q) (itemValue stored)
-- | Put the item into the store with the given storing priority. Apart
-- from the priority-aware store operation the bookkeeping is the same as
-- in 'enqueueStore': time stamp, size, statistics, counters, release of
-- one unit of the dequeue resource and the stored signal.
enqueueStoreWithPriority :: (PriorityQueueStrategy sm qm pm,
                             DequeueStrategy so qo)
                            => Queue si qi sm qm so qo a
                            -> pm
                            -> QueueItem a
                            -> Event ()
enqueueStoreWithPriority q pm i =
  Event $ \p ->
  do let now    = pointTime p
         stored = i { itemStoringTime = now }
     invokeEvent p $
       strategyEnqueueWithPriority (enqueueStoringStrategy q) (queueStore q) pm stored
     size <- fmap (+ 1) $ readIORef (queueCountRef q)
     -- force the new size before writing it so no thunk ends up in the ref
     size `seq` writeIORef (queueCountRef q) size
     modifyIORef' (queueCountStatsRef q) (addTimingStats now size)
     modifyIORef' (enqueueStoreCountRef q) (+ 1)
     invokeEvent p $ enqueueStat q stored
     invokeEvent p $ releaseResourceWithinEvent (dequeueRes q)
     invokeEvent p $ triggerSignal (enqueueStoredSource q) (itemValue stored)
-- | Deny the item: bump the lost counter and trigger the lost signal.
enqueueDeny :: Queue si qi sm qm so qo a
               -> a
               -> Event ()
enqueueDeny q a =
  Event $ \p ->
  do modifyIORef' (enqueueLostCountRef q) (1 +)
     invokeEvent p (triggerSignal (enqueueLostSource q) a)
-- | Record the enqueue wait time of a stored item, i.e. the difference
-- between its storing time and its input (initiation) time.
enqueueStat :: Queue si qi sm qm so qo a
               -> QueueItem a
               -> Event ()
enqueueStat q i =
  Event $ \p ->
  do let t0 = itemInputTime i
         t1 = itemStoringTime i
     -- the sample is the elapsed time t1 - t0
     modifyIORef' (enqueueWaitTimeRef q) $
       addSamplingStats (t1 - t0)
-- | Register a dequeue request: bump the request counter, trigger the
-- request signal and return the current time.
dequeueRequest :: Queue si qi sm qm so qo a
                  -> Event Double
dequeueRequest q =
  Event $ \p ->
  do modifyIORef' (dequeueCountRef q) (+ 1)
     invokeEvent p (triggerSignal (dequeueRequestedSource q) ())
     return (pointTime p)
-- | Extract an item from the store. The second argument is the time at
-- which the dequeue was requested.
--
-- The queue size is decremented, the size statistics and extract counter
-- are updated, the wait times are recorded, one unit of the enqueue
-- resource is released and the extracted signal is triggered. The value of
-- the extracted item is returned.
dequeueExtract :: (DequeueStrategy si qi,
                   DequeueStrategy sm qm)
                  => Queue si qi sm qm so qo a
                  -> Double
                  -> Event a
dequeueExtract q t' =
  Event $ \p ->
  do i <- invokeEvent p $
          strategyDequeue (enqueueStoringStrategy q) (queueStore q)
     c <- readIORef (queueCountRef q)
     -- one item leaves the store, so the size goes down by one
     let c' = c - 1
         t  = pointTime p
     -- force the new size before writing it so no thunk ends up in the ref
     c' `seq` writeIORef (queueCountRef q) c'
     modifyIORef' (queueCountStatsRef q) (addTimingStats t c')
     modifyIORef' (dequeueExtractCountRef q) (+ 1)
     invokeEvent p $
       dequeueStat q t' i
     invokeEvent p $
       releaseResourceWithinEvent (enqueueRes q)
     invokeEvent p $
       triggerSignal (dequeueExtractedSource q) (itemValue i)
     return $ itemValue i
-- | Record the wait times of an extracted item. The second argument is
-- the time at which the dequeue was requested.
--
-- Three samples are added, all measured up to the current time @t@:
--
-- * dequeue wait time: @t - t'@ (since the dequeue request);
-- * total wait time: @t - t0@ (since the enqueue initiation);
-- * queue wait time: @t - t1@ (since the item was stored).
dequeueStat :: Queue si qi sm qm so qo a
               -> Double
               -> QueueItem a
               -> Event ()
dequeueStat q t' i =
  Event $ \p ->
  do let t0 = itemInputTime i
         t1 = itemStoringTime i
         t  = pointTime p
     modifyIORef' (dequeueWaitTimeRef q) $
       addSamplingStats (t - t')
     modifyIORef' (queueTotalWaitTimeRef q) $
       addSamplingStats (t - t0)
     modifyIORef' (queueWaitTimeRef q) $
       addSamplingStats (t - t1)
-- | Suspend the process while the queue is full: each time the queue is
-- found full, await the next extraction signal and check again.
waitWhileFullQueue :: Queue si qi sm qm so qo a -> Process ()
waitWhileFullQueue q =
  do full <- liftEvent (queueFull q)
     when full $
       processAwait (dequeueExtracted q) >> waitWhileFullQueue q
-- | Signal that fires on every queue event: enqueue initiated, item
-- stored, item lost, dequeue requested and item extracted.
queueChanged_ :: Queue si qi sm qm so qo a -> Signal ()
queueChanged_ q =
  unit (enqueueInitiated q) `mappend`
  (unit (enqueueStored q) `mappend`
   (unit (enqueueLost q) `mappend`
    (dequeueRequested q `mappend`
     unit (dequeueExtracted q))))
  where
    -- forget the payload of a signal
    unit s = mapSignal (const ()) s
-- | Return a textual summary of the queue: its strategies, state,
-- counters, rates and wait-time statistics. Every line is prefixed with
-- @indent@ spaces; nested statistics summaries are indented by two more.
queueSummary :: (Show si, Show sm, Show so) => Queue si qi sm qm so qo a -> Int -> Event ShowS
queueSummary q indent =
  do isEmpty       <- queueNull q
     isFull        <- queueFull q
     size          <- queueCount q
     sizeStats     <- queueCountStats q
     nEnqueued     <- enqueueCount q
     nLost         <- enqueueLostCount q
     nStored       <- enqueueStoreCount q
     nRequested    <- dequeueCount q
     nExtracted    <- dequeueExtractCount q
     loadFactor    <- queueLoadFactor q
     inputRate     <- enqueueRate q
     storeRate     <- enqueueStoreRate q
     requestRate   <- dequeueRate q
     extractRate   <- dequeueExtractRate q
     waitStats     <- queueWaitTime q
     totalStats    <- queueTotalWaitTime q
     inputStats    <- enqueueWaitTime q
     outputStats   <- dequeueWaitTime q
     return $
       field "the enqueueing (input) strategy = " (enqueueStrategy q) .
       field "the storing (memory) strategy = " (enqueueStoringStrategy q) .
       field "the dequeueing (output) strategy = " (dequeueStrategy q) .
       field "empty? = " isEmpty .
       field "full? = " isFull .
       field "max. capacity = " (queueMaxCount q) .
       field "size = " size .
       section "the size statistics = \n\n"
         (timingStatsSummary sizeStats nested) .
       showString "\n\n" .
       field "the enqueue count (number of the input items that were enqueued) = " nEnqueued .
       field "the enqueue lost count (number of the lost items) = " nLost .
       field "the enqueue store count (number of the input items that were stored) = " nStored .
       field "the dequeue count (number of requests for dequeueing an item) = " nRequested .
       field "the dequeue extract count (number of the output items that were dequeued) = " nExtracted .
       field "the load factor (size / max. capacity) = " loadFactor .
       field "the enqueue rate (how many input items were enqueued per time) = " inputRate .
       field "the enqueue store rate (how many input items were stored per time) = " storeRate .
       field "the dequeue rate (how many requests for dequeueing per time) = " requestRate .
       field "the dequeue extract rate (how many output items were dequeued per time) = " extractRate .
       section "the wait time (when was stored -> when was dequeued) = \n\n"
         (samplingStatsSummary waitStats nested) .
       showString "\n\n" .
       section "the total wait time (when the enqueueing was initiated -> when was dequeued) = \n\n"
         (samplingStatsSummary totalStats nested) .
       showString "\n\n" .
       section "the enqueue wait time (when the enqueueing was initiated -> when was stored) = \n\n"
         (samplingStatsSummary inputStats nested) .
       showString "\n\n" .
       section "the dequeue wait time (when was requested for dequeueing -> when was dequeued) = \n\n"
         (samplingStatsSummary outputStats nested)
  where
    -- the indentation prefix of every line
    tab = replicate indent ' '
    -- the indentation passed to the nested statistics summaries
    nested = 2 + indent
    -- one "label value" line terminated by a newline
    field :: Show v => String -> v -> ShowS
    field label value =
      showString tab . showString label . shows value . showString "\n"
    -- a labelled block whose body is rendered by another summary
    section :: String -> ShowS -> ShowS
    section label body =
      showString tab . showString label . body