-- | Implement Erlang style message passing concurrency.
--
-- This module contains 'spawn' which handles the 'Process' effects, using
-- 'STM.TQueue's and 'Control.Concurrent.Async.withAsync'.
--
-- This aims to be a pragmatic implementation, so even logging is
-- supported.
--
-- At the core is a /main process/ that enters 'schedule'
-- and creates all of the internal state stored in 'STM.TVar's
-- to manage processes with message queues.
--
module Control.Eff.Concurrent.Process.ForkIOScheduler
  ( schedule
  , defaultMain
  , defaultMainWithLogWriter
  , ProcEff
  , InterruptableProcEff
  , SchedulerIO
  , HasSchedulerIO
  ) where

import Control.Concurrent (yield)
import qualified Control.Concurrent.Async as Async
import Control.Concurrent.Async (Async(..))
import Control.Concurrent.STM as STM
import Control.Eff
import Control.Eff.Concurrent.Process
import qualified Control.Eff.ExceptionExtra as ExcExtra ()
import Control.Eff.Extend
import Control.Eff.Log
import Control.Eff.Reader.Strict as Reader
import Control.Exception.Safe as Safe
import Control.Lens
import Control.Monad (void, when)
import Control.Monad.Trans.Control (MonadBaseControl(..), control)
import Data.Default
import Data.Dynamic
import Data.Foldable
import Data.Function (fix)
import Data.Kind ()
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Maybe
import Data.Sequence (Seq(..))
import qualified Data.Sequence as Seq
import Data.Set (Set)
import qualified Data.Set as Set
import GHC.Stack
import System.Timeout
import Text.Printf

-- * Process Types

-- | A message queue of a process, contains the actual queue and maybe an
-- exit reason. The message queue is backed by a 'Seq' sequence with 'Dynamic' values.
data MessageQ = MessageQ
  { _incomingMessages :: Seq Dynamic
  , _shutdownRequests :: Maybe SomeExitReason
  }

instance Default MessageQ where
  def = MessageQ def def

makeLenses ''MessageQ

-- | Return any '_shutdownRequests' from a 'MessageQ' in a 'TVar' and
-- reset the '_shutdownRequests' field to 'Nothing' in the 'TVar'.
tryTakeNextShutdownRequestSTM :: TVar MessageQ -> STM (Maybe SomeExitReason)
tryTakeNextShutdownRequestSTM mqVar = do
  mq <- readTVar mqVar
  when (isJust (mq ^. shutdownRequests)) (writeTVar mqVar (mq & shutdownRequests .~ Nothing))
  return (mq ^. shutdownRequests)

-- | Information about a process, needed to implement
-- 'Process' handlers. The message queue is backed by a 'STM.TVar' that contains
-- a 'MessageQ'.
data ProcessInfo = ProcessInfo
  { _processId :: ProcessId
  , _processState :: TVar ProcessState
  , _messageQ :: TVar MessageQ
  , _processLinks :: TVar (Set ProcessId)
  }

makeLenses ''ProcessInfo

-- * Scheduler Types

-- | Contains all process info'elements, as well as the state needed to
-- implement inter process communication.
data SchedulerState = SchedulerState
  { _nextPid :: TVar ProcessId
  , _processTable :: TVar (Map ProcessId ProcessInfo)
  , _processCancellationTable :: TVar (Map ProcessId (Async (ExitReason 'NoRecovery)))
  , _processMonitors :: TVar (Set (MonitorReference, ProcessId))  -- ^ Set of monitors and monitor owners
  , _nextMonitorIndex :: TVar Int
  }

makeLenses ''SchedulerState

-- | Add monitor: If the process is dead, enqueue a 'ProcessDown' message into the
-- owners message queue
addMonitoring :: ProcessId -> ProcessId -> SchedulerState -> STM MonitorReference
addMonitoring target owner schedulerState = do
  aNewMonitorIndex <- readTVar (schedulerState ^. nextMonitorIndex)
  modifyTVar' (schedulerState ^. nextMonitorIndex) (+ 1)
  let monitorRef = MonitorReference aNewMonitorIndex target
  when (target /= owner) $ do
    pt <- readTVar (schedulerState ^. processTable)
    if Map.member target pt
      then modifyTVar' (schedulerState ^. processMonitors) (Set.insert (monitorRef, owner))
      else let processDownMessage = ProcessDown monitorRef (SomeExitReason (ProcessNotRunning target))
            in enqueueMessageOtherProcess owner (toDyn processDownMessage) schedulerState
  return monitorRef

removeMonitoring :: MonitorReference -> SchedulerState -> STM ()
removeMonitoring monitorRef schedulerState =
  modifyTVar' (schedulerState ^. processMonitors) (Set.filter (\(ref, _) -> ref /= monitorRef))

triggerAndRemoveMonitor :: ProcessId -> SomeExitReason -> SchedulerState -> STM ()
triggerAndRemoveMonitor downPid reason schedulerState = do
  monRefs <- readTVar (schedulerState ^. processMonitors)
  traverse_ go monRefs
  where
    go (mr, owner) =
      when
        (monitoredProcess mr == downPid)
        (do let processDownMessage = ProcessDown mr reason
            enqueueMessageOtherProcess owner (toDyn processDownMessage) schedulerState
            removeMonitoring mr schedulerState)

-- * Process Implementation
instance Show ProcessInfo where
  show p = "process info: " ++ show (p ^. processId)

-- | Create a new 'ProcessInfo'
newProcessInfo :: HasCallStack => ProcessId -> STM ProcessInfo
newProcessInfo a = ProcessInfo a <$> newTVar ProcessBooting <*> newTVar def <*> newTVar def

-- * Scheduler Implementation

-- | Create a new 'SchedulerState'
newSchedulerState :: HasCallStack => STM SchedulerState
newSchedulerState = SchedulerState <$> newTVar 1 <*> newTVar def <*> newTVar def <*> newTVar def <*> newTVar def

-- | Create a new 'SchedulerState' run an IO action, catching all exceptions,
-- and when the actions returns, clean up and kill all processes.
withNewSchedulerState :: (HasCallStack) => Eff SchedulerIO () -> Eff LoggingAndIo ()
withNewSchedulerState mainProcessAction =
  Safe.bracketWithError
    (lift (atomically newSchedulerState))
    (\exceptions schedulerState -> do
       traverse_ (logError . ("scheduler setup crashed with: " ++) . Safe.displayException) exceptions
       logDebug "scheduler cleanup begin"
       runReader schedulerState tearDownScheduler)
    (\schedulerState -> do
       logDebug "scheduler loop entered"
       x <- runReader schedulerState mainProcessAction
       logDebug "scheduler loop returned"
       return x)
  where
    tearDownScheduler :: Eff SchedulerIO ()
    tearDownScheduler = do
      schedulerState <- getSchedulerState
      let cancelTableVar = schedulerState ^. processCancellationTable
      -- cancel all processes
      allProcesses <- lift (atomically (readTVar cancelTableVar <* writeTVar cancelTableVar def))
      logNotice ("cancelling processes: " ++ show (toListOf (ifolded . asIndex) allProcesses))
      void
        (liftBaseWith
           (\runS ->
              timeout
                5000000
                (Async.mapConcurrently
                   (\a -> do
                      Async.cancel a
                      runS (logNotice ("process cancelled: " ++ show (asyncThreadId a))))
                   allProcesses >>
                 runS (logNotice "all processes cancelled"))))

-- | The concrete list of 'Eff'ects of processes compatible with this scheduler.
-- This builds upon 'SchedulerIO'.
type ProcEff = ConsProcess SchedulerIO

-- | The concrete list of the effects, that the 'Process' uses
type InterruptableProcEff = InterruptableProcess SchedulerIO

-- | Type class constraint to indicate that an effect union contains the
-- effects required by every process and the scheduler implementation itself.
type HasSchedulerIO r = (HasCallStack, Lifted IO r, SchedulerIO <:: r)

-- | The concrete list of 'Eff'ects for this scheduler implementation.
type SchedulerIO = (Reader SchedulerState : LoggingAndIo)

-- | Start the message passing concurrency system then execute a 'Process' on
-- top of 'SchedulerIO' effect. All logging is sent to standard output.
defaultMain :: HasCallStack => Eff InterruptableProcEff () -> IO ()
defaultMain = defaultMainWithLogWriter consoleLogWriter

-- | Start the message passing concurrency system then execute a 'Process' on
-- top of 'SchedulerIO' effect. All logging is sent to standard output.
defaultMainWithLogWriter :: HasCallStack => LogWriter IO -> Eff InterruptableProcEff () -> IO ()
defaultMainWithLogWriter lw =
  runLift . withSomeLogging . withAsyncLogging (1024 :: Int) lw . schedule

-- ** Process Execution

handleProcess ::
     (HasCallStack) => ProcessInfo -> Eff ProcEff (ExitReason 'NoRecovery) -> Eff SchedulerIO (ExitReason 'NoRecovery)
handleProcess myProcessInfo actionToRun =
  fix (handle_relay' singleStep (const . const (return ExitNormally))) actionToRun 0
  where
    singleStep ::
         (Eff ProcEff xx -> (Int -> Eff SchedulerIO (ExitReason 'NoRecovery)))
      -> Arrs ProcEff x xx
      -> Process SchedulerIO x
      -> (Int -> Eff SchedulerIO (ExitReason 'NoRecovery))
    singleStep k q p !nextRef = stepProcessInterpreter nextRef p (\nextNextRef x -> k (qApp q x) nextNextRef) return
    myPid = myProcessInfo ^. processId
    myProcessStateVar = myProcessInfo ^. processState
    setMyProcessState = lift . atomically . setMyProcessStateSTM
  -- DEBUG variant:
  -- setMyProcessState st = do
  --  oldSt <- lift (atomically (readTVar myProcessStateVar <* setMyProcessStateSTM st))
  --  logDebug ("state change: "++ show oldSt ++ " -> " ++ show st)
    setMyProcessStateSTM = writeTVar myProcessStateVar
    myMessageQVar = myProcessInfo ^. messageQ
    kontinueWith ::
         forall s v a. HasCallStack
      => (s -> Arr SchedulerIO v a)
      -> (s -> Arr SchedulerIO v a)
    kontinueWith kontinue !nextRef !result = do
      setMyProcessState ProcessIdle
      lift yield
      kontinue nextRef result
    diskontinueWith ::
         forall a. HasCallStack
      => Arr SchedulerIO (ExitReason 'NoRecovery) a
      -> Arr SchedulerIO (ExitReason 'NoRecovery) a
    diskontinueWith diskontinue !reason = do
      setMyProcessState ProcessShuttingDown
      diskontinue reason
    stepProcessInterpreter ::
         forall v a. HasCallStack
      => Int
      -> Process SchedulerIO v
      -> (Int -> Arr SchedulerIO v a)
      -> Arr SchedulerIO (ExitReason 'NoRecovery) a
      -> Eff SchedulerIO a
    stepProcessInterpreter !nextRef !request kontinue diskontinue
      -- handle process shutdown requests:
      --   1. take process exit reason
      --   2. set process state to ProcessShuttingDown
      --   3. apply kontinue to (Right Interrupted)
      --
     =
      tryTakeNextShutdownRequest >>=
      maybe noShutdownRequested (either onShutdownRequested onInterruptRequested . fromSomeExitReason)
      where
        tryTakeNextShutdownRequest = lift (atomically (tryTakeNextShutdownRequestSTM myMessageQVar))
        onShutdownRequested shutdownRequest = do
          logDebug ("shutdown requested: " ++ show shutdownRequest)
          setMyProcessState ProcessShuttingDown
          interpretRequestAfterShutdownRequest (diskontinueWith diskontinue) shutdownRequest request
        onInterruptRequested interruptRequest = do
          logDebug ("interrupt requested: " ++ show interruptRequest)
          setMyProcessState ProcessShuttingDown
          interpretRequestAfterInterruptRequest
            (kontinueWith kontinue nextRef)
            (diskontinueWith diskontinue)
            interruptRequest
            request
        noShutdownRequested = do
          setMyProcessState ProcessBusy
          interpretRequest (kontinueWith kontinue) (diskontinueWith diskontinue) nextRef request
  -- This gets no nextRef and may not pass a Left value to the continuation.
  -- This forces the caller to defer the process exit to the next request
  -- and hence ensures that the scheduler code cannot forget to allow the
  -- client code to react to a shutdown request.
    interpretRequestAfterShutdownRequest ::
         forall v a. HasCallStack
      => Arr SchedulerIO (ExitReason 'NoRecovery) a
      -> ExitReason 'NoRecovery
      -> Process SchedulerIO v
      -> Eff SchedulerIO a
    interpretRequestAfterShutdownRequest diskontinue shutdownRequest =
      \case
        SendMessage _ _ -> diskontinue shutdownRequest
        SendInterrupt _ _ -> diskontinue shutdownRequest
        SendShutdown toPid r ->
          if toPid == myPid
            then diskontinue r
            else diskontinue shutdownRequest
        Spawn _ -> diskontinue shutdownRequest
        SpawnLink _ -> diskontinue shutdownRequest
        ReceiveSelectedMessage _ -> diskontinue shutdownRequest
        FlushMessages -> diskontinue shutdownRequest
        SelfPid -> diskontinue shutdownRequest
        MakeReference -> diskontinue shutdownRequest
        YieldProcess -> diskontinue shutdownRequest
        Shutdown r -> diskontinue r
        GetProcessState _ -> diskontinue shutdownRequest
        Monitor _ -> diskontinue shutdownRequest
        Demonitor _ -> diskontinue shutdownRequest
        Link _ -> diskontinue shutdownRequest
        Unlink _ -> diskontinue shutdownRequest
    interpretRequestAfterInterruptRequest ::
         forall v a. HasCallStack
      => Arr SchedulerIO v a
      -> Arr SchedulerIO (ExitReason 'NoRecovery) a
      -> ExitReason 'Recoverable
      -> Process SchedulerIO v
      -> Eff SchedulerIO a
    interpretRequestAfterInterruptRequest kontinue diskontinue interruptRequest =
      \case
        SendMessage _ _ -> kontinue (Interrupted interruptRequest)
        SendInterrupt _ _ -> kontinue (Interrupted interruptRequest)
        SendShutdown toPid r ->
          if toPid == myPid
            then diskontinue r
            else kontinue (Interrupted interruptRequest)
        Spawn _ -> kontinue (Interrupted interruptRequest)
        SpawnLink _ -> kontinue (Interrupted interruptRequest)
        ReceiveSelectedMessage _ -> kontinue (Interrupted interruptRequest)
        FlushMessages -> kontinue (Interrupted interruptRequest)
        SelfPid -> kontinue (Interrupted interruptRequest)
        MakeReference -> kontinue (Interrupted interruptRequest)
        YieldProcess -> kontinue (Interrupted interruptRequest)
        Shutdown r -> diskontinue r
        GetProcessState _ -> kontinue (Interrupted interruptRequest)
        Monitor _ -> kontinue (Interrupted interruptRequest)
        Demonitor _ -> kontinue (Interrupted interruptRequest)
        Link _ -> kontinue (Interrupted interruptRequest)
        Unlink _ -> kontinue (Interrupted interruptRequest)
    interpretRequest ::
         forall v a. HasCallStack
      => (Int -> Arr SchedulerIO v a)
      -> Arr SchedulerIO (ExitReason 'NoRecovery) a
      -> Int
      -> Process SchedulerIO v
      -> Eff SchedulerIO a
    interpretRequest kontinue diskontinue nextRef =
      \case
        SendMessage toPid msg -> interpretSend toPid msg >>= kontinue nextRef . ResumeWith
        SendInterrupt toPid msg ->
          if toPid == myPid
            then kontinue nextRef (Interrupted msg)
            else interpretSendShutdownOrInterrupt toPid (SomeExitReason msg) >>= kontinue nextRef . ResumeWith
        SendShutdown toPid msg ->
          if toPid == myPid
            then diskontinue msg
            else interpretSendShutdownOrInterrupt toPid (SomeExitReason msg) >>= kontinue nextRef . ResumeWith
        Spawn child -> spawnNewProcess Nothing child >>= kontinue nextRef . ResumeWith . fst
        SpawnLink child -> spawnNewProcess (Just myProcessInfo) child >>= kontinue nextRef . ResumeWith . fst
        ReceiveSelectedMessage f -> interpretReceive f >>= either diskontinue (kontinue nextRef)
        FlushMessages -> interpretFlush >>= kontinue nextRef
        SelfPid -> kontinue nextRef (ResumeWith myPid)
        MakeReference -> kontinue (nextRef + 1) (ResumeWith nextRef)
        YieldProcess -> kontinue nextRef (ResumeWith ())
        Shutdown r -> diskontinue r
        GetProcessState toPid -> interpretGetProcessState toPid >>= kontinue nextRef . ResumeWith
        Monitor target -> interpretMonitor target >>= kontinue nextRef . ResumeWith
        Demonitor ref -> interpretDemonitor ref >>= kontinue nextRef . ResumeWith
        Link toPid -> interpretLink toPid >>= kontinue nextRef . either Interrupted ResumeWith
        Unlink toPid -> interpretUnlink toPid >>= kontinue nextRef . ResumeWith
      where
        interpretMonitor !target = do
          setMyProcessState ProcessBusyMonitoring
          schedulerState <- getSchedulerState
          lift (atomically (addMonitoring target myPid schedulerState))
        interpretDemonitor !ref = do
          setMyProcessState ProcessBusyMonitoring
          schedulerState <- getSchedulerState
          lift (atomically (removeMonitoring ref schedulerState))
        interpretUnlink !toPid = do
          setMyProcessState ProcessBusyUnlinking
          schedulerState <- getSchedulerState
          let procInfoVar = schedulerState ^. processTable
          lift $
            atomically $ do
              procInfo <- readTVar procInfoVar
              traverse_
                (\toProcInfo -> modifyTVar' (toProcInfo ^. processLinks) (Set.delete myPid))
                (procInfo ^. at toPid)
              modifyTVar' (myProcessInfo ^. processLinks) (Set.delete toPid)
        interpretGetProcessState !toPid = do
          setMyProcessState ProcessBusy
          schedulerState <- getSchedulerState
          let procInfoVar = schedulerState ^. processTable
          lift $
            atomically $ do
              procInfoTable <- readTVar procInfoVar
              traverse (\toProcInfo -> readTVar (toProcInfo ^. processState)) (procInfoTable ^. at toPid)
        interpretLink !toPid = do
          setMyProcessState ProcessBusyLinking
          schedulerState <- getSchedulerState
          let procInfoVar = schedulerState ^. processTable
          lift $
            atomically $ do
              procInfoTable <- readTVar procInfoVar
              case procInfoTable ^. at toPid of
                Just toProcInfo -> do
                  modifyTVar' (toProcInfo ^. processLinks) (Set.insert myPid)
                  modifyTVar' (myProcessInfo ^. processLinks) (Set.insert toPid)
                  return (Right ())
                Nothing -> return (Left (LinkedProcessCrashed toPid))
        interpretSend !toPid msg =
          setMyProcessState ProcessBusySending *> getSchedulerState >>=
          lift . atomically . enqueueMessageOtherProcess toPid msg
        interpretSendShutdownOrInterrupt !toPid !msg =
          setMyProcessState
            (either (const ProcessBusySendingShutdown) (const ProcessBusySendingInterrupt) (fromSomeExitReason msg)) *>
          getSchedulerState >>=
          lift . atomically . enqueueShutdownRequest toPid msg
        interpretFlush :: Eff SchedulerIO (ResumeProcess [Dynamic])
        interpretFlush = do
          setMyProcessState ProcessBusyReceiving
          lift $
            atomically $ do
              myMessageQ <- readTVar myMessageQVar
              modifyTVar' myMessageQVar (incomingMessages .~ Seq.Empty)
              return (ResumeWith (toList (myMessageQ ^. incomingMessages)))
        interpretReceive :: MessageSelector b -> Eff SchedulerIO (Either (ExitReason 'NoRecovery) (ResumeProcess b))
        interpretReceive f = do
          setMyProcessState ProcessBusyReceiving
          lift $
            atomically $ do
              myMessageQ <- readTVar myMessageQVar
              case myMessageQ ^. shutdownRequests of
                Nothing ->
                  case partitionMessages (myMessageQ ^. incomingMessages) Seq.Empty of
                    Nothing -> retry
                    Just (selectedMessage, otherMessages) -> do
                      modifyTVar' myMessageQVar (incomingMessages .~ otherMessages)
                      return (Right (ResumeWith selectedMessage))
                Just shutdownRequest -> do
                  modifyTVar' myMessageQVar (shutdownRequests .~ Nothing)
                  case fromSomeExitReason shutdownRequest of
                    Left sr -> return (Left sr)
                    Right ir -> return (Right (Interrupted ir))
          where
            partitionMessages Seq.Empty _acc = Nothing
            partitionMessages (m :<| msgRest) acc =
              maybe
                (partitionMessages msgRest (acc :|> m))
                (\res -> Just (res, acc Seq.>< msgRest))
                (runMessageSelector f m)

-- | This is the main entry point to running a message passing concurrency
-- application. This function takes a 'Process' on top of the 'SchedulerIO'
-- effect for concurrent logging.
schedule :: (HasCallStack) => Eff InterruptableProcEff () -> Eff LoggingAndIo ()
schedule procEff =
  liftBaseWith
    (\runS ->
       Async.withAsync
         (runS $
          withNewSchedulerState $ do
            (_, mainProcAsync) <-
              spawnNewProcess Nothing $ do
                logNotice "++++++++ main process started ++++++++"
                provideInterruptsShutdown procEff
                logNotice "++++++++ main process returned ++++++++"
            lift (void (Async.wait mainProcAsync)))
         (\ast ->
            runS $ do
              a <- restoreM ast
              void $ lift (Async.wait a))) >>=
  restoreM

spawnNewProcess ::
     (HasCallStack)
  => Maybe ProcessInfo
  -> Eff ProcEff ()
  -> Eff SchedulerIO (ProcessId, Async (ExitReason 'NoRecovery))
spawnNewProcess mLinkedParent mfa = do
  schedulerState <- getSchedulerState
  procInfo <- allocateProcInfo schedulerState
  traverse_ (linkToParent procInfo) mLinkedParent
  procAsync <- doForkProc procInfo schedulerState
  return (procInfo ^. processId, procAsync)
  where
    linkToParent toProcInfo parent = do
      let toPid = toProcInfo ^. processId
          parentPid = parent ^. processId
      lift $
        atomically $ do
          modifyTVar' (toProcInfo ^. processLinks) (Set.insert parentPid)
          modifyTVar' (parent ^. processLinks) (Set.insert toPid)
    allocateProcInfo schedulerState =
      lift
        (atomically
           (do let nextPidVar = schedulerState ^. nextPid
                   processInfoVar = schedulerState ^. processTable
               pid <- readTVar nextPidVar
               modifyTVar' nextPidVar (+ 1)
               procInfo <- newProcessInfo pid
               modifyTVar' processInfoVar (at pid ?~ procInfo)
               return procInfo))
    logAppendProcInfo pid =
      let addProcessId = over lmProcessId (maybe (Just (printf "% 9s" (show pid))) Just)
       in censorLogs @IO addProcessId
    triggerProcessLinksAndMonitors :: ProcessId -> ExitReason e -> TVar (Set ProcessId) -> Eff SchedulerIO ()
    triggerProcessLinksAndMonitors !pid !reason !linkSetVar = do
      schedulerState <- getSchedulerState
      lift $ atomically $ triggerAndRemoveMonitor pid (SomeExitReason reason) schedulerState
      let exitSeverity = toExitSeverity reason
          sendIt !linkedPid = do
            let msg = SomeExitReason (LinkedProcessCrashed pid)
            lift $
              atomically $ do
                procInfoTable <- readTVar (schedulerState ^. processTable)
                let mLinkedProcInfo = procInfoTable ^? ix linkedPid
                case mLinkedProcInfo of
                  Nothing -> return (Left linkedPid)
                  Just linkedProcInfo ->
                    let linkedMsgQVar = linkedProcInfo ^. messageQ
                        linkedLinkSetVar = linkedProcInfo ^. processLinks
                     in do linkedLinkSet <- readTVar linkedLinkSetVar
                           if Set.member pid linkedLinkSet
                             then do
                               writeTVar linkedLinkSetVar (Set.delete pid linkedLinkSet)
                               when (exitSeverity == Crash) (modifyTVar' linkedMsgQVar (shutdownRequests ?~ msg))
                               return (Right linkedPid)
                             else return (Left linkedPid)
      linkedPids <-
        lift
          (atomically
             (do linkSet <- readTVar linkSetVar
                 writeTVar linkSetVar def
                 return linkSet))
      res <- traverse sendIt (toList linkedPids)
      traverse_
        (logDebug . either (("linked process no found: " ++) . show) (("sent shutdown to linked process: " ++) . show))
        res
    doForkProc :: ProcessInfo -> SchedulerState -> Eff SchedulerIO (Async (ExitReason 'NoRecovery))
    doForkProc procInfo schedulerState =
      control
        (\inScheduler -> do
           let cancellationsVar = schedulerState ^. processCancellationTable
               processInfoVar = schedulerState ^. processTable
               pid = procInfo ^. processId
           procAsync <-
             Async.async
               (inScheduler
                  (logAppendProcInfo
                     pid
                     (Safe.bracketWithError
                        (logDebug "enter process")
                        (\mExc () -> do
                           lift
                             (atomically
                                (do modifyTVar' processInfoVar (at pid .~ Nothing)
                                    modifyTVar' cancellationsVar (at pid .~ Nothing)))
                           traverse_ (\exc -> logExitAndTriggerLinksAndMonitors (exitReasonFromException exc) pid) mExc)
                        (const
                           (do res <- handleProcess procInfo (mfa >> return ExitNormally)
                               logExitAndTriggerLinksAndMonitors res pid)))))
           atomically (modifyTVar' cancellationsVar (at pid ?~ procAsync))
           return procAsync)
      where
        exitReasonFromException exc =
          case Safe.fromException exc of
            Just Async.AsyncCancelled -> Killed
            Nothing -> UnexpectedException (prettyCallStack callStack) (Safe.displayException exc)
        logExitAndTriggerLinksAndMonitors reason pid = do
          triggerProcessLinksAndMonitors pid reason (procInfo ^. processLinks)
          logProcessExit reason
          return reason

-- * Scheduler Accessor
getSchedulerState :: HasSchedulerIO r => Eff r SchedulerState
getSchedulerState = ask

enqueueMessageOtherProcess :: HasCallStack => ProcessId -> Dynamic -> SchedulerState -> STM ()
enqueueMessageOtherProcess toPid msg schedulerState =
  view (at toPid) <$> readTVar (schedulerState ^. processTable) >>=
  maybe
    (return ())
    (\toProcessTable -> do
       modifyTVar' (toProcessTable ^. messageQ) (incomingMessages %~ (:|> msg))
       return ())

enqueueShutdownRequest :: HasCallStack => ProcessId -> SomeExitReason -> SchedulerState -> STM ()
enqueueShutdownRequest toPid msg schedulerState =
  view (at toPid) <$> readTVar (schedulerState ^. processTable) >>=
  maybe
    (return ())
    (\toProcessTable -> do
       modifyTVar' (toProcessTable ^. messageQ) (shutdownRequests ?~ msg)
       return ())