{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE NumericUnderscores #-}

-- | Implementation of a jobserver using system semaphores.
--
--
module GHC.Driver.MakeSem
  ( -- * JSem: parallelism semaphore backed
    -- by a system semaphore (Posix/Windows)
    runJSemAbstractSem

  -- * System semaphores
  , Semaphore, SemaphoreName(..)

  -- * Abstract semaphores
  , AbstractSem(..)
  , withAbstractSem
  )
  where

import GHC.Prelude
import GHC.Conc
import GHC.Data.OrdList
import GHC.IO.Exception
import GHC.Utils.Outputable
import GHC.Utils.Panic
import GHC.Utils.Json

import System.Semaphore

import Control.Monad
import qualified Control.Monad.Catch as MC
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Foldable
import Data.Functor
import GHC.Stack
import Debug.Trace

---------------------------------------
-- Semaphore jobserver

-- | Jobserver backed by a system 'Semaphore': the semaphore handle
-- together with the shared record of tokens and queued jobs.
data Jobserver = Jobserver
  { jSemaphore :: !Semaphore
    -- ^ System semaphore from which tokens are acquired and to which
    -- they are released.
  , jobs       :: !(TVar JobResources)
    -- ^ Shared state: the tokens obtained from the semaphore and the
    -- jobs that are still waiting for one.
  }

-- | Tunable debounce delays for the jobserver (both in milliseconds).
data JobserverOptions = JobserverOptions
  { releaseDebounce    :: !Int
    -- ^ Minimum time, in milliseconds, that must elapse between
    -- acquiring a token and releasing a token.
  , setNumCapsDebounce :: !Int
    -- ^ Minimum time, in milliseconds, that must elapse between two
    -- consecutive calls of 'setNumCapabilities'.
  }

-- | Default options: both debounce delays are set to one second.
defaultJobserverOptions :: JobserverOptions
defaultJobserverOptions = JobserverOptions
  { releaseDebounce    = one_second
  , setNumCapsDebounce = one_second
  }
  where
    -- The option fields are expressed in milliseconds.
    one_second :: Int
    one_second = 1_000

-- | Bookkeeping for the tokens taken from the parallelism semaphore
-- and for the jobs that are waiting to be given one.
data JobResources = Jobs
  { tokensOwned :: !Int
    -- ^ Number of tokens currently claimed from the semaphore.
  , tokensFree  :: !Int
    -- ^ Number of owned tokens that no job is using right now.
  , jobsWaiting :: !(OrdList (TMVar ()))
    -- ^ Queue of pending jobs. Each job blocks on its own 'TMVar';
    -- putting @()@ into that 'TMVar' lets the job proceed.
  }

instance Outputable JobResources where
  -- Prints the two token counters and the number of queued jobs;
  -- the waiting 'TMVar's themselves are not shown.
  ppr (Jobs { tokensOwned = owned, tokensFree = free, jobsWaiting = waiting })
    = text "JobResources" <+> braces (hsep fields)
    where
      fields :: [SDoc]
      fields =
        [ text "owned=" <> ppr owned
        , text "free=" <> ppr free
        , text "num_waiting=" <> ppr (length waiting)
        ]

-- | Record one newly obtained token: it counts as owned and is
-- immediately free.
addToken :: JobResources -> JobResources
addToken res =
  res { tokensOwned = tokensOwned res + 1
      , tokensFree  = tokensFree res + 1
      }

-- | Mark one owned token as free again.
--
-- Asserts that there are strictly more owned tokens than free ones
-- before the update.
addFreeToken :: JobResources -> JobResources
addFreeToken res =
  assertPpr (owned > free) msg (res { tokensFree = free + 1 })
  where
    owned, free :: Int
    owned = tokensOwned res
    free  = tokensFree res

    msg :: SDoc
    msg = text "addFreeToken:" <+> ppr owned <+> ppr free

-- | Consume one free token.
--
-- Asserts that at least one token is free before the update.
removeFreeToken :: JobResources -> JobResources
removeFreeToken res =
  assertPpr (free > 0) msg (res { tokensFree = free - 1 })
  where
    free :: Int
    free = tokensFree res

    msg :: SDoc
    msg = text "removeFreeToken:" <+> ppr free

-- | Give up ownership of one token.
--
-- Asserts that more than one token is owned, so the owned count never
-- drops below one through this function.
removeOwnedToken :: JobResources -> JobResources
removeOwnedToken res =
  assertPpr (owned > 1) msg (res { tokensOwned = owned - 1 })
  where
    owned :: Int
    owned = tokensOwned res

    msg :: SDoc
    msg = text "removeOwnedToken:" <+> ppr owned

-- | Append a job (represented by the 'TMVar' it blocks on) to the back
-- of the queue of pending jobs.
addJob :: TMVar () -> JobResources -> JobResources
addJob job res = res { jobsWaiting = jobsWaiting res `SnocOL` job }

-- | Mutable-over-time state of the semaphore jobserver loop.
data JobserverState = JobserverState
  { jobserverAction  :: !JobserverAction
    -- ^ What the jobserver is doing at the moment.
  , canChangeNumCaps :: !(TVar Bool)
    -- ^ Signals whether enough time has passed since
    -- 'numCapabilities' was last changed.
  , canReleaseToken  :: !(TVar Bool)
    -- ^ Signals whether the last token acquisition was long enough
    -- ago that a token may now be released.
  }
-- | The activity the jobserver is currently engaged in with respect to
-- the semaphore.
data JobserverAction
  -- | Idle: no thread is currently interacting with the semaphore.
  = Idle
  -- | A thread is waiting on the semaphore for a token.
  | Acquiring
    { activeWaitId   :: WaitId
      -- ^ Identifier of the waiting thread.
    , threadFinished :: TMVar (Maybe MC.SomeException)
      -- ^ Filled once the waiting thread is done: 'Nothing' if it ended
      -- without an exception, @'Just' e@ if it ended with exception @e@.
    }

-- | The completion 'TMVar' of the currently active thread, or 'Nothing'
-- when the jobserver is 'Idle'.
activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
activeThread_maybe action = case action of
  Acquiring { threadFinished = done } -> Just done
  Idle                                -> Nothing

-- | Should a new token be requested from the semaphore?
--
-- True exactly when no token is free and at least one job is waiting.
guardAcquire :: JobResources -> Bool
guardAcquire res = no_free_tokens && has_pending_jobs
  where
    no_free_tokens, has_pending_jobs :: Bool
    no_free_tokens   = tokensFree res == 0
    has_pending_jobs = not (null (jobsWaiting res))

-- | Should a token be handed back to the semaphore?
--
-- True exactly when no job is waiting, at least one token is free, and
-- more than one token is owned.
guardRelease :: JobResources -> Bool
guardRelease res = no_pending_jobs && has_free_token && owns_spare_token
  where
    no_pending_jobs, has_free_token, owns_spare_token :: Bool
    no_pending_jobs  = null (jobsWaiting res)
    has_free_token   = tokensFree res > 0
    owns_spare_token = tokensOwned res > 1

---------------------------------------
-- Semaphore jobserver implementation

-- | Register a new pending job with the jobserver and block until a
-- token has been dispatched to it.
--
-- The job is enqueued in one transaction; the wait for the token is a
-- second, separate transaction on the job's own 'TMVar'.
acquireJob :: TVar JobResources -> IO ()
acquireJob jobs_tvar = do
  (wake_up, _before) <-
    tracedAtomically "acquire" (modifyJobResources jobs_tvar enqueue)
  atomically (takeTMVar wake_up)
  where
    -- Create the job's wake-up variable and append it to the queue.
    -- The result carries the variable and the resources prior to enqueuing.
    enqueue :: JobResources -> STM ((TMVar (), JobResources), JobResources)
    enqueue res = do
      wake_up <- newEmptyTMVar
      pure ((wake_up, res), addJob wake_up res)

-- | Tell the jobserver that a job has finished, making the token it
-- was using free again.
releaseJob :: TVar JobResources -> IO ()
releaseJob jobs_tvar =
  tracedAtomically "release" (modifyJobResources jobs_tvar give_back)
  where
    -- Sanity-check the counters, then mark one more token as free.
    give_back :: JobResources -> STM ((), JobResources)
    give_back res = do
      massertPpr (tokensOwned res > tokensFree res)
        (text "releaseJob: more free jobs than owned jobs!")
      pure ((), addFreeToken res)


-- | Release all tokens owned from the semaphore (to clean up
-- the jobserver at the end).
--
-- One token is deliberately kept back: the implicit token belongs to
-- whoever spawned the ghc process, and that party is responsible for it.
--
-- The owned count is read with 'readTVarIO' (outside of any larger
-- transaction) and the 'TVar' is not updated afterwards.
cleanupJobserver :: Jobserver -> IO ()
cleanupJobserver (Jobserver { jSemaphore = sem
                            , jobs       = jobs_tvar })
  = do
    Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
    let toks_to_release = owned - 1
      -- Subtract off the implicit token: whoever spawned the ghc process
      -- in the first place is responsible for that token.
    -- Only touch the semaphore when there is something to give back.
    -- NOTE(review): a release count of zero is presumably rejected by some
    -- platform APIs (Win32 documents that the count must be > 0) — confirm
    -- against the 'releaseSemaphore' implementation in use.
    when (toks_to_release > 0) $
      releaseSemaphore sem toks_to_release

-- | Hand free tokens to waiting jobs, front of the queue first, until
-- either the free tokens or the waiting jobs run out.
--
-- Each dispatched job is woken by putting @()@ into its 'TMVar'; the
-- returned resources have the free count and the queue reduced
-- accordingly.
dispatchTokens :: JobResources -> STM JobResources
dispatchTokens res
  | tokensFree res <= 0 = pure res
  | otherwise = case jobsWaiting res of
      next `ConsOL` rest -> do
        -- Wake the first waiting job, then continue with what is left.
        putTMVar next ()
        dispatchTokens res { tokensFree  = tokensFree res - 1
                           , jobsWaiting = rest }
      _ -> pure res

-- | Update the available resources used from a semaphore, dispatching
-- any newly acquired resources.
--
-- The supplied action receives the current resources and returns a
-- result together with the updated resources. Free tokens in the
-- updated resources are then dispatched to pending jobs (see
-- 'dispatchTokens') and the outcome is written back to the 'TVar'.
--
-- Returns the action's result paired with @'Just'@ the resources that
-- were written.
--
-- Invariant: if the number of available resources decreases, there
-- must be no pending jobs.
--
-- All modifications should go through this function to ensure the contents
-- of the 'TVar' remains in normal form.
modifyJobResources :: HasCallStack => TVar JobResources
                   -> (JobResources -> STM (a, JobResources))
                   -> STM (a, Maybe JobResources)
modifyJobResources jobs_tvar action = do
  old_jobs  <- readTVar jobs_tvar
  (a, jobs) <- action old_jobs

  -- Check the invariant: if the number of free tokens has decreased,
  -- there must be no pending jobs.
  massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
    vcat [ text "modifyJobResources: pending jobs but fewer free tokens"
         , text "before:" <+> ppr old_jobs
         , text "after:" <+> ppr jobs ]
  dispatched_jobs <- dispatchTokens jobs
  writeTVar jobs_tvar dispatched_jobs
  return (a, Just dispatched_jobs)


-- | Variant of 'tracedAtomically' for transactions that produce no
-- result other than the (optional) resources to trace.
tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
tracedAtomically_ origin act = tracedAtomically origin (fmap with_unit act)
  where
    with_unit :: Maybe JobResources -> ((), Maybe JobResources)
    with_unit mb_res = ((), mb_res)

-- | Run a transaction atomically and, if it reports updated
-- 'JobResources', emit them as an eventlog trace event tagged with the
-- given origin. Returns the transaction's result.
tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
tracedAtomically origin act = do
  (result, mb_res) <- atomically act
  case mb_res of
    Nothing  -> pure ()
    -- The "jsem:" prefix identifies these writes among other trace events.
    Just res -> traceEventIO ("jsem:" ++ renderJobResources origin res)
  pure result

-- | Render the resources as a JSON object with the keys @name@ (the
-- given origin), @owned@, @free@ and @pending@ (the number of waiting
-- jobs).
renderJobResources :: String -> JobResources -> String
renderJobResources origin
  (Jobs { tokensOwned = owned, tokensFree = free, jobsWaiting = waiting })
  = showSDocUnsafe (renderJSON payload)
  where
    payload :: JsonDoc
    payload = JSObject
      [ ("name", JSString origin)
      , ("owned", JSInt owned)
      , ("free", JSInt free)
      , ("pending", JSInt (length waiting))
      ]


-- | Start a thread that waits on the semaphore for one more token.
--
-- When the wait ends, the callback below runs a single transaction that
--
--  - on success ('Right' 'True') records the new token via 'addToken',
--  - otherwise leaves the resources untouched,
--
-- and in every case fills the 'threadFinished' variable: with the
-- exception if the wait ended in one, with 'Nothing' otherwise.
--
-- Returns the 'Acquiring' action describing the waiting thread.
acquireThread :: Jobserver -> IO JobserverAction
acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
  done_tmvar <- newEmptyTMVarIO
  let
    on_wait_result :: Either MC.SomeException Bool -> IO ()
    on_wait_result outcome =
      tracedAtomically_ "acquire_thread" do
        (mb_exc, mb_res) <- case outcome of
          Left exc    -> pure (Just exc, Nothing)
          Right False -> pure (Nothing, Nothing)
          Right True  ->
            modifyJobResources jobs_tvar \ res ->
              pure (Nothing, addToken res)
        putTMVar done_tmvar mb_exc
        pure mb_res
  wait_id <- forkWaitOnSemaphoreInterruptible sem on_wait_result
  labelThread (waitingThreadId wait_id) "acquire_thread"
  pure Acquiring { activeWaitId   = wait_id
                 , threadFinished = done_tmvar }

-- | Give one token back to the semaphore, provided there is a spare
-- token and no job is waiting ('guardRelease').
--
-- Runs with asynchronous exceptions masked. The token is first removed
-- from the bookkeeping, then a forked thread performs the actual
-- release. If that release throws, the token is added back to the
-- bookkeeping.
--
-- Always returns 'Idle'.
--
-- NOTE(review): the completion 'TMVar' created here is written by the
-- forked thread but is not part of the returned action, so nothing
-- visible in this function reads an exception stored in it.
releaseThread :: Jobserver -> IO JobserverAction
releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
  done_tmvar <- newEmptyTMVarIO
  MC.mask_ do
    -- Pre-release the resource so that another thread doesn't take control
    -- of it just as we release the lock on the semaphore.
    may_release <-
      tracedAtomically "pre_release" (modifyJobResources jobs_tvar reserve)
    if may_release
      then do
        tid <- forkIO (release_one done_tmvar)
        labelThread tid "release_thread"
        pure Idle
      else pure Idle
  where
    -- Re-check the release condition inside the transaction and, if it
    -- still holds, take the token out of both counters.
    -- TODO: should this also debounce?
    reserve :: JobResources -> STM (Bool, JobResources)
    reserve res
      | guardRelease res = pure (True , removeOwnedToken (removeFreeToken res))
      | otherwise        = pure (False, res)

    -- Release one token on the semaphore and record the outcome.
    release_one :: TMVar (Maybe MC.SomeException) -> IO ()
    release_one done_tmvar = do
      outcome <- MC.try (releaseSemaphore sem 1)
                   :: IO (Either MC.SomeException ())
      tracedAtomically_ "post-release" do
        (mb_exc, mb_res) <- case outcome of
          -- The release failed: we still hold the token, so restore it.
          Left exc ->
            modifyJobResources jobs_tvar \ res ->
              pure (Just exc, addToken res)
          Right _ -> pure (Nothing, Nothing)
        putTMVar done_tmvar mb_exc
        pure mb_res

-- | If the jobserver is idle and there are pending jobs but no free
-- tokens ('guardAcquire'), produce an 'IO' action that starts a thread
-- acquiring a new token from the semaphore; otherwise 'retry'.
--
-- The produced action also arms the release debounce timer, so that
-- 'canReleaseToken' only becomes 'True' once 'releaseDebounce'
-- milliseconds have passed.
--
-- See 'acquireThread'.
tryAcquire :: JobserverOptions
           -> Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryAcquire opts js st = case jobserverAction st of
  Idle -> do
    res <- readTVar (jobs js)
    guard (guardAcquire res)
    pure start_acquiring
  _ -> retry
  where
    start_acquiring :: IO JobserverState
    start_acquiring = do
      action <- acquireThread js
      -- Set a debounce after acquiring a token; the option is scaled
      -- by 1000 before being passed to 'registerDelay'.
      release_allowed <- registerDelay (releaseDebounce opts * 1000)
      pure st { jobserverAction = action
              , canReleaseToken = release_allowed }

-- | If the jobserver is idle, there are free tokens and no pending jobs
-- ('guardRelease'), and the release debounce has elapsed, produce an
-- 'IO' action that releases a token to the semaphore; otherwise 'retry'.
--
-- See 'releaseThread'.
tryRelease :: Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryRelease sjs st = case jobserverAction st of
  Idle -> do
    res <- readTVar (jobs sjs)
    guard (guardRelease res)
    debounce_elapsed <- readTVar (canReleaseToken st)
    guard debounce_elapsed
    pure start_releasing
  _ -> retry
  where
    start_releasing :: IO JobserverState
    start_releasing = do
      action <- releaseThread sjs
      pure st { jobserverAction = action }

-- | Wait for an active thread to finish. Once it finishes:
--
--  - rethrow the exception it reported, if any,
--  - set the 'JobserverAction' to 'Idle',
--  - update the number of capabilities to reflect the number
--    of owned tokens from the semaphore.
--
-- The transaction retries when there is no active thread
-- ('activeThread_maybe' returns 'Nothing'), when the active thread has
-- not yet filled its completion 'TMVar', or when the 'canChangeNumCaps'
-- flag of the state is not yet set.
tryNoticeIdle :: JobserverOptions
              -> TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryNoticeIdle opts jobs_tvar jobserver_state
  | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
  = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
  | otherwise
  = retry -- no active thread: wait until jobserver isn't idle
  where
    -- Take the thread's result and, if permitted, build the 'IO' action
    -- that brings the RTS capability count in line with 'tokensOwned'.
    sync_num_caps :: TVar Bool
                  -> TMVar (Maybe MC.SomeException)
                  -> STM (IO JobserverState)
    sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
      -- Blocks (retries) until the active thread has reported completion.
      mb_ex <- takeTMVar threadFinished_tmvar
      -- Propagate a failure of the worker thread to the jobserver loop.
      for_ mb_ex MC.throwM
      Jobs { tokensOwned } <- readTVar jobs_tvar
      can_change_numcaps <- readTVar can_change_numcaps_tvar
      -- This 'guard' is part of the same transaction as the 'takeTMVar'
      -- above: if it fails, the take is undone along with everything else.
      guard can_change_numcaps
      return do
        x <- getNumCapabilities
        can_change_numcaps_tvar_2 <-
          if x == tokensOwned
          then return can_change_numcaps_tvar
          else do
            setNumCapabilities tokensOwned
            -- 'setNumCapsDebounce' is in milliseconds, whereas
            -- 'registerDelay' expects microseconds.
            registerDelay $ setNumCapsDebounce opts * 1000
        return $
          jobserver_state
            { jobserverAction  = Idle
            , canChangeNumCaps = can_change_numcaps_tvar_2 }

-- | Try to stop the current thread which is acquiring resources
-- if that operation is no longer relevant.
--
-- Only an 'Acquiring' action is considered: when no jobs are waiting
-- any more ('jobsWaiting' is empty), the returned 'IO' action calls
-- 'interruptWaitOnSemaphore' on the action's 'activeWaitId' and resets
-- the state to 'Idle'.
--
-- The transaction retries for any other action, and while there are
-- still jobs waiting.
tryStopThread :: TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryStopThread jobs_tvar jsj =
  case jobserverAction jsj of
    Acquiring { activeWaitId = wait_id } -> do
      resources <- readTVar jobs_tvar
      -- Only cancel the wait if nobody needs the token any more.
      guard $ null (jobsWaiting resources)
      return do
        interruptWaitOnSemaphore wait_id
        return $ jsj { jobserverAction = Idle }
    _ -> retry

-- | Main jobserver loop: acquire/release resources as
-- needed for the pending jobs and available semaphore tokens.
--
-- Each iteration atomically picks the first of 'tryRelease',
-- 'tryAcquire', 'tryNoticeIdle' and 'tryStopThread' (in that order)
-- whose transaction does not retry, runs the 'IO' action it returns,
-- and continues with the updated 'JobserverState'.
--
-- The loop never returns normally; it only ends through an exception.
jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
  = do
      -- Initially both flags are permanently-true 'TVar's: changing the
      -- number of capabilities and releasing a token are allowed at once.
      true_tvar <- newTVarIO True
      let init_state :: JobserverState
          init_state =
            JobserverState
              { jobserverAction  = Idle
              , canChangeNumCaps = true_tvar
              , canReleaseToken  = true_tvar }
      loop init_state
  where
    loop :: JobserverState -> IO ()
    loop s = do
      -- 'asum' tries the alternatives left to right, falling through
      -- to the next one whenever a transaction retries.
      action <- atomically $ asum $ ($ s) <$>
        [ tryRelease    sjs
        , tryAcquire    opts sjs
        , tryNoticeIdle opts jobs_tvar
        , tryStopThread jobs_tvar
        ]
      action >>= loop

-- | Create a new jobserver backed by the system semaphore of the
-- given name.
--
-- Opens the semaphore, forks a thread labelled @"job_server"@ running
-- 'jobserverLoop', and returns:
--
--  - an 'AbstractSem' whose 'acquireSem' / 'releaseSem' are 'acquireJob'
--    / 'releaseJob' on the jobserver's shared 'JobResources',
--  - a cleanup action which runs 'cleanupJobserver', kills the loop
--    thread, waits for it to finish, and rethrows any exception the loop
--    ended with (other than the 'ThreadKilled' caused by the cleanup
--    itself).
makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
makeJobserver sem_name = do
  semaphore <- openSemaphore sem_name
  let
    -- We start out owning a single token, which is free.
    init_jobs :: JobResources
    init_jobs =
      Jobs { tokensOwned = 1
           , tokensFree  = 1
           , jobsWaiting = NilOL
           }
  jobs_tvar <- newTVarIO init_jobs
  let
    opts :: JobserverOptions
    opts = defaultJobserverOptions -- TODO: allow this to be configured
    sjs :: Jobserver
    sjs = Jobserver { jSemaphore = semaphore
                    , jobs       = jobs_tvar }
  -- Filled exactly once, when the loop thread terminates.
  loop_finished_mvar <- newEmptyMVar
  loop_tid <- forkIOWithUnmask \ unmask -> do
    r <- try $ unmask $ jobserverLoop opts sjs
    putMVar loop_finished_mvar $
      case r of
        Left e
          -- 'ThreadKilled' is how 'cleanupSem' stops the loop:
          -- not an error.
          | Just ThreadKilled <- fromException e
          -> Nothing
          | otherwise
          -> Just e
        Right () -> Nothing
  labelThread loop_tid "job_server"
  let
    acquireSem :: IO ()
    acquireSem = acquireJob jobs_tvar
    releaseSem :: IO ()
    releaseSem = releaseJob jobs_tvar
    cleanupSem :: IO ()
    cleanupSem = do
      -- this is interruptible
      cleanupJobserver sjs
      killThread loop_tid
      mb_ex <- takeMVar loop_finished_mvar
      for_ mb_ex MC.throwM

  -- 'acquireSem' and 'releaseSem' are picked up by the record wildcard.
  return (AbstractSem{..}, cleanupSem)

-- | Implement an abstract semaphore using a semaphore 'Jobserver'
-- which queries the system semaphore of the given name for resources.
--
-- The jobserver is set up with asynchronous exceptions masked; only the
-- supplied operation runs unmasked. The jobserver is always cleaned up
-- afterwards:
--
--  - if the operation throws, cleanup is attempted, any exception from
--    the cleanup itself is discarded, and the original exception is
--    rethrown;
--  - if the operation succeeds, an exception thrown by the cleanup
--    propagates to the caller.
runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
                   -> (AbstractSem -> IO a) -- ^ the operation to run
                                            -- which requires a semaphore
                   -> IO a
runJSemAbstractSem sem action = MC.mask \ unmask -> do
  (abstract_sem, cleanup) <- makeJobserver sem
  r <- try $ unmask $ action abstract_sem
  case r of
    Left (e1 :: MC.SomeException) -> do
      -- Best-effort cleanup: the operation's exception takes priority
      -- over anything the cleanup might throw.
      (_ :: Either MC.SomeException ()) <- MC.try cleanup
      MC.throwM e1
    Right x -> cleanup $> x

{- Note [Architecture of the Job Server]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In `-jsem` mode, the amount of parallelism that GHC can use is controlled by a
system semaphore. We take resources from the semaphore when we need them, and
give them back if we don't have enough to do.

A naive implementation would just take and release the semaphore around performing
the action, but this leads to two issues:

* When taking a token in the semaphore, we must call `setNumCapabilities` in order
  to adjust how many capabilities are available for parallel garbage collection.
  This causes unnecessary synchronisations.
* We want to implement a debounce, so that whilst there is pending work in the
  current process we prefer to keep hold of resources from the semaphore.
  This reduces overall memory usage, as there are fewer live GHC processes at once.

Therefore, obtaining resources from the semaphore is decoupled from the
driver's requests for those resources.

A token from the semaphore is requested using `acquireJob`. This creates a pending
job, which is a `TMVar` that is filled in to signal that the requested token is ready.

When the job is finished, the token is released by calling `releaseJob`, which just
increases the number of `free` jobs. If there are more pending jobs when the free count
is increased, the token is immediately reused (see `modifyJobResources`).

The `jobserverLoop` interacts with the system semaphore: when there are pending
jobs, `acquireThread` blocks, waiting for a token from the semaphore. Once a
token is obtained, it increases the owned count.

When GHC has free tokens (tokens from the semaphore that it is not using),
no pending jobs, and the debounce has expired, then `releaseThread` will
release tokens back to the global semaphore.

`tryStopThread` attempts to kill threads which are waiting to acquire a resource
when we no longer need it. For example, consider that we attempt to acquire two
tokens, but the first job finishes before we acquire the second token.
This second token is no longer needed, so we should cancel the wait
(as it would not be used to do any work, and not be returned until the debounce).
We only need to kill `acquireJob`, because `releaseJob` never blocks.

Note [Eventlog Messages for jsem]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
It can be tricky to verify that the work is shared adequately across different
processes. To help debug this, we output the value of `JobResources` to the
eventlog whenever the global state changes. Scripts to analyse this output and
report statistics about core saturation are available in the GitHub repo
https://github.com/mpickering/ghc-jsem-analyse.

-}