module Simulation.Aivika.Queue.Infinite
(
FCFSQueue,
LCFSQueue,
SIROQueue,
PriorityQueue,
Queue,
newFCFSQueue,
newLCFSQueue,
newSIROQueue,
newPriorityQueue,
newQueue,
enqueueStoringStrategy,
dequeueStrategy,
queueNull,
queueCount,
queueCountStats,
enqueueStoreCount,
dequeueCount,
dequeueExtractCount,
enqueueStoreRate,
dequeueRate,
dequeueExtractRate,
queueWaitTime,
dequeueWaitTime,
queueRate,
dequeue,
dequeueWithOutputPriority,
tryDequeue,
enqueue,
enqueueWithStoringPriority,
queueSummary,
queueNullChanged,
queueNullChanged_,
queueCountChanged,
queueCountChanged_,
enqueueStoreCountChanged,
enqueueStoreCountChanged_,
dequeueCountChanged,
dequeueCountChanged_,
dequeueExtractCountChanged,
dequeueExtractCountChanged_,
queueWaitTimeChanged,
queueWaitTimeChanged_,
dequeueWaitTimeChanged,
dequeueWaitTimeChanged_,
queueRateChanged,
queueRateChanged_,
enqueueStored,
dequeueRequested,
dequeueExtracted,
queueChanged_) where
import Data.IORef
import Data.Monoid
import Control.Monad
import Control.Monad.Trans
import Simulation.Aivika.Internal.Specs
import Simulation.Aivika.Internal.Simulation
import Simulation.Aivika.Internal.Dynamics
import Simulation.Aivika.Internal.Event
import Simulation.Aivika.Internal.Process
import Simulation.Aivika.Internal.Signal
import Simulation.Aivika.Signal
import Simulation.Aivika.Resource
import Simulation.Aivika.QueueStrategy
import Simulation.Aivika.Statistics
import qualified Simulation.Aivika.DoubleLinkedList as DLL
import qualified Simulation.Aivika.Vector as V
import qualified Simulation.Aivika.PriorityQueue as PQ
type FCFSQueue a =
Queue FCFS DLL.DoubleLinkedList FCFS DLL.DoubleLinkedList a
type LCFSQueue a =
Queue LCFS DLL.DoubleLinkedList FCFS DLL.DoubleLinkedList a
type SIROQueue a =
Queue SIRO V.Vector FCFS DLL.DoubleLinkedList a
type PriorityQueue a =
Queue StaticPriorities PQ.PriorityQueue FCFS DLL.DoubleLinkedList a
data Queue sm qm so qo a =
Queue { enqueueStoringStrategy :: sm,
dequeueStrategy :: so,
queueStore :: qm (QueueItem a),
dequeueRes :: Resource so qo,
queueCountRef :: IORef Int,
queueCountStatsRef :: IORef (TimingStats Int),
enqueueStoreCountRef :: IORef Int,
dequeueCountRef :: IORef Int,
dequeueExtractCountRef :: IORef Int,
queueWaitTimeRef :: IORef (SamplingStats Double),
dequeueWaitTimeRef :: IORef (SamplingStats Double),
enqueueStoredSource :: SignalSource a,
dequeueRequestedSource :: SignalSource (),
dequeueExtractedSource :: SignalSource a }
data QueueItem a =
QueueItem { itemValue :: a,
itemStoringTime :: Double
}
newFCFSQueue :: Event (FCFSQueue a)
newFCFSQueue = newQueue FCFS FCFS
newLCFSQueue :: Event (LCFSQueue a)
newLCFSQueue = newQueue LCFS FCFS
newSIROQueue :: Event (SIROQueue a)
newSIROQueue = newQueue SIRO FCFS
newPriorityQueue :: Event (PriorityQueue a)
newPriorityQueue = newQueue StaticPriorities FCFS
newQueue :: (QueueStrategy sm qm,
QueueStrategy so qo) =>
sm
-> so
-> Event (Queue sm qm so qo a)
newQueue sm so =
do t <- liftDynamics time
i <- liftIO $ newIORef 0
is <- liftIO $ newIORef $ returnTimingStats t 0
cm <- liftIO $ newIORef 0
cr <- liftIO $ newIORef 0
co <- liftIO $ newIORef 0
qm <- liftSimulation $ newStrategyQueue sm
ro <- liftSimulation $ newResourceWithMaxCount so 0 Nothing
w <- liftIO $ newIORef mempty
wo <- liftIO $ newIORef mempty
s3 <- liftSimulation newSignalSource
s4 <- liftSimulation newSignalSource
s5 <- liftSimulation newSignalSource
return Queue { enqueueStoringStrategy = sm,
dequeueStrategy = so,
queueStore = qm,
dequeueRes = ro,
queueCountRef = i,
queueCountStatsRef = is,
enqueueStoreCountRef = cm,
dequeueCountRef = cr,
dequeueExtractCountRef = co,
queueWaitTimeRef = w,
dequeueWaitTimeRef = wo,
enqueueStoredSource = s3,
dequeueRequestedSource = s4,
dequeueExtractedSource = s5 }
queueNull :: Queue sm qm so qo a -> Event Bool
queueNull q =
Event $ \p ->
do n <- readIORef (queueCountRef q)
return (n == 0)
queueNullChanged :: Queue sm qm so qo a -> Signal Bool
queueNullChanged q =
mapSignalM (const $ queueNull q) (queueNullChanged_ q)
queueNullChanged_ :: Queue sm qm so qo a -> Signal ()
queueNullChanged_ = queueCountChanged_
queueCount :: Queue sm qm so qo a -> Event Int
queueCount q =
Event $ \p -> readIORef (queueCountRef q)
queueCountStats :: Queue sm qm so qo a -> Event (TimingStats Int)
queueCountStats q =
Event $ \p -> readIORef (queueCountStatsRef q)
queueCountChanged :: Queue sm qm so qo a -> Signal Int
queueCountChanged q =
mapSignalM (const $ queueCount q) (queueCountChanged_ q)
queueCountChanged_ :: Queue sm qm so qo a -> Signal ()
queueCountChanged_ q =
mapSignal (const ()) (enqueueStored q) <>
mapSignal (const ()) (dequeueExtracted q)
enqueueStoreCount :: Queue sm qm so qo a -> Event Int
enqueueStoreCount q =
Event $ \p -> readIORef (enqueueStoreCountRef q)
enqueueStoreCountChanged :: Queue sm qm so qo a -> Signal Int
enqueueStoreCountChanged q =
mapSignalM (const $ enqueueStoreCount q) (enqueueStoreCountChanged_ q)
enqueueStoreCountChanged_ :: Queue sm qm so qo a -> Signal ()
enqueueStoreCountChanged_ q =
mapSignal (const ()) (enqueueStored q)
dequeueCount :: Queue sm qm so qo a -> Event Int
dequeueCount q =
Event $ \p -> readIORef (dequeueCountRef q)
dequeueCountChanged :: Queue sm qm so qo a -> Signal Int
dequeueCountChanged q =
mapSignalM (const $ dequeueCount q) (dequeueCountChanged_ q)
dequeueCountChanged_ :: Queue sm qm so qo a -> Signal ()
dequeueCountChanged_ q =
mapSignal (const ()) (dequeueRequested q)
dequeueExtractCount :: Queue sm qm so qo a -> Event Int
dequeueExtractCount q =
Event $ \p -> readIORef (dequeueExtractCountRef q)
dequeueExtractCountChanged :: Queue sm qm so qo a -> Signal Int
dequeueExtractCountChanged q =
mapSignalM (const $ dequeueExtractCount q) (dequeueExtractCountChanged_ q)
dequeueExtractCountChanged_ :: Queue sm qm so qo a -> Signal ()
dequeueExtractCountChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
enqueueStoreRate :: Queue sm qm so qo a -> Event Double
enqueueStoreRate q =
Event $ \p ->
do x <- readIORef (enqueueStoreCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
dequeueRate :: Queue sm qm so qo a -> Event Double
dequeueRate q =
Event $ \p ->
do x <- readIORef (dequeueCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
dequeueExtractRate :: Queue sm qm so qo a -> Event Double
dequeueExtractRate q =
Event $ \p ->
do x <- readIORef (dequeueExtractCountRef q)
let t0 = spcStartTime $ pointSpecs p
t = pointTime p
return (fromIntegral x / (t t0))
queueWaitTime :: Queue sm qm so qo a -> Event (SamplingStats Double)
queueWaitTime q =
Event $ \p -> readIORef (queueWaitTimeRef q)
queueWaitTimeChanged :: Queue sm qm so qo a -> Signal (SamplingStats Double)
queueWaitTimeChanged q =
mapSignalM (const $ queueWaitTime q) (queueWaitTimeChanged_ q)
queueWaitTimeChanged_ :: Queue sm qm so qo a -> Signal ()
queueWaitTimeChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
dequeueWaitTime :: Queue sm qm so qo a -> Event (SamplingStats Double)
dequeueWaitTime q =
Event $ \p -> readIORef (dequeueWaitTimeRef q)
dequeueWaitTimeChanged :: Queue sm qm so qo a -> Signal (SamplingStats Double)
dequeueWaitTimeChanged q =
mapSignalM (const $ dequeueWaitTime q) (dequeueWaitTimeChanged_ q)
dequeueWaitTimeChanged_ :: Queue sm qm so qo a -> Signal ()
dequeueWaitTimeChanged_ q =
mapSignal (const ()) (dequeueExtracted q)
queueRate :: Queue sm qm so qo a -> Event Double
queueRate q =
Event $ \p ->
do x <- readIORef (queueCountStatsRef q)
y <- readIORef (queueWaitTimeRef q)
return (timingStatsMean x / samplingStatsMean y)
queueRateChanged :: Queue sm qm so qo a -> Signal Double
queueRateChanged q =
mapSignalM (const $ queueRate q) (queueRateChanged_ q)
queueRateChanged_ :: Queue sm qm so qo a -> Signal ()
queueRateChanged_ q =
mapSignal (const ()) (enqueueStored q) <>
mapSignal (const ()) (dequeueExtracted q)
dequeue :: (DequeueStrategy sm qm,
EnqueueStrategy so qo)
=> Queue sm qm so qo a
-> Process a
dequeue q =
do t <- liftEvent $ dequeueRequest q
requestResource (dequeueRes q)
liftEvent $ dequeueExtract q t
dequeueWithOutputPriority :: (DequeueStrategy sm qm,
PriorityQueueStrategy so qo po)
=> Queue sm qm so qo a
-> po
-> Process a
dequeueWithOutputPriority q po =
do t <- liftEvent $ dequeueRequest q
requestResourceWithPriority (dequeueRes q) po
liftEvent $ dequeueExtract q t
tryDequeue :: DequeueStrategy sm qm
=> Queue sm qm so qo a
-> Event (Maybe a)
tryDequeue q =
do x <- tryRequestResourceWithinEvent (dequeueRes q)
if x
then do t <- dequeueRequest q
fmap Just $ dequeueExtract q t
else return Nothing
enqueue :: (EnqueueStrategy sm qm,
DequeueStrategy so qo)
=> Queue sm qm so qo a
-> a
-> Event ()
enqueue = enqueueStore
enqueueWithStoringPriority :: (PriorityQueueStrategy sm qm pm,
DequeueStrategy so qo)
=> Queue sm qm so qo a
-> pm
-> a
-> Event ()
enqueueWithStoringPriority = enqueueStoreWithPriority
enqueueStored :: Queue sm qm so qo a -> Signal a
enqueueStored q = publishSignal (enqueueStoredSource q)
dequeueRequested :: Queue sm qm so qo a -> Signal ()
dequeueRequested q = publishSignal (dequeueRequestedSource q)
dequeueExtracted :: Queue sm qm so qo a -> Signal a
dequeueExtracted q = publishSignal (dequeueExtractedSource q)
enqueueStore :: (EnqueueStrategy sm qm,
DequeueStrategy so qo)
=> Queue sm qm so qo a
-> a
-> Event ()
enqueueStore q a =
Event $ \p ->
do let i = QueueItem { itemValue = a,
itemStoringTime = pointTime p }
invokeEvent p $
strategyEnqueue (enqueueStoringStrategy q) (queueStore q) i
c <- readIORef (queueCountRef q)
let c' = c + 1
t = pointTime p
c' `seq` writeIORef (queueCountRef q) c'
modifyIORef' (queueCountStatsRef q) (addTimingStats t c')
modifyIORef' (enqueueStoreCountRef q) (+ 1)
invokeEvent p $
releaseResourceWithinEvent (dequeueRes q)
invokeEvent p $
triggerSignal (enqueueStoredSource q) (itemValue i)
enqueueStoreWithPriority :: (PriorityQueueStrategy sm qm pm,
DequeueStrategy so qo)
=> Queue sm qm so qo a
-> pm
-> a
-> Event ()
enqueueStoreWithPriority q pm a =
Event $ \p ->
do let i = QueueItem { itemValue = a,
itemStoringTime = pointTime p }
invokeEvent p $
strategyEnqueueWithPriority (enqueueStoringStrategy q) (queueStore q) pm i
c <- readIORef (queueCountRef q)
let c' = c + 1
t = pointTime p
c' `seq` writeIORef (queueCountRef q) c'
modifyIORef' (queueCountStatsRef q) (addTimingStats t c')
modifyIORef' (enqueueStoreCountRef q) (+ 1)
invokeEvent p $
releaseResourceWithinEvent (dequeueRes q)
invokeEvent p $
triggerSignal (enqueueStoredSource q) (itemValue i)
dequeueRequest :: Queue sm qm so qo a
-> Event Double
dequeueRequest q =
Event $ \p ->
do modifyIORef' (dequeueCountRef q) (+ 1)
invokeEvent p $
triggerSignal (dequeueRequestedSource q) ()
return $ pointTime p
dequeueExtract :: DequeueStrategy sm qm
=> Queue sm qm so qo a
-> Double
-> Event a
dequeueExtract q t' =
Event $ \p ->
do i <- invokeEvent p $
strategyDequeue (enqueueStoringStrategy q) (queueStore q)
c <- readIORef (queueCountRef q)
let c' = c 1
t = pointTime p
c' `seq` writeIORef (queueCountRef q) c'
modifyIORef' (queueCountStatsRef q) (addTimingStats t c')
modifyIORef' (dequeueExtractCountRef q) (+ 1)
invokeEvent p $
dequeueStat q t' i
invokeEvent p $
triggerSignal (dequeueExtractedSource q) (itemValue i)
return $ itemValue i
dequeueStat :: Queue sm qm so qo a
-> Double
-> QueueItem a
-> Event ()
dequeueStat q t' i =
Event $ \p ->
do let t1 = itemStoringTime i
t = pointTime p
modifyIORef' (dequeueWaitTimeRef q) $
addSamplingStats (t t')
modifyIORef' (queueWaitTimeRef q) $
addSamplingStats (t t1)
queueChanged_ :: Queue sm qm so qo a -> Signal ()
queueChanged_ q =
mapSignal (const ()) (enqueueStored q) <>
dequeueRequested q <>
mapSignal (const ()) (dequeueExtracted q)
queueSummary :: (Show sm, Show so) => Queue sm qm so qo a -> Int -> Event ShowS
queueSummary q indent =
do let sm = enqueueStoringStrategy q
so = dequeueStrategy q
null <- queueNull q
count <- queueCount q
countStats <- queueCountStats q
enqueueStoreCount <- enqueueStoreCount q
dequeueCount <- dequeueCount q
dequeueExtractCount <- dequeueExtractCount q
enqueueStoreRate <- enqueueStoreRate q
dequeueRate <- dequeueRate q
dequeueExtractRate <- dequeueExtractRate q
waitTime <- queueWaitTime q
dequeueWaitTime <- dequeueWaitTime q
let tab = replicate indent ' '
return $
showString tab .
showString "the storing (memory) strategy = " .
shows sm .
showString "\n" .
showString tab .
showString "the dequeueing (output) strategy = " .
shows so .
showString "\n" .
showString tab .
showString "empty? = " .
shows null .
showString "\n" .
showString tab .
showString "the current size = " .
shows count .
showString "\n" .
showString tab .
showString "the size statistics = \n\n" .
timingStatsSummary countStats (2 + indent) .
showString "\n\n" .
showString tab .
showString "the enqueue store count (number of the input items that were stored) = " .
shows enqueueStoreCount .
showString "\n" .
showString tab .
showString "the dequeue count (number of requests for dequeueing an item) = " .
shows dequeueCount .
showString "\n" .
showString tab .
showString "the dequeue extract count (number of the output items that were dequeued) = " .
shows dequeueExtractCount .
showString "\n" .
showString tab .
showString "the enqueue store rate (how many input items were stored per time) = " .
shows enqueueStoreRate .
showString "\n" .
showString tab .
showString "the dequeue rate (how many requests for dequeueing per time) = " .
shows dequeueRate .
showString "\n" .
showString tab .
showString "the dequeue extract rate (how many output items were dequeued per time) = " .
shows dequeueExtractRate .
showString "\n" .
showString tab .
showString "the wait time (when was stored -> when was dequeued) = \n\n" .
samplingStatsSummary waitTime (2 + indent) .
showString "\n\n" .
showString tab .
showString "the dequeue wait time (when was requested for dequeueing -> when was dequeued) = \n\n" .
samplingStatsSummary dequeueWaitTime (2 + indent)