-- | A coroutine based, single threaded scheduler for 'Process'es. module Control.Eff.Concurrent.Process.SingleThreadedScheduler ( scheduleM , schedulePure , scheduleIO , scheduleMonadIOEff , scheduleIOWithLogging , defaultMain , singleThreadedIoScheduler , LoggingAndIo ) where import Control.Concurrent ( yield ) import Control.DeepSeq import Control.Eff import Control.Eff.Lift import Control.Eff.Extend import Control.Eff.Concurrent.Process import Control.Eff.Log import Control.Lens hiding ( (|>) , Empty ) import Control.Monad ( void ) import Control.Monad.IO.Class import qualified Data.Sequence as Seq import Data.Sequence ( Seq(..) ) import qualified Data.Map.Strict as Map import GHC.Stack import Data.Kind ( ) import Data.Dynamic import Data.Maybe -- | Like 'schedule' but /pure/. The @yield@ effect is just @return ()@. -- @schedulePure == runIdentity . 'scheduleM' (Identity . run) (return ())@ -- -- @since 0.3.0.2 schedulePure :: Eff (ConsProcess '[]) a -> Either String a schedulePure = runIdentity . scheduleM (Identity . run) (return ()) -- | Invoke 'schedule' with @lift 'Control.Concurrent.yield'@ as yield effect. -- @scheduleIO runEff == 'scheduleM' (runLift . runEff) (liftIO 'yield')@ -- -- @since 0.4.0.0 scheduleIO :: MonadIO m => (forall b . Eff r b -> Eff '[Lift m] b) -> Eff (ConsProcess r) a -> m (Either String a) scheduleIO runEff = scheduleM (runLift . runEff) (liftIO yield) -- | Invoke 'schedule' with @lift 'Control.Concurrent.yield'@ as yield effect. -- @scheduleMonadIOEff == 'scheduleM' id (liftIO 'yield')@ -- -- @since 0.3.0.2 scheduleMonadIOEff :: MonadIO (Eff r) => Eff (ConsProcess r) a -> Eff r (Either String a) scheduleMonadIOEff = -- schedule (lift yield) scheduleM id (liftIO yield) -- | Run processes that have the 'Logs' and the 'Lift' effects. -- The user must provide a log handler function. -- -- Log messages are evaluated strict. -- -- @scheduleIOWithLogging == 'run' . 'captureLogs' . 'schedule' (return ())@ -- -- @since 0.4.0.0 scheduleIOWithLogging :: (NFData l, MonadIO m) => (l -> m ()) -> Eff (ConsProcess '[Logs l, Lift m]) a -> m (Either String a) scheduleIOWithLogging handleLog = scheduleIO (handleLogsWith handleLog) -- | Handle the 'Process' effect, as well as all lower effects using an effect handler function. -- -- Execute the __main__ 'Process' and all the other processes 'spawn'ed by it in the -- current thread concurrently, using a co-routine based, round-robin -- scheduler. If a process exits with 'exitNormally', 'exitWithError', -- 'raiseError' or is killed by another process @Left ...@ is returned. -- Otherwise, the result will be wrapped in a @Right@. -- -- Every time a process _yields_ the effects are evaluated down to the a value -- of type @m (Either String a)@. -- -- If the evaluator function runs the action down e.g. @IO@ this might improve -- memory consumption, for long running services, with processes that loop -- endlessly. -- -- @since 0.4.0.0 scheduleM :: Monad m => (forall b . Eff r b -> m b) -> m () -- ^ An that performs a __yield__ w.r.t. the underlying effect -- @r@. E.g. if @Lift IO@ is present, this might be: -- @lift 'Control.Concurrent.yield'. -> Eff (ConsProcess r) a -> m (Either String a) scheduleM runEff yieldEff e = do y <- runAsCoroutinePure runEff e handleProcess runEff yieldEff 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0)) -- | Internal data structure that is part of the coroutine based scheduler -- implementation. data OnYield r a where OnYield :: (ResumeProcess () -> Eff r (OnYield r a)) -> OnYield r a OnSelf :: (ResumeProcess ProcessId -> Eff r (OnYield r a)) -> OnYield r a OnSpawn :: Eff (Process r ': r) () -> (ResumeProcess ProcessId -> Eff r (OnYield r a)) -> OnYield r a OnDone :: !a -> OnYield r a OnShutdown :: OnYield r a OnExitError :: !String -> OnYield r a OnRaiseError :: !String -> OnYield r a OnSend :: !ProcessId -> !Dynamic -> (ResumeProcess Bool -> Eff r (OnYield r a)) -> OnYield r a OnRecv :: (ResumeProcess Dynamic -> Eff r (OnYield r a)) -> OnYield r a OnSendShutdown :: !ProcessId -> (ResumeProcess Bool -> Eff r (OnYield r a)) -> OnYield r a -- | Internal 'Process' handler function. handleProcess :: Monad m => (forall a . Eff r a -> m a) -> m () -> ProcessId -> Map.Map ProcessId (Seq Dynamic) -> Seq (OnYield r finalResult, ProcessId) -> m (Either String finalResult) handleProcess _runEff _yieldEff _newPid _msgQs Empty = return $ Left "no main process" handleProcess runEff yieldEff !newPid !msgQs allProcs@((!processState, !pid) :<| rest) = let handleExit res = if pid == 0 then return res else handleProcess runEff yieldEff newPid (msgQs & at pid .~ Nothing) rest in case processState of OnDone r -> handleExit (Right r) OnShutdown -> handleExit (Left "process exited normally") OnRaiseError errM -> handleExit (Left errM) OnExitError errM -> handleExit (Left errM) OnSendShutdown targetPid k -> do let allButTarget = Seq.filter (\(_, e) -> e /= pid && e /= targetPid) allProcs targets = Seq.filter (\(_, e) -> e == targetPid) allProcs suicide = targetPid == pid targetFound = suicide || not (Seq.null targets) if suicide then do nextK <- runEff $ k ShutdownRequested handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid)) else do let deliverTheGoodNews (targetState, tPid) = do nextTargetState <- case targetState of OnSendShutdown _ tk -> tk ShutdownRequested OnYield tk -> tk ShutdownRequested OnSelf tk -> tk ShutdownRequested OnSend _ _ tk -> tk ShutdownRequested OnRecv tk -> tk ShutdownRequested OnSpawn _ tk -> tk ShutdownRequested OnDone x -> return (OnDone x) OnShutdown -> return OnShutdown OnExitError er -> return (OnExitError er) OnRaiseError er -> return (OnExitError er) return (nextTargetState, tPid) nextTargets <- runEff $ traverse deliverTheGoodNews targets nextK <- runEff $ k (ResumeWith targetFound) handleProcess runEff yieldEff newPid msgQs (allButTarget Seq.>< (nextTargets :|> (nextK, pid))) OnSelf k -> do nextK <- runEff $ k (ResumeWith pid) handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid)) OnYield k -> do yieldEff nextK <- runEff $ k (ResumeWith ()) handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid)) OnSend toPid msg k -> do nextK <- runEff $ k (ResumeWith (msgQs ^. at toPid . to isJust)) handleProcess runEff yieldEff newPid (msgQs & at toPid . _Just %~ (:|> msg)) (rest :|> (nextK, pid)) recv@(OnRecv k) -> case msgQs ^. at pid of Nothing -> do nextK <- runEff $ k (OnError (show pid ++ " has no message queue!")) handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid)) Just Empty -> if Seq.length rest == 0 then do nextK <- runEff $ k (OnError ("Process " ++ show pid ++ " deadlocked!")) handleProcess runEff yieldEff newPid msgQs (rest :|> (nextK, pid)) else handleProcess runEff yieldEff newPid msgQs (rest :|> (recv, pid)) Just (nextMessage :<| restMessages) -> do nextK <- runEff $ k (ResumeWith nextMessage) handleProcess runEff yieldEff newPid (msgQs & at pid . _Just .~ restMessages) (rest :|> (nextK, pid)) OnSpawn f k -> do nextK <- runEff $ k (ResumeWith newPid) fk <- runAsCoroutinePure runEff (f >> exitNormally SP) handleProcess runEff yieldEff (newPid + 1) (msgQs & at newPid ?~ Seq.empty) (rest :|> (nextK, pid) :|> (fk, newPid)) runAsCoroutinePure :: forall v r m . Monad m => (forall a . Eff r a -> m a) -> Eff (ConsProcess r) v -> m (OnYield r v) runAsCoroutinePure runEff = runEff . handle_relay (return . OnDone) cont where cont :: Process r x -> (x -> Eff r (OnYield r v)) -> Eff r (OnYield r v) cont YieldProcess k = return (OnYield k) cont SelfPid k = return (OnSelf k) cont (Spawn e) k = return (OnSpawn e k) cont Shutdown _k = return OnShutdown cont (ExitWithError !e ) _k = return (OnExitError e) cont (RaiseError !e ) _k = return (OnRaiseError e) cont (SendMessage !tp !msg) k = return (OnSend tp msg k) cont ReceiveMessage k = return (OnRecv k) cont (SendShutdown !pid) k = return (OnSendShutdown pid k) -- | The concrete list of 'Eff'ects for running this pure scheduler on @IO@ and -- with string logging. type LoggingAndIo = '[ Logs String , Lift IO ] -- | A 'SchedulerProxy' for 'LoggingAndIo'. singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo singleThreadedIoScheduler = SchedulerProxy -- | Execute a 'Process' using 'schedule' on top of 'Lift' @IO@ and 'Logs' -- @String@ effects. defaultMain :: HasCallStack => Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] () -> IO () defaultMain e = void $ runLift $ handleLogsWithLoggingTHandler (scheduleMonadIOEff e) ($! putStrLn)