-- |This module can execute events at specified times. It runs its
-- bookkeeping in background threads, which allows the STM adding and deleting
-- of events without requiring later IO actions. An ability to place arbitrary
-- event preprocessing when adding each event exists, but in a coarse-grained
-- manner. This feature can be expanded on request.
--
-- This is very like control-timeout, but was developed separately. Internal
-- operation is similar, with a thread sleeping via threadDelay and EventIds
-- being based in part on expire time. It differs in that control-event:
--
--   * Is more complex
--
--   * Requires initialization
--
--   * Allows pure STM adding and removing of events (no post STM IO action)
--
--   * Allows user control over event systems (can have more than one)
--
--   * Allows events to run in the event handler thread
--     (advisable if thread spark is too expensive / computation is cheap)
--
--   * Has no possible duplication of EventId (theoretical! no real advantage)
--
-- On the other hand, a shim could be made to provide the
-- control-timeout API with Control.Event running under the hood.
module Control.Event
    ( EventId
    , EventSystem
    , noEvent
    , initEventSystem
    , addEvent
    , addEventSTM
    , cancelEvent
    , cancelEventSTM
    , evtSystemSize
    ) where

import Prelude hiding (lookup)
import Control.Concurrent (forkIO, myThreadId, ThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (throwDynTo, catchDyn, block, unblock)
import Control.Monad (forever)
import Data.Dynamic
import Data.List (partition, deleteBy)
import Data.Map (Map, empty, findMin, deleteFindMin, insertLookupWithKey, adjust, size, singleton, toList, insert, updateLookupWithKey, delete, lookup, fold)
import System.Time (TimeDiff(..), ClockTime(..), diffClockTimes, getClockTime)
import GHC.Conc

-- Per-expiry-time sequence number that distinguishes events sharing one time.
type EventNumber = Int

-- All events scheduled for one expiry time: the number that the next event
-- added at that time will receive, paired with the actions keyed by number.
type EventSet = (EventNumber, Map EventNumber (IO ()))

-- Builds the set for the first event at a given time: the action is stored
-- under number 0 and the next free number is 1.
singletonSet :: IO () -> EventSet
singletonSet act = (1, singleton 0 act)

-- |IDs the program can use to cancel previously scheduled events.
data EventId = EvtId ClockTime EventNumber
    deriving (Eq, Ord, Show)

-- |A value indicating there is no such event.
noEvent :: EventId
noEvent = EvtId (TOD (-1) (-1)) (-1)

-- |The event system can either be initialized and passed as state or a global
-- system can be declared using gEvtSys = unsafePerformIO initEventSystem
data EventSystem = EvtSys
    { -- Pending events, keyed by expiry time
      esEvents   :: TVar (Map ClockTime EventSet)
      -- Id of the thread that receives TimerReset exceptions
    , esThread   :: TVar (Maybe ThreadId)
      -- Time of the soonest event; TOD (-1) (-1) is used when none is set
    , esAlarm    :: TVar ClockTime
      -- Set when an event with an earlier expiration was added
    , esNewAlarm :: TVar Bool
      -- Batches of expired event sets waiting to be run
    , esExpired  :: TVar [[EventSet]]
    }

-- |The only way to get an event system is to initialize one, which sets the
-- internal TVars and forks three threads: one that throws TimerReset when an
-- event expiring before the current alarm has been added, one that runs the
-- actions of expired events, and one that sleeps until the next alarm and
-- expires events.
initEventSystem :: IO EventSystem
initEventSystem = do
    evts    <- newTVarIO empty
    tid     <- newTVarIO Nothing
    alm     <- newTVarIO (TOD (-1) (-1))
    new     <- newTVarIO False
    expired <- newTVarIO []
    let evtSys = EvtSys evts tid alm new expired
    -- The ThreadIds are deliberately dropped; the expiring thread records its
    -- own id in esThread once it starts.
    _ <- forkIO $ forever $ trackAlarm evtSys
    _ <- forkIO $ forever $ monitorExpiredQueue expired
    _ <- forkIO $ expireEvents evtSys
    return evtSys

-- |Main thread that delays till the alarm time then executes any expired events.
-- Asynchronous 'TimerReset' exceptions might occur to indicate a new, earlier, alarm time.
expireEvents :: EventSystem -> IO ()
expireEvents es =
    -- Asynchronous exceptions are blocked around the loop and only unblocked
    -- inside the catchDyn, so a TimerReset is delivered where it is handled.
    block $ do
        tid <- myThreadId
        forever $
            catchDyn (unblock (setTID (Just tid) es >> expireEvents' es))
                     (\TimerReset -> return ())
  where
    setTID i sys = atomically (writeTVar (esThread sys) i)

-- |Worker function for expireEvents - the parent simply catches the exceptions
expireEvents' :: EventSystem -> IO ()
expireEvents' evtSys = do
    usDelay <- determineDelay
    threadDelay usDelay
    runExpire evtSys
  where
    -- Blocks (via retry) until at least one event is pending, then returns
    -- the number of microseconds from now until the earliest expiry time.
    determineDelay :: IO Int
    determineDelay = do
        alm <- atomically $ do
            evts <- readTVar (esEvents evtSys)
            case findMinM evts of
                Nothing    -> retry
                Just (c,_) -> return c
        now <- getClockTime
        return $ timeDiffToMicroSec $ diffClockTimes alm now

    findMinM :: Map ClockTime EventSet -> Maybe (ClockTime,EventSet)
    findMinM m
        | size m == 0 = Nothing
        | otherwise   = Just $ findMin m

-- Total number of scheduled actions across every expiry time in the map.
totalEvents :: Map ClockTime EventSet -> Int
totalEvents = fold (\(_,m) n -> n + size m) 0

-- |Determines which events are expired and queues them so their actions get run
runExpire :: EventSystem -> IO ()
runExpire evtSys = do
    now <- getClockTime
    atomically $ do
        evts <- readTVar (esEvents evtSys)
        let (expired, newMap) = getEarlierKeys now evts
            newAlarm          = getAlarm newMap
        writeTVar (esAlarm evtSys) newAlarm
        writeTVar (esEvents evtSys) newMap
        -- Sanity check: every event must be either still pending or expired.
        if totalEvents newMap + sum (map (\(_,m) -> size m) expired) /= totalEvents evts
            then unsafeIOToSTM $ putStrLn "Expire is dropping events."
            else return ()
        exps <- readTVar (esExpired evtSys)
        -- New batches go on the front; monitorExpiredQueue restores the order.
        writeTVar (esExpired evtSys) (expired:exps)
  where
    -- Splits off, in ascending time order, every event set whose time is
    -- strictly earlier than the given clock time.
    getEarlierKeys :: ClockTime -> Map ClockTime EventSet -> ([EventSet], Map ClockTime EventSet)
    getEarlierKeys clk m =
        case deleteFindMinM m of
            Just ((k,es), m')
                | k < clk ->
                    let (rest, lastMap) = getEarlierKeys clk m'
                    in (es:rest, lastMap)
                | otherwise -> ([], m)
            Nothing -> ([], m)

    -- Earliest remaining expiry time, or TOD (-1) (-1) when nothing is pending.
    getAlarm m
        | size m == 0 = TOD (-1) (-1)
        | otherwise   = fst $ findMin m

    deleteFindMinM :: Map k a -> Maybe ((k, a), Map k a)
    deleteFindMinM m =
        if size m == 0
            then Nothing
            else Just (deleteFindMin m)

-- |Execute expired events
monitorExpiredQueue :: TVar [[EventSet]] -> IO ()
monitorExpiredQueue queue = do
    batches <- atomically $ do
        pending <- readTVar queue
        case pending of
            [] -> retry
            _  -> writeTVar queue [] >> return pending
    -- runExpire conses each new batch onto the front of the queue, so the
    -- list is newest first; reverse it so earlier-expired batches run first.
    mapM_ (mapM_ runEvents) (reverse batches)

-- |Runs all provided events (which must have expired), one after another in
-- the calling thread.
runEvents :: EventSet -> IO ()
runEvents (_,set) = do
    let actions = map snd (toList set)
    -- mapM_ forkIO actions
    sequence_ actions -- FIXME why does this line work and not the top!

-- |Add an *action* to be performed at *time* by *system*, returning a unique id.
addEvent :: EventSystem -> ClockTime -> IO () -> IO EventId
addEvent sys clk act = atomically (addEventSTM sys clk act)

-- |Atomically add an action to be performed at specified time and returning a unique id.
addEventSTM :: EventSystem -> ClockTime -> IO () -> STM EventId
addEventSTM sys clk act = do
    evts <- readTVar (esEvents sys)
    let (old, newMap) = insertLookupWithKey (\_ _ o -> insertEvent o) clk (singletonSet act) evts
        -- The new event takes the previous set's "next number", or 0 when it
        -- is the first event at this time.
        num = case old of
                Nothing    -> 0
                Just (n,_) -> n
        eid = EvtId clk num
    writeTVar (esEvents sys) newMap
    alm <- readTVar (esAlarm sys)
    -- Flag a new alarm when this event is the soonest, or no alarm is set.
    if clk < alm || alm == (TOD (-1) (-1))
        then writeTVar (esAlarm sys) clk >> writeTVar (esNewAlarm sys) True
        else return ()
    -- Sanity check: exactly one event must have been added.
    if totalEvents newMap /= totalEvents evts + 1
        then unsafeIOToSTM $ putStrLn "Event mapping has not grown!"
        else return ()
    return eid
  where
    insertEvent :: EventSet -> EventSet
    insertEvent (num,set)
        | num == maxBound = error "maxBound events at given time, something is broken."
        | otherwise       = (num+1, insert num act set)

-- |Cancel an event from the system, returning True on success.
cancelEvent :: EventSystem -> EventId -> IO Bool
cancelEvent sys eid = atomically (cancelEventSTM sys eid)

-- |Atomically cancel an event from the system, returning True on success.
-- False is returned when no pending event has the given id (for example
-- because it already expired or was already cancelled).
cancelEventSTM :: EventSystem -> EventId -> STM Bool
cancelEventSTM sys (EvtId clk num) = do
    evts <- readTVar (esEvents sys)
    case lookup clk evts of
        Nothing      -> return False
        Just (_,set) ->
            case lookup num set of
                Nothing -> return False
                Just _  -> do
                    -- Only the action is removed. The set and its counter are
                    -- kept so a later event at this time cannot reuse the id.
                    writeTVar (esEvents sys)
                              (adjust (\(next,m) -> (next, delete num m)) clk evts)
                    return True

-- |Atomically count the events currently pending in the system.
evtSystemSize :: EventSystem -> STM Int
evtSystemSize sys = do
    evts <- readTVar (esEvents sys)
    return $ totalEvents evts

-- |Tracks the alarm time and the earliest event. If an earlier event is added
-- the alarm time is updated and TimerReset is thrown to the expireEvent thread
trackAlarm :: EventSystem -> IO ()
trackAlarm sys = do
    tid <- atomically $ do
        -- Wait for the new-alarm flag and clear it in the same transaction.
        newAlm <- readTVar (esNewAlarm sys)
        if newAlm
            then writeTVar (esNewAlarm sys) False
            else retry
        -- Wait until the expiring thread has published its id.
        mtid <- readTVar (esThread sys)
        case mtid of
            Just i  -> return i
            Nothing -> retry
    throwDynTo tid TimerReset

-- |Returns the time difference in microseconds (potentially returning maxBound <= the real difference).
-- Differences below minBound are likewise clamped to minBound rather than
-- being allowed to wrap around. Only the seconds and picoseconds fields are
-- used; the larger fields are ignored.
timeDiffToMicroSec :: TimeDiff -> Int
timeDiffToMicroSec (TimeDiff _ _ _ _ _ sec picosec)
    | realTime > fromIntegral (maxBound :: Int) = maxBound
    | realTime < fromIntegral (minBound :: Int) = minBound
    | otherwise                                 = fromIntegral realTime
  where
    realTime :: Integer
    realTime = (fromIntegral sec) * (10^6) + fromIntegral (picosec `div` (10^6))

-- Dynamic exception thrown to the expiring thread to make it recompute its delay.
data TimerReset = TimerReset
    deriving (Eq, Ord, Show, Typeable)