module Eventloop.Core where
import Control.Exception
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.ExceptionCollection
import Control.Concurrent.Suspend.Lifted
import Control.Concurrent.Thread
import Control.Concurrent.Timer
import Control.Concurrent.Datastructures.BlockingConcurrentQueue
import Eventloop.System.DisplayExceptionThread
import Eventloop.System.EventloopThread
import Eventloop.System.InitializationThread
import Eventloop.System.OutRouterThread
import Eventloop.System.RetrieverThread
import Eventloop.System.SenderThread
import Eventloop.System.Setup
import Eventloop.System.TeardownThread
import Eventloop.Types.Events
import Eventloop.Types.Exception
import Eventloop.Types.System
startEventloopSystem :: EventloopSetupConfiguration progstateT
-> IO ()
startEventloopSystem setupConfig
= do
systemConfig <- setupEventloopSystemConfig setupConfig
systemConfig' <- catch (startInitializing systemConfig)
(\shutdownException -> do
logException (exceptions systemConfig) shutdownException
return systemConfig
)
failed <- hasExceptions (exceptions systemConfig')
case failed of
True -> return ()
False -> do
let
moduleConfigs_ = moduleConfigs systemConfig'
exceptions_ = exceptions systemConfig'
isStoppingM_ = isStoppingM systemConfig'
catch ( do
let
retrieverThreadActions = threadActionsBasedOnModule systemConfig' startRetrieving retrieverM moduleConfigs_
outRouterAction = startOutRouting systemConfig'
senderThreadActions = threadActionsBasedOnModule systemConfig' startSending senderConfigM moduleConfigs_
mapM_ (spawnWorkerThread systemConfig' registerRetrieverThread) retrieverThreadActions
spawnWorkerThread systemConfig' registerOutRouterThread outRouterAction
mapM_ (spawnWorkerThread systemConfig' registerSenderThread) senderThreadActions
startEventlooping systemConfig'
)
( \shutdownException -> do
swapMVar isStoppingM_ True
logException exceptions_ shutdownException
outRouter <- outRouterThread systemConfig'
let
eventloopConfig_ = eventloopConfig systemConfig'
outEventQueue_ = outEventQueue eventloopConfig_
putInBlockingConcurrentQueue outEventQueue_ Stop
threadDelay 1000000
senders <- senderThreads systemConfig'
senderTimers <- mapM (terminateWithinOrThrowException 2000000 (toException ShuttingDownException)) (outRouter ++ senders)
joinThreads (outRouter ++ senders)
mapM_ stopTimer senderTimers
retrievers <- retrieverThreads systemConfig'
mapM_ throwShutdownExceptionToThread retrievers
joinThreads retrievers
)
startTeardowning systemConfig'
startDisplayingExceptions systemConfig'
terminateWithinOrThrowException :: Int
-> SomeException
-> Thread
-> IO TimerIO
terminateWithinOrThrowException delay e t
= oneShotTimer ( do
term <- isTerminated t
case term of
True -> return ()
False -> throwTo (getThreadId t) e
)
((usDelay.fromIntegral) delay)
threadActionsBasedOnModule :: EventloopSystemConfiguration progstateT
-> (EventloopSystemConfiguration progstateT -> (EventloopModuleConfiguration, resource) -> IO ())
-> (EventloopModuleConfiguration -> Maybe resource)
-> [EventloopModuleConfiguration]
-> [IO ()]
threadActionsBasedOnModule _ _ _ [] = []
threadActionsBasedOnModule systemconfig action getResourceFunc (moduleConfig:mcs)
= case (getResourceFunc moduleConfig) of
Nothing -> otherThreadActions
(Just resource) -> (action systemconfig (moduleConfig, resource)):otherThreadActions
where
otherThreadActions = threadActionsBasedOnModule systemconfig action getResourceFunc mcs
spawnWorkerThread :: EventloopSystemConfiguration progstateT
-> (EventloopSystemConfiguration progstateT -> Thread -> IO ())
-> IO ()
-> IO ()
spawnWorkerThread systemconfig logAction action
= do
thread <- forkThread $ do
catch action
( \exception ->
case (fromException exception) of
(Just ShuttingDownException) ->
return ()
_ -> do
isStopping <- takeMVar isStoppingM_
putMVar isStoppingM_ True
case isStopping of
True -> do
case (fromException exception) of
(Just RequestShutdownException) -> return ()
_ -> logException exceptions_ exception
False -> throwTo systemTid exception
)
logAction systemconfig thread
where
exceptions_ = exceptions systemconfig
systemTid = systemThreadId systemconfig
isStoppingM_ = isStoppingM systemconfig
registerRetrieverThread :: EventloopSystemConfiguration progstateT
-> Thread
-> IO ()
registerRetrieverThread systemconfig thread
= do
retrieverThreads <- takeMVar retrieverThreadsM_
putMVar retrieverThreadsM_ (retrieverThreads ++ [thread])
where
retrieverThreadsM_ = retrieverThreadsM systemconfig
registerOutRouterThread :: EventloopSystemConfiguration progstateT
-> Thread
-> IO ()
registerOutRouterThread systemconfig thread
= putMVar (outRouterThreadM systemconfig) thread
registerSenderThread :: EventloopSystemConfiguration progstateT
-> Thread
-> IO ()
registerSenderThread systemconfig thread
= do
senderThreads <- takeMVar senderThreadsM_
putMVar senderThreadsM_ (senderThreads ++ [thread])
where
senderThreadsM_ = senderThreadsM systemconfig
throwShutdownExceptionToThread :: Thread -> IO ()
throwShutdownExceptionToThread thread
= throwTo (getThreadId thread) ShuttingDownException
allWorkerThreads :: EventloopSystemConfiguration progstateT
-> IO [Thread]
allWorkerThreads systemconfig
= do
retrievers <- retrieverThreads systemconfig
outRouter <- outRouterThread systemconfig
senders <- senderThreads systemconfig
return (retrievers ++ outRouter ++ senders)
retrieverThreads :: EventloopSystemConfiguration progstateT
-> IO [Thread]
retrieverThreads systemconfig
= readMVar retrieverThreadsM_
where
retrieverThreadsM_ = retrieverThreadsM systemconfig
outRouterThread :: EventloopSystemConfiguration progstateT
-> IO [Thread]
outRouterThread systemconfig
= do
hasNotOutRouterThread <- isEmptyMVar outRouterThreadM_
case hasNotOutRouterThread of
True -> return []
False -> do
outRouterThread <- readMVar outRouterThreadM_
return [outRouterThread]
where
outRouterThreadM_ = outRouterThreadM systemconfig
senderThreads :: EventloopSystemConfiguration progstateT
-> IO [Thread]
senderThreads systemconfig
= readMVar senderThreadsM_
where
senderThreadsM_ = senderThreadsM systemconfig