-- | A gang consisting of a fixed number of threads that can run actions in parallel.
--   Good for constructing parallel test frameworks.
module BuildBox.Control.Gang
        ( Gang
        , GangState(..)

        , forkGangActions
        , joinGang
        , pauseGang
        , resumeGang
        , flushGang
        , killGang
        
        , getGangState
        , waitForGangState)
where
import Control.Concurrent
import Data.IORef
import qualified Data.Set       as Set
import Data.Set                 (Set)

-- Gang -----------------------------------------------------------------------
-- | Abstract gang of threads.
data Gang
        = Gang 
        { gangThreads           :: Int
        , gangThreadsAvailable  :: QSemN
        , gangState             :: IORef GangState
        , gangActionsRunning    :: IORef Int 
        , gangThreadsRunning    :: IORef (Set ThreadId) }


data GangState
        = -- | Gang is running and starting new actions.
          GangRunning

        -- | Gang may be running already started actions, 
        --   but no new ones are being started.
        | GangPaused

        -- | Gang is waiting for currently running actions to finish, 
        --   but not starting new ones.
        | GangFlushing

        -- | Gang is finished, all the actions have completed.
        | GangFinished
        
        -- | Gang was killed, all the threads are dead (or dying).
        | GangKilled
        deriving (Show, Eq)


-- | Get the state of a gang.
getGangState :: Gang -> IO GangState
getGangState gang
        = readIORef (gangState gang)


-- | Block until all actions have finished executing,
--   or the gang is killed.
joinGang :: Gang -> IO ()
joinGang gang
 = do   state   <- readIORef (gangState gang)
        if state == GangFinished || state == GangKilled
         then return ()
         else do
                threadDelay 1000
                joinGang gang


-- | Block until already started actions have completed, but don't start any more.
--   Gang state changes to `GangFlushing`.
flushGang :: Gang -> IO ()
flushGang gang
 = do   writeIORef (gangState gang) GangFlushing
        waitForGangState gang GangFinished


-- | Pause a gang. Actions that have already been started continue to run, 
--   but no more will be started until a `resumeGang` command is issued.
--   Gang state changes to `GangPaused`.
pauseGang :: Gang -> IO ()
pauseGang gang
        = writeIORef (gangState gang) GangPaused


-- | Resume a paused gang, which allows it to continue starting new actions.
--   Gang state changes to `GangRunning`.
resumeGang :: Gang -> IO ()
resumeGang gang
        = writeIORef (gangState gang) GangRunning


-- | Kill all the threads in a gang.
--   Gang stage changes to `GangKilled`.
killGang :: Gang -> IO ()
killGang gang
 = do   writeIORef (gangState gang) GangKilled
        tids    <- readIORef (gangThreadsRunning gang) 
        mapM_ killThread $ Set.toList tids


-- | Block until the gang is in the given state.
waitForGangState :: Gang -> GangState -> IO ()
waitForGangState gang waitState
 = do   state   <- readIORef (gangState gang)
        if state == waitState
         then return ()
         else do
                threadDelay 1000
                waitForGangState gang waitState


-- | Fork a new gang to run the given actions.
--   This function returns immediately, with the gang executing in the background.
--   Gang state starts as `GangRunning` then transitions to `GangFinished`.
--   To block until all the actions are finished use `joinGang`.
forkGangActions
        :: Int                  -- ^ Number of worker threads in the gang \/ maximum number
                                --   of actions to execute concurrenty.
        -> [IO ()]              -- ^ Actions to run. They are started in-order, but may finish
                                --   out-of-order depending on the run time of the individual action.
        -> IO Gang

forkGangActions threads actions
 = do   semThreads              <- newQSemN threads
        refState                <- newIORef GangRunning
        refActionsRunning       <- newIORef 0
        refThreadsRunning       <- newIORef (Set.empty)
        let gang        
                = Gang
                { gangThreads           = threads
                , gangThreadsAvailable  = semThreads 
                , gangState             = refState
                , gangActionsRunning    = refActionsRunning 
                , gangThreadsRunning    = refThreadsRunning }

        _ <- forkIO $ gangLoop gang actions
        return gang
        

-- | Run actions on a gang.
gangLoop :: Gang -> [IO ()] -> IO ()
gangLoop gang []
 = do   -- Wait for all the threads to finish.
        waitQSemN 
                (gangThreadsAvailable gang) 
                (gangThreads gang)
                
        -- Signal that the gang is finished running actions.
        writeIORef (gangState gang) GangFinished


gangLoop gang actions@(action:actionsRest)
 = do   state   <- readIORef (gangState gang)
        case state of
         GangRunning 
          -> do -- Wait for a worker thread to become available.
                waitQSemN (gangThreadsAvailable gang) 1
                gangLoop_withWorker gang action actionsRest

         GangPaused
          -> do threadDelay 1000
                gangLoop gang actions
                        
         GangFlushing
          -> do actionsRunning  <- readIORef (gangActionsRunning gang)
                if actionsRunning == 0
                 then   writeIORef (gangState gang) GangFinished
                 else do        
                        threadDelay 1000
                        gangLoop gang []

         GangFinished   -> return ()
         GangKilled     -> return ()

-- we have an available worker
gangLoop_withWorker :: Gang -> IO () -> [IO ()] -> IO ()
gangLoop_withWorker gang action actionsRest
 = do   -- See if we're supposed to be starting actions or not.
        state   <- readIORef (gangState gang)
        case state of
         GangRunning
          -> do -- fork off the first action
                tid <- forkOS $ do
                        -- run the action (and wait for it to complete)
                        action

                        -- signal that a new worker is available
                        signalQSemN (gangThreadsAvailable gang) 1
                        
                        -- remove our ThreadId from the set of running ThreadIds.
                        tid     <- myThreadId
                        atomicModifyIORef (gangThreadsRunning gang)
                                (\tids -> (Set.delete tid tids, ()))
        
                -- Add the ThreadId of the freshly forked thread to the set
                -- of running ThreadIds. We'll need this set if we want to kill
                -- the gang.
                atomicModifyIORef (gangThreadsRunning gang)
                        (\tids -> (Set.insert tid tids, ()))
        
                -- handle the rest of the actions.
                gangLoop gang actionsRest

         -- someone issued flush or pause command while we
         -- were waiting for a worker, so don't start next action.
         _ -> do
                signalQSemN (gangThreadsAvailable gang) 1
                gangLoop gang (action:actionsRest)