module BuildBox.Control.Gang
( Gang
, GangState(..)
, forkGangActions
, joinGang
, pauseGang
, resumeGang
, flushGang
, killGang
, getGangState
, waitForGangState)
where
import Control.Concurrent
import Control.Exception (finally, mask)
import Data.IORef
import qualified Data.Set as Set
import Data.Set (Set)
-- | Handle on a group of worker threads that run a list of 'IO' actions.
--   All mutable parts live in 'IORef's or a semaphore, so the handle itself
--   can be shared freely between threads.
data Gang = Gang
  { -- | Total capacity of the worker semaphore.
    gangThreads :: Int

    -- | Semaphore holding one unit per worker slot that is currently free.
  , gangThreadsAvailable :: QSemN

    -- | Current lifecycle state of the gang.
  , gangState :: IORef GangState

    -- | Counter consulted by the scheduler loop while flushing.
  , gangActionsRunning :: IORef Int

    -- | 'ThreadId's of the workers recorded as running; read by 'killGang'.
  , gangThreadsRunning :: IORef (Set ThreadId)
  }
-- | Lifecycle states of a 'Gang'.
data GangState
  = GangRunning   -- ^ The scheduler loop is starting actions.
  | GangPaused    -- ^ The scheduler loop sleeps instead of starting actions.
  | GangFlushing  -- ^ Set by 'flushGang'; no further actions are started.
  | GangFinished  -- ^ Written by the scheduler loop once it is done.
  | GangKilled    -- ^ Written by 'killGang'.
  deriving (Show, Eq)
-- | Read the gang's current state.
getGangState :: Gang -> IO GangState
getGangState = readIORef . gangState
-- | Block until the gang is in state 'GangFinished' or 'GangKilled'.
--   The state is polled, sleeping 1000 microseconds between reads.
joinGang :: Gang -> IO ()
joinGang gang = poll
  where
    poll = do
      current <- readIORef (gangState gang)
      case current of
        GangFinished -> return ()
        GangKilled   -> return ()
        _            -> threadDelay 1000 >> poll
-- | Ask the gang to stop starting new actions, then block until it has
--   reached a terminal state ('GangFinished' or 'GangKilled').
--
--   The switch to 'GangFlushing' is only made from a non-terminal state.
--   Once the gang is finished or killed nothing will ever move it out of
--   'GangFlushing' again, so overwriting a terminal state would make this
--   call wait forever. For the same reason the wait accepts 'GangKilled'
--   as well as 'GangFinished'.
flushGang :: Gang -> IO ()
flushGang gang
 = do atomicModifyIORef' (gangState gang) $ \state ->
        case state of
          GangFinished -> (state, ())
          GangKilled   -> (state, ())
          _            -> (GangFlushing, ())
      joinGang gang
-- | Pause a running gang: the scheduler loop stops starting new actions
--   until 'resumeGang' is called.
--
--   Only the transition 'GangRunning' -> 'GangPaused' is performed. In any
--   other state the call is a no-op, so a finished or killed gang is never
--   put back into a non-terminal state (which would make 'joinGang' wait
--   forever).
pauseGang :: Gang -> IO ()
pauseGang gang
 = atomicModifyIORef' (gangState gang) $ \state ->
     if state == GangRunning
       then (GangPaused, ())
       else (state, ())
-- | Resume a gang previously paused with 'pauseGang'.
--
--   Only the transition 'GangPaused' -> 'GangRunning' is performed. In any
--   other state the call is a no-op, so a finished, killed or flushing gang
--   is never reported as running again.
resumeGang :: Gang -> IO ()
resumeGang gang
 = atomicModifyIORef' (gangState gang) $ \state ->
     if state == GangPaused
       then (GangRunning, ())
       else (state, ())
-- | Mark the gang as 'GangKilled', then send 'killThread' to every worker
--   thread that is in the running set at the time it is read.
killGang :: Gang -> IO ()
killGang gang = do
  writeIORef (gangState gang) GangKilled
  readIORef (gangThreadsRunning gang) >>= mapM_ killThread . Set.toList
-- | Block until the gang's state equals the given one.
--   The state is polled, sleeping 1000 microseconds between reads; if the
--   gang never enters that state this never returns.
waitForGangState :: Gang -> GangState -> IO ()
waitForGangState gang waitState = poll
  where
    poll = do
      current <- readIORef (gangState gang)
      if current /= waitState
        then threadDelay 1000 >> poll
        else return ()
-- | Fork a gang to run the given actions and return its handle immediately.
--   The actions are started in list order by a scheduler thread; at most
--   the requested number run at the same time.
--
--   A requested thread count below one is raised to one. With a capacity of
--   zero the scheduler would wait forever for a free worker slot as soon as
--   there is an action to run.
forkGangActions
  :: Int      -- ^ Maximum number of actions to run concurrently.
  -> [IO ()]  -- ^ Actions to run.
  -> IO Gang
forkGangActions threads actions
 = do let workers = max 1 threads
      semThreads        <- newQSemN workers
      refState          <- newIORef GangRunning
      refActionsRunning <- newIORef 0
      refThreadsRunning <- newIORef Set.empty
      let gang
            = Gang
            { gangThreads          = workers
            , gangThreadsAvailable = semThreads
            , gangState            = refState
            , gangActionsRunning   = refActionsRunning
            , gangThreadsRunning   = refThreadsRunning }

      -- The scheduler gets its own thread so the caller is not blocked.
      _ <- forkIO $ gangLoop gang actions
      return gang
-- | Scheduler loop: starts the remaining actions one at a time, subject to
--   the gang's state and to the number of free worker slots.
--
--   With no actions left it waits until every worker slot is free again and
--   then marks the gang finished.
gangLoop :: Gang -> [IO ()] -> IO ()
gangLoop gang []
 = do -- Taking every unit of the semaphore means no worker still holds one.
      waitQSemN
        (gangThreadsAvailable gang)
        (gangThreads gang)
      markGangFinished gang

gangLoop gang actions@(action : actionsRest)
 = do state <- readIORef (gangState gang)
      case state of
        GangRunning -> do
          -- Reserve a worker slot before committing to start the action.
          waitQSemN (gangThreadsAvailable gang) 1
          gangLoop_withWorker gang action actionsRest

        GangPaused -> do
          threadDelay 1000
          gangLoop gang actions

        -- Drop the actions that were never started and wait for the ones
        -- already running.
        GangFlushing -> do
          actionsRunning <- readIORef (gangActionsRunning gang)
          if actionsRunning == 0
            then markGangFinished gang
            else do
              threadDelay 1000
              gangLoop gang []

        GangFinished -> return ()
        GangKilled   -> return ()

-- | Continue the scheduler loop while holding one reserved worker slot.
--   The state is read again because it may have changed while the loop was
--   blocked waiting for the slot.
gangLoop_withWorker :: Gang -> IO () -> [IO ()] -> IO ()
gangLoop_withWorker gang action actionsRest
 = do state <- readIORef (gangState gang)
      case state of
        GangRunning -> do
          startWorker gang action
          gangLoop gang actionsRest

        -- Not running any more: hand the slot back, keep the action in the
        -- list, and let 'gangLoop' decide what to do in the new state.
        _ -> do
          signalQSemN (gangThreadsAvailable gang) 1
          gangLoop gang (action : actionsRest)

-- | Fork a worker for one action. The caller must hold one unit of the
--   worker semaphore; the worker gives it back when it ends.
startWorker :: Gang -> IO () -> IO ()
startWorker gang action
 = do registered <- newEmptyMVar

      -- Counted here, on the scheduler thread, so that a later read of the
      -- counter by the scheduler can never miss a worker it has started.
      atomicModifyIORef' (gangActionsRunning gang) (\n -> (n + 1, ()))

      -- Forked under 'mask' so the worker cannot receive an asynchronous
      -- exception before its cleanup handler is installed.
      mask $ \restore -> do
        tid <- forkOS $
          restore (workerBody registered) `finally` workerFinished gang

        -- Record the ThreadId before releasing the worker. Otherwise a
        -- short action could finish and try to remove the id before it was
        -- inserted, leaving a stale entry in the set.
        atomicModifyIORef' (gangThreadsRunning gang)
          (\tids -> (Set.insert tid tids, ()))
        putMVar registered ()
  where
    workerBody registered
     = do takeMVar registered
          -- The gang may have been killed after the scheduler's state check
          -- but before this thread was recorded; don't start the action then.
          state <- readIORef (gangState gang)
          if state == GangKilled
            then return ()
            else action

-- | Worker cleanup. Runs whether the action returned, threw an exception or
--   was killed, so the scheduler is never left waiting for a slot that will
--   not come back.
workerFinished :: Gang -> IO ()
workerFinished gang
 = do tid <- myThreadId
      atomicModifyIORef' (gangThreadsRunning gang)
        (\tids -> (Set.delete tid tids, ()))
      atomicModifyIORef' (gangActionsRunning gang) (\n -> (n - 1, ()))
      signalQSemN (gangThreadsAvailable gang) 1

-- | Move the gang to 'GangFinished', unless it has been killed: a killed
--   gang stays 'GangKilled'.
markGangFinished :: Gang -> IO ()
markGangFinished gang
 = atomicModifyIORef' (gangState gang) $ \state ->
     case state of
       GangKilled -> (GangKilled, ())
       _          -> (GangFinished, ())