{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE NumericUnderscores #-}

-- | Implementation of a jobserver using system semaphores.
--
--
module GHC.Driver.MakeSem
  ( -- * JSem: parallelism semaphore backed
    -- by a system semaphore (Posix/Windows)
    runJSemAbstractSem

  -- * System semaphores
  , Semaphore, SemaphoreName(..)

  -- * Abstract semaphores
  , AbstractSem(..)
  , withAbstractSem
  )
  where

import GHC.Prelude
import GHC.Conc
import GHC.Data.OrdList
import GHC.IO.Exception
import GHC.Utils.Outputable
import GHC.Utils.Panic
import GHC.Utils.Json

import System.Semaphore

import Control.Monad
import qualified Control.Monad.Catch as MC
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Data.Foldable
import Data.Functor
import GHC.Stack
import Debug.Trace

---------------------------------------
-- Semaphore jobserver

-- | A jobserver based off a system 'Semaphore'.
--
-- Pairs the semaphore handle with the shared, mutable record of the
-- pending jobs and of the tokens obtained from that semaphore.
data Jobserver
  = Jobserver
  { jSemaphore :: !Semaphore
    -- ^ The semaphore which controls available resources
  , jobs :: !(TVar JobResources)
    -- ^ The currently pending jobs, and the resources
    -- obtained from the semaphore
  }

-- | Tunable delays used by the jobserver loop.
--
-- Both values are expressed in milliseconds; they are multiplied by 1000
-- before being handed to 'registerDelay'.
data JobserverOptions
  = JobserverOptions
  { releaseDebounce    :: !Int
     -- ^ Minimum delay, in milliseconds, between acquiring a token
     -- and releasing a token.
  , setNumCapsDebounce :: !Int
    -- ^ Minimum delay, in milliseconds, between two consecutive
    -- calls of 'setNumCapabilities'.
  }

-- | Default jobserver options: a one second debounce both for releasing
-- tokens and for changing the number of capabilities.
defaultJobserverOptions :: JobserverOptions
defaultJobserverOptions =
  JobserverOptions
    { releaseDebounce    = 1000 -- 1 second
    , setNumCapsDebounce = 1000 -- 1 second
    }

-- | Resources available for running jobs, i.e.
-- tokens obtained from the parallelism semaphore.
data JobResources
  = Jobs
  { tokensOwned :: !Int
    -- ^ How many tokens have been claimed from the semaphore
  , tokensFree  :: !Int
    -- ^ How many tokens are not currently being used
  , jobsWaiting :: !(OrdList (TMVar ()))
    -- ^ Pending jobs waiting on a token. Each job blocks on its 'TMVar',
    -- so putting into the 'TMVar' allows the job to continue.
  }

-- | Debug rendering: shows the owned and free token counts and the
-- number of waiting jobs (the waiting 'TMVar's themselves are not shown).
instance Outputable JobResources where
  ppr Jobs{..}
    = text "JobResources" <+>
        ( braces $ hsep
          [ text "owned=" <> ppr tokensOwned
          , text "free=" <> ppr tokensFree
          , text "num_waiting=" <> ppr (length jobsWaiting)
          ] )

-- | Add one new token.
--
-- The new token counts both as owned and as free.
addToken :: JobResources -> JobResources
addToken jobs@( Jobs { tokensOwned = owned, tokensFree = free })
  = jobs { tokensOwned = owned + 1, tokensFree = free + 1 }

-- | Free one token.
--
-- Asserts that strictly fewer tokens are free than owned beforehand, so
-- the free count never exceeds the owned count.
addFreeToken :: JobResources -> JobResources
addFreeToken jobs@( Jobs { tokensFree = free })
  = assertPpr (tokensOwned jobs > free)
      (text "addFreeToken:" <+> ppr (tokensOwned jobs) <+> ppr free)
  $ jobs { tokensFree = free + 1 }

-- | Use up one token.
--
-- Asserts that at least one token is free beforehand.
removeFreeToken :: JobResources -> JobResources
removeFreeToken jobs@( Jobs { tokensFree = free })
  = assertPpr (free > 0)
      (text "removeFreeToken:" <+> ppr free)
  $ jobs { tokensFree = free - 1 }

-- | Return one owned token.
--
-- Asserts that more than one token is owned beforehand, so the owned
-- count never drops below one.
removeOwnedToken :: JobResources -> JobResources
removeOwnedToken jobs@( Jobs { tokensOwned = owned })
  = assertPpr (owned > 1)
      (text "removeOwnedToken:" <+> ppr owned)
  $ jobs { tokensOwned = owned - 1 }

-- | Add one new job to the end of the list of pending jobs.
addJob :: TMVar () -> JobResources -> JobResources
addJob job jobs@( Jobs { jobsWaiting = wait })
  = jobs { jobsWaiting = wait `SnocOL` job }

-- | The state of the semaphore job server.
data JobserverState
  = JobserverState
    { jobserverAction  :: !JobserverAction
      -- ^ The current action being performed by the
      -- job server.
    , canChangeNumCaps :: !(TVar Bool)
      -- ^ A TVar that signals whether it has been long
      -- enough since we last changed 'numCapabilities'.
    , canReleaseToken  :: !(TVar Bool)
      -- ^ A TVar that signals whether we last acquired
      -- a token long enough ago that we can now release
      -- a token.
    }
-- | What the jobserver is currently doing with respect to the semaphore.
data JobserverAction
  -- | The jobserver is idle: no thread is currently
  -- interacting with the semaphore.
  = Idle
  -- | A thread is waiting for a token on the semaphore.
  | Acquiring
    { activeWaitId   :: WaitId
      -- ^ Handle on the in-flight wait; used to interrupt it.
    , threadFinished :: TMVar (Maybe MC.SomeException)
      -- ^ Filled once the waiting thread is done: 'Nothing' on a normal
      -- finish, @'Just' e@ if the wait failed with exception @e@.
    }

-- | Retrieve the 'TMVar' that signals if the current thread has finished,
-- if any thread is currently active in the jobserver.
--
-- Returns 'Nothing' when the jobserver is 'Idle'.
activeThread_maybe :: JobserverAction -> Maybe (TMVar (Maybe MC.SomeException))
activeThread_maybe Idle                                   = Nothing
activeThread_maybe (Acquiring { threadFinished = tmvar }) = Just tmvar

-- | Whether we should try to acquire a new token from the semaphore:
-- there is a pending job and no free tokens.
guardAcquire :: JobResources -> Bool
guardAcquire ( Jobs { tokensFree, jobsWaiting } )
  = tokensFree == 0 && not (null jobsWaiting)

-- | Whether we should release a token from the semaphore:
-- there are no pending jobs, at least one token is free, and we own
-- more than one token (so the owned count stays at least one).
guardRelease :: JobResources -> Bool
guardRelease ( Jobs { tokensFree, tokensOwned, jobsWaiting } )
  = null jobsWaiting && tokensFree > 0 && tokensOwned > 1

---------------------------------------
-- Semaphore jobserver implementation

-- | Add one pending job to the jobserver.
--
-- Registers a fresh, empty 'TMVar' at the end of the waiting queue and
-- then blocks until it is filled, i.e. until the jobserver supplies a
-- free token.
acquireJob :: TVar JobResources -> IO ()
acquireJob jobs_tvar = do
  job_tmvar <- tracedAtomically "acquire" $
    modifyJobResources jobs_tvar \ jobs -> do
      job_tmvar <- newEmptyTMVar
      return (job_tmvar, addJob job_tmvar jobs)
  -- Wait in a separate transaction: the enqueueing above must commit
  -- before anyone can hand us a token.
  atomically $ takeTMVar job_tmvar

-- | Signal to the job server that one job has completed,
-- releasing its corresponding token.
--
-- The token is marked as free again; it is not returned to the system
-- semaphore here.
releaseJob :: TVar JobResources -> IO ()
releaseJob jobs_tvar = do
  tracedAtomically "release" do
    modifyJobResources jobs_tvar \ jobs -> do
      massertPpr (tokensFree jobs < tokensOwned jobs)
        (text "releaseJob: more free jobs than owned jobs!")
      return ((), addFreeToken jobs)


-- | Release all tokens owned from the semaphore (to clean up
-- the jobserver at the end).
--
-- One token is deliberately kept back: it is the implicit token, for
-- which whoever spawned the ghc process is responsible.
cleanupJobserver :: Jobserver -> IO ()
cleanupJobserver (Jobserver { jSemaphore = sem
                            , jobs       = jobs_tvar })
  = do
    Jobs { tokensOwned = owned } <- readTVarIO jobs_tvar
    let toks_to_release = owned - 1
      -- Subtract off the implicit token: whoever spawned the ghc process
      -- in the first place is responsible for that token.
    -- Don't touch the semaphore when there is nothing to give back:
    -- releasing zero tokens is pointless, and the Win32 ReleaseSemaphore
    -- API requires a release count greater than zero.
    when (toks_to_release > 0) $
      releaseSemaphore sem toks_to_release

-- | Dispatch the available tokens acquired from the semaphore
-- to the pending jobs in the job server.
--
-- Jobs are woken from the front of the waiting queue, one free token
-- each, until either the free tokens or the waiting jobs run out.
dispatchTokens :: JobResources -> STM JobResources
dispatchTokens jobs@( Jobs { tokensFree = toks_free, jobsWaiting = wait } )
  | toks_free > 0
  , next `ConsOL` rest <- wait
  -- There's a pending job and a free token:
  -- pass on the token to that job, and recur.
  = do
      putTMVar next ()
      let jobs' = jobs { tokensFree = toks_free - 1, jobsWaiting = rest }
      dispatchTokens jobs'
  | otherwise
  = return jobs

-- | Update the available resources used from a semaphore, dispatching
-- any newly acquired resources.
--
-- Invariant: if the number of available resources decreases, there
-- must be no pending jobs.
--
-- All modifications should go through this function to ensure the contents
-- of the 'TVar' remains in normal form.
--
-- Returns the result of the supplied action together with the state that
-- was written back (always wrapped in 'Just'), so that callers can trace it.
modifyJobResources :: HasCallStack => TVar JobResources
                   -> (JobResources -> STM (a, JobResources))
                   -> STM (a, Maybe JobResources)
modifyJobResources jobs_tvar action = do
  old_jobs  <- readTVar jobs_tvar
  (a, jobs) <- action old_jobs

  -- Check the invariant: if the number of free tokens has decreased,
  -- there must be no pending jobs.
  massertPpr (null (jobsWaiting jobs) || tokensFree jobs >= tokensFree old_jobs) $
    vcat [ text "modifyJobResources: pending jobs but fewer free tokens"
         , text "old:" <+> ppr old_jobs
         , text "new:" <+> ppr jobs ]
  -- Hand any free tokens to waiting jobs before publishing the new state.
  dispatched_jobs <- dispatchTokens jobs
  writeTVar jobs_tvar dispatched_jobs
  return (a, Just dispatched_jobs)


-- | Variant of 'tracedAtomically' for transactions that produce no
-- result besides the (optional) job resources to trace.
tracedAtomically_ :: String -> STM (Maybe JobResources) -> IO ()
tracedAtomically_ s act = tracedAtomically s (((),) <$> act)

-- | Run an STM transaction atomically and, if it yields job resources,
-- emit them to the eventlog via 'traceEventIO'.
--
-- The first argument names the origin of the transaction and is included
-- in the emitted event.
tracedAtomically :: String -> STM (a, Maybe JobResources) -> IO a
tracedAtomically origin act = do
  (a, mjr) <- atomically act
  forM_ mjr $ \ jr -> do
    -- Use the "jsem:" prefix to identify where the write traces are
    traceEventIO ("jsem:" ++ renderJobResources origin jr)
  return a

-- | Render the job resources as a JSON object string with the fields
-- @name@ (the given origin), @owned@, @free@ and @pending@ (the number
-- of waiting jobs).
renderJobResources :: String -> JobResources -> String
renderJobResources origin (Jobs own free pending) = showSDocUnsafe $ renderJSON $
  JSObject [ ("name", JSString origin)
           , ("owned", JSInt own)
           , ("free", JSInt free)
           , ("pending", JSInt (length pending) )
           ]


-- | Spawn a new thread that waits on the semaphore in order to acquire
-- an additional token.
--
-- When the wait ends, the callback records the outcome:
--
--  - on an exception, the exception is stored in the 'threadFinished' 'TMVar';
--  - on success, one token is added to the job resources;
--  - otherwise (the wait reported 'False'), nothing is changed.
--
-- In all three cases the 'threadFinished' 'TMVar' is filled.
acquireThread :: Jobserver -> IO JobserverAction
acquireThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
    threadFinished_tmvar <- newEmptyTMVarIO
    let
      wait_result_action :: Either MC.SomeException Bool -> IO ()
      wait_result_action wait_res =
        tracedAtomically_ "acquire_thread" do
          (r, jb) <- case wait_res of
            Left (e :: MC.SomeException) -> do
              return $ (Just e, Nothing)
            Right success -> do
              if success
                then do
                  modifyJobResources jobs_tvar \ jobs ->
                    return (Nothing, addToken jobs)
                else
                  return (Nothing, Nothing)
          putTMVar threadFinished_tmvar r
          return jb
    wait_id <- forkWaitOnSemaphoreInterruptible sem wait_result_action
    labelThread (waitingThreadId wait_id) "acquire_thread"
    return $ Acquiring { activeWaitId   = wait_id
                       , threadFinished = threadFinished_tmvar }

-- | Spawn a thread to release ownership of one resource from the semaphore,
-- provided we have spare resources and no pending jobs.
--
-- The token is removed from the bookkeeping first ("pre-release"), with
-- asynchronous exceptions masked; if the actual 'releaseSemaphore' call
-- then fails, the token is added back.
--
-- Always returns 'Idle'.
--
-- NOTE(review): the 'TMVar' filled by the release thread is local to this
-- function and not part of the returned action, so an exception from
-- 'releaseSemaphore' is recorded there but does not appear to be observed
-- by anyone -- confirm this is intended.
releaseThread :: Jobserver -> IO JobserverAction
releaseThread (Jobserver { jSemaphore = sem, jobs = jobs_tvar }) = do
  threadFinished_tmvar <- newEmptyTMVarIO
  MC.mask_ do
    -- Pre-release the resource so that another thread doesn't take control of it
    -- just as we release the lock on the semaphore.
    still_ok_to_release
      <- tracedAtomically "pre_release" $
         modifyJobResources jobs_tvar \ jobs ->
           if guardRelease jobs
               -- TODO: should this also debounce?
           then return (True , removeOwnedToken $ removeFreeToken jobs)
           else return (False, jobs)
    if not still_ok_to_release
    then return Idle
    else do
      tid <- forkIO $ do
        x <- MC.try $ releaseSemaphore sem 1
        tracedAtomically_ "post-release" $ do
          (r, jobs) <- case x of
            Left (e :: MC.SomeException) -> do
              -- The release failed: we still hold the token, so put it back.
              modifyJobResources jobs_tvar \ jobs ->
                return (Just e, addToken jobs)
            Right _ -> do
              return (Nothing, Nothing)
          putTMVar threadFinished_tmvar r
          return jobs
      labelThread tid "release_thread"
      return Idle

-- | When there are pending jobs but no free tokens,
-- spawn a thread to acquire a new token from the semaphore.
--
-- Only applies when the jobserver is 'Idle'; otherwise the transaction
-- retries. The returned 'IO' action starts the acquiring thread and arms
-- the release debounce timer.
--
-- See 'acquireThread'.
tryAcquire :: JobserverOptions
           -> Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryAcquire opts js@( Jobserver { jobs = jobs_tvar })
  st@( JobserverState { jobserverAction = Idle } )
  = do
    jobs <- readTVar jobs_tvar
    guard $ guardAcquire jobs
    return do
      action           <- acquireThread js
      -- Set a debounce after acquiring a token.
      -- (The option is in milliseconds, 'registerDelay' takes microseconds.)
      can_release_tvar <- registerDelay (releaseDebounce opts * 1000)
      return $ st { jobserverAction = action
                  , canReleaseToken = can_release_tvar }
tryAcquire _ _ _ = retry

-- | When there are free tokens and no pending jobs,
-- spawn a thread to release a token from the semaphore.
--
-- Only applies when the jobserver is 'Idle' and the release debounce
-- ('canReleaseToken') has elapsed; otherwise the transaction retries.
--
-- See 'releaseThread'.
tryRelease :: Jobserver
           -> JobserverState
           -> STM (IO JobserverState)
tryRelease sjs@( Jobserver { jobs = jobs_tvar } )
  st@( JobserverState
      { jobserverAction = Idle
      , canReleaseToken = can_release_tvar } )
  = do
    jobs <- readTVar jobs_tvar
    guard  $ guardRelease jobs
    can_release <- readTVar can_release_tvar
    guard can_release
    return do
      action <- releaseThread sjs
      return $ st { jobserverAction = action }
tryRelease _ _ = retry

-- | Wait for an active thread to finish. Once it finishes:
--
--  - set the 'JobserverAction' to 'Idle',
--  - update the number of capabilities to reflect the number
--    of owned tokens from the semaphore.
--
-- If the finished thread reported an exception, it is rethrown here.
-- The transaction also retries until the 'canChangeNumCaps' debounce
-- has elapsed.
tryNoticeIdle :: JobserverOptions
              -> TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryNoticeIdle opts jobs_tvar jobserver_state
  | Just threadFinished_tmvar <- activeThread_maybe $ jobserverAction jobserver_state
  = sync_num_caps (canChangeNumCaps jobserver_state) threadFinished_tmvar
  | otherwise
  = retry -- no active thread: wait until jobserver isn't idle
  where
    sync_num_caps :: TVar Bool
                  -> TMVar (Maybe MC.SomeException)
                  -> STM (IO JobserverState)
    sync_num_caps can_change_numcaps_tvar threadFinished_tmvar = do
      mb_ex <- takeTMVar threadFinished_tmvar
      for_ mb_ex MC.throwM
      Jobs { tokensOwned } <- readTVar jobs_tvar
      can_change_numcaps <- readTVar can_change_numcaps_tvar
      guard can_change_numcaps
      return do
        x <- getNumCapabilities
        can_change_numcaps_tvar_2 <-
          if x == tokensOwned
          then return can_change_numcaps_tvar
          else do
            setNumCapabilities tokensOwned
            -- Start a new debounce period after changing the capabilities.
            registerDelay (setNumCapsDebounce opts * 1000)
        return $
          jobserver_state
            { jobserverAction  = Idle
            , canChangeNumCaps = can_change_numcaps_tvar_2 }

-- | Try to stop the current thread which is acquiring/releasing resources
-- if that operation is no longer relevant.
--
-- Concretely: while a thread is 'Acquiring' and no jobs are waiting any
-- more, interrupt its wait on the semaphore and go back to 'Idle'.
-- In every other situation the transaction retries.
tryStopThread :: TVar JobResources
              -> JobserverState
              -> STM (IO JobserverState)
tryStopThread jobs_tvar jsj = do
  case jobserverAction jsj of
    Acquiring { activeWaitId = wait_id } -> do
     jobs <- readTVar jobs_tvar
     guard $ null (jobsWaiting jobs)
     return do
       interruptWaitOnSemaphore wait_id
       return $ jsj { jobserverAction = Idle }
    _ -> retry

-- | Main jobserver loop: acquire/release resources as
-- needed for the pending jobs and available semaphore tokens.
--
-- Each iteration atomically picks the first applicable step among
-- 'tryRelease', 'tryAcquire', 'tryNoticeIdle' and 'tryStopThread'
-- (in that order of preference), blocking until one applies, and then
-- runs the 'IO' action it returned to obtain the next state.
-- The loop never terminates on its own.
jobserverLoop :: JobserverOptions -> Jobserver -> IO ()
jobserverLoop opts sjs@(Jobserver { jobs = jobs_tvar })
  = do
      -- Initially neither debounce is pending.
      true_tvar <- newTVarIO True
      let init_state :: JobserverState
          init_state =
            JobserverState
              { jobserverAction  = Idle
              , canChangeNumCaps = true_tvar
              , canReleaseToken  = true_tvar }
      loop init_state
  where
    loop :: JobserverState -> IO ()
    loop s = do
      action <- atomically $ asum $ (\x -> x s) <$>
        [ tryRelease    sjs
        , tryAcquire    opts sjs
        , tryNoticeIdle opts jobs_tvar
        , tryStopThread jobs_tvar
        ]
      action >>= loop

-- | Create a new jobserver using the given semaphore handle.
--
-- Opens the named semaphore, starts the jobserver loop in its own thread
-- and returns:
--
--  - an 'AbstractSem' whose acquire/release operations go through the
--    jobserver, and
--  - a cleanup action that releases the tokens still owned, kills the
--    loop thread and rethrows any exception the loop ended with (other
--    than 'ThreadKilled').
--
-- The jobserver starts out owning one token, which is free.
makeJobserver :: SemaphoreName -> IO (AbstractSem, IO ())
makeJobserver sem_name = do
  semaphore <- openSemaphore sem_name
  let
    init_jobs =
      Jobs { tokensOwned = 1
           , tokensFree  = 1
           , jobsWaiting = NilOL
           }
  jobs_tvar <- newTVarIO init_jobs
  let
    opts = defaultJobserverOptions -- TODO: allow this to be configured
    sjs = Jobserver { jSemaphore = semaphore
                    , jobs       = jobs_tvar }
  loop_finished_mvar <- newEmptyMVar
  loop_tid <- forkIOWithUnmask \ unmask -> do
    r <- try $ unmask $ jobserverLoop opts sjs
    -- Report how the loop ended; being killed by the cleanup action
    -- is the expected way out and is not treated as an error.
    putMVar loop_finished_mvar $
      case r of
        Left e
          | Just ThreadKilled <- fromException e
          -> Nothing
          | otherwise
          -> Just e
        Right () -> Nothing
  labelThread loop_tid "job_server"
  let
    acquireSem = acquireJob jobs_tvar
    releaseSem = releaseJob jobs_tvar
    cleanupSem = do
      -- this is interruptible
      cleanupJobserver sjs
      killThread loop_tid
      mb_ex <- takeMVar loop_finished_mvar
      for_ mb_ex MC.throwM

  return (AbstractSem{..}, cleanupSem)

-- | Implement an abstract semaphore using a semaphore 'Jobserver'
-- which queries the system semaphore of the given name for resources.
--
-- The jobserver is created with asynchronous exceptions masked; only the
-- supplied operation runs unmasked. The jobserver's cleanup action is run
-- whether the operation returns normally or throws:
--
--   * if the operation throws, cleanup is attempted, any exception raised
--     by the cleanup itself is discarded, and the operation's exception
--     is rethrown;
--
--   * if the operation succeeds, cleanup is run (and may itself throw)
--     before the result is returned.
runJSemAbstractSem :: SemaphoreName         -- ^ the system semaphore to use
                   -> (AbstractSem -> IO a) -- ^ the operation to run
                                            -- which requires a semaphore
                   -> IO a
runJSemAbstractSem sem action = MC.mask \ unmask -> do
  (abs_sem, cleanup) <- makeJobserver sem
  r <- try $ unmask $ action abs_sem
  case r of
    Left (e1 :: MC.SomeException) -> do
      -- Best-effort cleanup: a failure here is deliberately ignored so
      -- that the exception from the operation is the one reported.
      (_ :: Either MC.SomeException ()) <- MC.try cleanup
      MC.throwM e1
    Right x -> cleanup $> x

{- Note [Architecture of the Job Server]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In `-jsem` mode, the amount of parallelism that GHC can use is controlled by a
system semaphore. We take resources from the semaphore when we need them, and
give them back if we don't have enough to do.

A naive implementation would just take and release the semaphore around performing
the action, but this leads to two issues:

* When taking a token in the semaphore, we must call `setNumCapabilities` in order
  to adjust how many capabilities are available for parallel garbage collection.
  This causes unnecessary synchronisations.
* We want to implement a debounce, so that whilst there is pending work in the
  current process we prefer to keep hold of resources from the semaphore.
  This reduces overall memory usage, as there are fewer live GHC processes at once.

Therefore, the acquisition of semaphore resources is decoupled from the
request for the resource in the driver.

A token from the semaphore is requested using `acquireJob`. This creates a pending
job, which is a `TMVar` that can be filled in to signal that the requested token is ready.

When the job is finished, the token is released by calling `releaseJob`, which just
increases the number of `free` jobs. If there are more pending jobs when the free count
is increased, the token is immediately reused (see `modifyJobResources`).

The `jobserverLoop` interacts with the system semaphore: when there are pending
jobs, `acquireThread` blocks, waiting for a token from the semaphore. Once a
token is obtained, it increases the owned count.

When GHC has free tokens (tokens from the semaphore that it is not using),
no pending jobs, and the debounce has expired, then `releaseThread` will
release tokens back to the global semaphore.

`tryStopThread` attempts to kill threads which are waiting to acquire a resource
when we no longer need it. For example, consider that we attempt to acquire two
tokens, but the first job finishes before we acquire the second token.
This second token is no longer needed, so we should cancel the wait
(as it would not be used to do any work, and not be returned until the debounce).
We only need to kill `acquireJob`, because `releaseJob` never blocks.

Note [Eventlog Messages for jsem]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
It can be tricky to verify that the work is shared adequately across different
processes. To help debug this, we output the values of `JobResources` to the
eventlog whenever the global state changes. There are some scripts which can be used
to analyse this output and report statistics about core saturation in the
GitHub repo (https://github.com/mpickering/ghc-jsem-analyse).

-}