module Simulation.Aivika.IO.Resource.Preemption () where
import Control.Monad
import Control.Monad.Trans
import Data.Maybe
import Data.IORef
import Data.Monoid
import Simulation.Aivika.Trans.Exception
import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Internal.Specs
import Simulation.Aivika.Trans.Internal.Simulation
import Simulation.Aivika.Trans.Internal.Event
import Simulation.Aivika.Trans.Internal.Cont
import Simulation.Aivika.Trans.Internal.Process
import Simulation.Aivika.Trans.Resource.Preemption
import Simulation.Aivika.Trans.Statistics
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.IO.DES
import qualified Simulation.Aivika.PriorityQueue as PQ
-- | The 'IO' instance of 'MonadResource': a preemptible resource whose
-- mutable state lives in 'IORef's and in two priority queues.
instance MonadResource IO where

  -- | The preemptible resource in the 'IO' monad.
  data Resource IO =
    Resource { resourceMaxCount0 :: Maybe Int,
               -- ^ The optional upper bound of the available count;
               -- 'Nothing' disables the upper-bound checks.
               resourceCountRef :: IORef Int,
               -- ^ The currently available count.
               resourceCountStatsRef :: IORef (TimingStats Int),
               -- ^ The timing statistics of the available count.
               resourceCountSource :: SignalSource IO Int,
               -- ^ Triggered with the new available count.
               resourceUtilisationCountRef :: IORef Int,
               -- ^ The current utilisation count.
               resourceUtilisationCountStatsRef :: IORef (TimingStats Int),
               -- ^ The timing statistics of the utilisation count.
               resourceUtilisationCountSource :: SignalSource IO Int,
               -- ^ Triggered with the new utilisation count.
               resourceQueueCountRef :: IORef Int,
               -- ^ The current number of awaiting items.
               resourceQueueCountStatsRef :: IORef (TimingStats Int),
               -- ^ The timing statistics of the queue count.
               resourceQueueCountSource :: SignalSource IO Int,
               -- ^ Triggered with the new queue count.
               resourceTotalWaitTimeRef :: IORef Double,
               -- ^ The accumulated wait time.
               resourceWaitTimeRef :: IORef (SamplingStats Double),
               -- ^ The sampling statistics of the wait time.
               resourceWaitTimeSource :: SignalSource IO (),
               -- ^ Triggered whenever the wait time is updated.
               resourceActingQueue :: PQ.PriorityQueue (ResourceActingItem IO),
               -- ^ The processes holding the resource, keyed by the
               -- negated priority.
               resourceWaitQueue :: PQ.PriorityQueue (ResourceAwaitingItem IO)
               -- ^ The awaiting requests and preempted processes, keyed
               -- by the priority.
             }

  -- Create a resource whose maximum count equals its initial count.
  newResource count =
    newResourceWithMaxCount count (Just count)

  -- Create a resource with the given initial count and optional maximum.
  -- Raises 'SimulationRetry' if the count is negative or exceeds the maximum.
  newResourceWithMaxCount count maxCount =
    Event $ \p ->
    do let r = pointRun p
           t = pointTime p
       when (count < 0) $
         throwComp $
         SimulationRetry $
         "The resource count cannot be negative: " ++
         "newResourceWithMaxCount."
       case maxCount of
         Just limit | count > limit ->
           throwComp $
           SimulationRetry $
           "The resource count cannot be greater than " ++
           "its maximum value: newResourceWithMaxCount."
         _ ->
           return ()
       countRef <- liftIO $ newIORef count
       countStatsRef <- liftIO $ newIORef $ returnTimingStats t count
       countSource <- invokeSimulation r newSignalSource
       utilCountRef <- liftIO $ newIORef 0
       utilCountStatsRef <- liftIO $ newIORef $ returnTimingStats t 0
       utilCountSource <- invokeSimulation r newSignalSource
       queueCountRef <- liftIO $ newIORef 0
       queueCountStatsRef <- liftIO $ newIORef $ returnTimingStats t 0
       queueCountSource <- invokeSimulation r newSignalSource
       totalWaitTimeRef <- liftIO $ newIORef 0
       waitTimeRef <- liftIO $ newIORef emptySamplingStats
       waitTimeSource <- invokeSimulation r newSignalSource
       actingQueue <- liftIO PQ.newQueue
       waitQueue <- liftIO PQ.newQueue
       return Resource { resourceMaxCount0 = maxCount,
                         resourceCountRef = countRef,
                         resourceCountStatsRef = countStatsRef,
                         resourceCountSource = countSource,
                         resourceUtilisationCountRef = utilCountRef,
                         resourceUtilisationCountStatsRef = utilCountStatsRef,
                         resourceUtilisationCountSource = utilCountSource,
                         resourceQueueCountRef = queueCountRef,
                         resourceQueueCountStatsRef = queueCountStatsRef,
                         resourceQueueCountSource = queueCountSource,
                         resourceTotalWaitTimeRef = totalWaitTimeRef,
                         resourceWaitTimeRef = waitTimeRef,
                         resourceWaitTimeSource = waitTimeSource,
                         resourceActingQueue = actingQueue,
                         resourceWaitQueue = waitQueue }

  -- The optional maximum count fixed at creation.
  resourceMaxCount = resourceMaxCount0

  -- Read the currently available count.
  resourceCount r =
    Event $ \p -> liftIO $ readIORef (resourceCountRef r)

  -- Read the timing statistics of the available count.
  resourceCountStats r =
    Event $ \p -> liftIO $ readIORef (resourceCountStatsRef r)

  -- Signal carrying the new available count.
  resourceCountChanged r =
    publishSignal $ resourceCountSource r

  -- The same signal with the value dropped.
  resourceCountChanged_ r =
    mapSignal (const ()) $ resourceCountChanged r

  -- Read the current utilisation count.
  resourceUtilisationCount r =
    Event $ \p -> liftIO $ readIORef (resourceUtilisationCountRef r)

  -- Read the timing statistics of the utilisation count.
  resourceUtilisationCountStats r =
    Event $ \p -> liftIO $ readIORef (resourceUtilisationCountStatsRef r)

  -- Signal carrying the new utilisation count.
  resourceUtilisationCountChanged r =
    publishSignal $ resourceUtilisationCountSource r

  -- The same signal with the value dropped.
  resourceUtilisationCountChanged_ r =
    mapSignal (const ()) $ resourceUtilisationCountChanged r

  -- Read the current queue count.
  resourceQueueCount r =
    Event $ \p -> liftIO $ readIORef (resourceQueueCountRef r)

  -- Read the timing statistics of the queue count.
  resourceQueueCountStats r =
    Event $ \p -> liftIO $ readIORef (resourceQueueCountStatsRef r)

  -- Signal carrying the new queue count.
  resourceQueueCountChanged r =
    publishSignal $ resourceQueueCountSource r

  -- The same signal with the value dropped.
  resourceQueueCountChanged_ r =
    mapSignal (const ()) $ resourceQueueCountChanged r

  -- Read the accumulated wait time.
  resourceTotalWaitTime r =
    Event $ \p -> liftIO $ readIORef (resourceTotalWaitTimeRef r)

  -- Read the sampling statistics of the wait time.
  resourceWaitTime r =
    Event $ \p -> liftIO $ readIORef (resourceWaitTimeRef r)

  -- Signal carrying the wait time statistics read at the moment of the update.
  resourceWaitTimeChanged r =
    mapSignalM (\() -> resourceWaitTime r) $ resourceWaitTimeChanged_ r

  -- Signal triggered whenever the wait time is updated.
  resourceWaitTimeChanged_ r =
    publishSignal $ resourceWaitTimeSource r

  -- Combined signal for the count, utilisation count and queue count.
  resourceChanged_ r =
    resourceCountChanged_ r <>
    resourceUtilisationCountChanged_ r <>
    resourceQueueCountChanged_ r

  -- Request the resource with the given priority. A smaller priority value
  -- wins: the requester preempts the holder at the front of the acting queue
  -- when its priority is strictly less than that holder's priority.
  requestResourceWithPriority r priority =
    Process $ \pid ->
    Cont $ \c ->
    Event $ \p ->
    do let t = pointTime p
           -- Park the request in the wait queue. The frozen continuation
           -- is given a computation that issues the same request again.
           await =
             do c' <- invokeEvent p $
                      freezeContReentering c () $
                      invokeCont c $
                      invokeProcess pid $
                      requestResourceWithPriority r priority
                liftIO $ PQ.enqueue (resourceWaitQueue r) priority (Left $ ResourceRequestingItem priority t pid c')
                invokeEvent p $ updateResourceQueueCount r 1
       a <- liftIO $ readIORef (resourceCountRef r)
       if a == 0
         then do noHolder <- liftIO $ PQ.queueNull (resourceActingQueue r)
                 if noHolder
                   then await
                   else do (p0', item0) <- liftIO $ PQ.queueFront (resourceActingQueue r)
                           -- The acting queue is keyed by the negated
                           -- priority, so negate the key to recover it.
                           let p0 = - p0'
                               pid0 = actingItemId item0
                           if priority < p0
                             then do liftIO $ PQ.dequeue (resourceActingQueue r)
                                     liftIO $ PQ.enqueue (resourceActingQueue r) (- priority) $ ResourceActingItem priority pid
                                     liftIO $ PQ.enqueue (resourceWaitQueue r) p0 (Right $ ResourcePreemptedItem p0 t pid0)
                                     invokeEvent p $ updateResourceQueueCount r 1
                                     invokeEvent p $ processPreemptionBegin pid0
                                     invokeEvent p $ resumeCont c ()
                             else await
         else do liftIO $ PQ.enqueue (resourceActingQueue r) (- priority) $ ResourceActingItem priority pid
                 invokeEvent p $ updateResourceWaitTime r 0
                 -- Acquiring a free unit lowers the available count.
                 invokeEvent p $ updateResourceCount r (-1)
                 invokeEvent p $ updateResourceUtilisationCount r 1
                 invokeEvent p $ resumeCont c ()

  -- Release the resource held by the current process. Raises
  -- 'SimulationRetry' if the process is not found in the acting queue.
  releaseResource r =
    Process $ \pid ->
    Cont $ \c ->
    Event $ \p ->
    do acquired <- liftIO $ fmap isJust $ PQ.queueDeleteBy (resourceActingQueue r) (\item -> actingItemId item == pid)
       if acquired
         then do -- The process no longer utilises the resource.
                 invokeEvent p $ updateResourceUtilisationCount r (-1)
                 invokeEvent p $ releaseResource' r
                 invokeEvent p $ resumeCont c ()
         else throwComp $
              SimulationRetry
              "The resource was not acquired by this process: releaseResource"

  -- Acquire the resource, run the process and release the resource
  -- through 'finallyProcess'.
  usingResourceWithPriority r priority m =
    do requestResourceWithPriority r priority
       finallyProcess m $ releaseResource r

  -- Increase the available count by @n@, one unit at a time.
  incResourceCount r n
    | n < 0 = throwEvent $ SimulationRetry "The increment cannot be negative: incResourceCount"
    | n == 0 = return ()
    | otherwise =
      do releaseResource' r
         incResourceCount r (n - 1)

  -- Decrease the available count by @n@, one unit at a time.
  decResourceCount r n
    | n < 0 = throwEvent $ SimulationRetry "The decrement cannot be negative: decResourceCount"
    | n == 0 = return ()
    | otherwise =
      do decResourceCount' r
         decResourceCount r (n - 1)

  -- Change the available count by the signed amount @n@.
  alterResourceCount r n
    | n < 0 = decResourceCount r (- n)
    | n > 0 = incResourceCount r n
    | n == 0 = return ()
-- | An entry of the acting queue: a process currently holding the resource.
data ResourceActingItem m =
  ResourceActingItem
    { actingItemPriority :: Double
      -- ^ The priority with which the resource was acquired.
    , actingItemId :: ProcessId m
      -- ^ The identifier of the holding process.
    }
-- | An entry of the wait queue: either a pending request ('Left') or a
-- preempted process ('Right').
type ResourceAwaitingItem m = Either (ResourceRequestingItem m) (ResourcePreemptedItem m)
-- | A pending request stored in the wait queue.
data ResourceRequestingItem m =
  ResourceRequestingItem
    { requestingItemPriority :: Double
      -- ^ The priority of the request.
    , requestingItemTime :: Double
      -- ^ The modeling time at which the request was queued.
    , requestingItemId :: ProcessId m
      -- ^ The identifier of the requesting process.
    , requestingItemCont :: FrozenCont m ()
      -- ^ The frozen continuation of the requesting process.
    }
-- | A preempted process stored in the wait queue.
data ResourcePreemptedItem m =
  ResourcePreemptedItem
    { preemptedItemPriority :: Double
      -- ^ The priority the process held the resource with.
    , preemptedItemTime :: Double
      -- ^ The modeling time at which the process was preempted.
    , preemptedItemId :: ProcessId m
      -- ^ The identifier of the preempted process.
    }
-- | Resources are compared by the identity of their count reference.
instance Eq (Resource IO) where
  (==) lhs rhs = resourceCountRef lhs == resourceCountRef rhs
-- | Acting items are compared by their process identifiers only.
instance Eq (ResourceActingItem IO) where
  (==) lhs rhs = actingItemId lhs == actingItemId rhs
-- | Release one unit of the resource without checking the acting queue.
--
-- If the wait queue is empty, the available count is increased by one.
-- Otherwise the unit goes to the front item of the wait queue: a pending
-- request is reentered, a preempted process is resumed. An item whose
-- continuation cannot be unfrozen, or whose process is cancelled, is
-- skipped and the release is retried.
--
-- Raises 'SimulationRetry' if the incremented count would exceed the
-- maximum count.
releaseResource' :: Resource IO
                    -- ^ the resource to release
                    -> Event IO ()
releaseResource' r =
  Event $ \p ->
  do a <- liftIO $ readIORef (resourceCountRef r)
     let a' = a + 1
     case resourceMaxCount r of
       Just maxCount | a' > maxCount ->
         throwComp $
         SimulationRetry $
         "The resource count cannot be greater than " ++
         "its maximum value: releaseResource'."
       _ ->
         return ()
     nobodyWaits <- liftIO $ PQ.queueNull (resourceWaitQueue r)
     if nobodyWaits
       then invokeEvent p $ updateResourceCount r 1
       else do (_, item) <- liftIO $ PQ.queueFront (resourceWaitQueue r)
               liftIO $ PQ.dequeue (resourceWaitQueue r)
               -- One item has left the wait queue.
               invokeEvent p $ updateResourceQueueCount r (-1)
               case item of
                 Left (ResourceRequestingItem priority t pid c0) ->
                   do c <- invokeEvent p $ unfreezeCont c0
                      case c of
                        Nothing ->
                          invokeEvent p $ releaseResource' r
                        Just c' ->
                          do -- The acting queue is keyed by the negated priority.
                             liftIO $ PQ.enqueue (resourceActingQueue r) (- priority) $ ResourceActingItem priority pid
                             -- The wait time is the time elapsed since queuing.
                             invokeEvent p $ updateResourceWaitTime r (pointTime p - t)
                             invokeEvent p $ updateResourceUtilisationCount r 1
                             invokeEvent p $ enqueueEvent (pointTime p) $ reenterCont c' ()
                 Right (ResourcePreemptedItem priority t pid) ->
                   do cancelled <- invokeEvent p $ processCancelled pid
                      if cancelled
                        then invokeEvent p $ releaseResource' r
                        else do liftIO $ PQ.enqueue (resourceActingQueue r) (- priority) $ ResourceActingItem priority pid
                                invokeEvent p $ updateResourceWaitTime r (pointTime p - t)
                                invokeEvent p $ updateResourceUtilisationCount r 1
                                invokeEvent p $ processPreemptionEnd pid
-- | Decrease the available count of the resource by one.
--
-- If some process holds the resource, the holder at the front of the acting
-- queue is preempted and moved to the wait queue. The available count is
-- then decreased in either case.
--
-- Raises 'SimulationRetry' if the available count is already zero.
decResourceCount' :: Resource IO -> Event IO ()
decResourceCount' r =
  Event $ \p ->
  do let t = pointTime p
     a <- liftIO $ readIORef (resourceCountRef r)
     when (a == 0) $
       throwComp $
       SimulationRetry
       "The resource exceeded and its count is zero: decResourceCount'"
     noHolder <- liftIO $ PQ.queueNull (resourceActingQueue r)
     unless noHolder $
       do (p0', item0) <- liftIO $ PQ.queueFront (resourceActingQueue r)
          -- The acting queue is keyed by the negated priority, so negate
          -- the key to recover the priority.
          let p0 = - p0'
              pid0 = actingItemId item0
          liftIO $ PQ.dequeue (resourceActingQueue r)
          liftIO $ PQ.enqueue (resourceWaitQueue r) p0 (Right $ ResourcePreemptedItem p0 t pid0)
          invokeEvent p $ processPreemptionBegin pid0
          -- The preempted process stops utilising the resource and waits.
          invokeEvent p $ updateResourceUtilisationCount r (-1)
          invokeEvent p $ updateResourceQueueCount r 1
     invokeEvent p $ updateResourceCount r (-1)
-- | Add the delta to the available count, record the new value in the
-- timing statistics and trigger the count signal with it.
updateResourceCount :: Resource IO -> Int -> Event IO ()
updateResourceCount r delta =
  Event $ \p -> do
    old <- liftIO $ readIORef (resourceCountRef r)
    let new = old + delta
    liftIO $ do
      -- Force the sum before storing it in the reference.
      new `seq` writeIORef (resourceCountRef r) new
      modifyIORef' (resourceCountStatsRef r) (addTimingStats (pointTime p) new)
    invokeEvent p (triggerSignal (resourceCountSource r) new)
-- | Add the delta to the queue count, record the new value in the timing
-- statistics and trigger the queue count signal with it.
updateResourceQueueCount :: Resource IO -> Int -> Event IO ()
updateResourceQueueCount r delta =
  Event $ \p -> do
    old <- liftIO $ readIORef (resourceQueueCountRef r)
    let new = old + delta
    liftIO $ do
      -- Force the sum before storing it in the reference.
      new `seq` writeIORef (resourceQueueCountRef r) new
      modifyIORef' (resourceQueueCountStatsRef r) (addTimingStats (pointTime p) new)
    invokeEvent p (triggerSignal (resourceQueueCountSource r) new)
-- | Add the delta to the utilisation count, record the new value in the
-- timing statistics and trigger the utilisation count signal with it.
updateResourceUtilisationCount :: Resource IO -> Int -> Event IO ()
updateResourceUtilisationCount r delta =
  Event $ \p -> do
    old <- liftIO $ readIORef (resourceUtilisationCountRef r)
    let new = old + delta
    liftIO $ do
      -- Force the sum before storing it in the reference.
      new `seq` writeIORef (resourceUtilisationCountRef r) new
      modifyIORef' (resourceUtilisationCountStatsRef r) (addTimingStats (pointTime p) new)
    invokeEvent p (triggerSignal (resourceUtilisationCountSource r) new)
-- | Add the delta to the total wait time, add the delta as a sample to the
-- wait time statistics and trigger the wait time signal.
updateResourceWaitTime :: Resource IO -> Double -> Event IO ()
updateResourceWaitTime r delta =
  Event $ \p -> do
    total <- liftIO $ readIORef (resourceTotalWaitTimeRef r)
    let total' = total + delta
    liftIO $ do
      -- Force the sum before storing it in the reference.
      total' `seq` writeIORef (resourceTotalWaitTimeRef r) total'
      modifyIORef' (resourceWaitTimeRef r) (addSamplingStats delta)
    invokeEvent p (triggerSignal (resourceWaitTimeSource r) ())