module Control.Eff.Concurrent.Process.SingleThreadedScheduler
( schedule
, defaultMain
, singleThreadedIoScheduler
, LoggingAndIo) where
import Control.Eff
import Control.Eff.Lift
import Control.Eff.Log
import Control.Eff.Concurrent.Process
import Control.Lens hiding ((|>), Empty)
import qualified Data.Sequence as Seq
import Data.Sequence (Seq (..))
import qualified Data.Map.Strict as Map
import GHC.Stack
import Data.Kind ()
import Data.Dynamic
import Data.Maybe
import Control.Monad
schedule
:: forall r . Eff (Process r ': r) ()
-> Eff r ()
schedule mainProcessAction =
do
y <- runAsCoroutine mainProcessAction
go 1 (Map.singleton 0 Seq.empty) (Seq.singleton (y, 0))
where
go :: ProcessId -> Map.Map ProcessId (Seq Dynamic) -> Seq (OnYield r, ProcessId) -> Eff r ()
go _newPid _msgQs Empty = return ()
go newPid msgQs allProcs@((processState, pid) :<| rest) =
let handleExit =
if pid == 0 then return ()
else go newPid
(msgQs & at pid .~ Nothing)
rest
in case processState of
OnDone -> handleExit
OnRaiseError _ -> handleExit
OnExitError _ -> handleExit
OnSendShutdown targetPid k ->
do let allButTarget =
Seq.filter (\(_,e) -> e /= pid && e /= targetPid) allProcs
targets =
Seq.filter (\(_,e) -> e == targetPid) allProcs
suicide = targetPid == pid
targetFound = suicide || not (Seq.null targets)
if suicide
then
do nextK <- k ShutdownRequested
go newPid
msgQs
(rest :|> (nextK, pid))
else
do let deliverTheGoodNews (targetState, tPid) =
do nextTargetState <-
case targetState of
OnSendShutdown _ tk ->
tk ShutdownRequested
OnSelf tk ->
tk ShutdownRequested
OnSend _ _ tk ->
tk ShutdownRequested
OnRecv tk ->
tk ShutdownRequested
OnSpawn _ tk ->
tk ShutdownRequested
OnDone -> return OnDone
OnExitError er -> return (OnExitError er)
OnRaiseError er -> return (OnExitError er)
return (nextTargetState, tPid)
nextTargets <- traverse deliverTheGoodNews targets
nextK <- k (ResumeWith targetFound)
go newPid
msgQs
(allButTarget Seq.>< (nextTargets :|> (nextK, pid)))
OnSelf k ->
do nextK <- k (ResumeWith pid)
go newPid
msgQs
(rest :|> (nextK, pid))
OnSend toPid msg k ->
do nextK <- k (ResumeWith (msgQs ^. at toPid . to isJust))
go newPid
(msgQs & at toPid . _Just %~ (:|> msg))
(rest :|> (nextK, pid))
recv@(OnRecv k) ->
case msgQs ^. at pid of
Nothing ->
do nextK <- k (OnError (show pid ++ " has no message queue!"))
go newPid msgQs (rest :|> (nextK, pid))
Just Empty ->
if Seq.length rest == 0 then
do nextK <- k (OnError ("Process " ++ show pid ++ " deadlocked!"))
go newPid msgQs (rest :|> (nextK, pid))
else
go newPid msgQs (rest :|> (recv, pid))
Just (nextMessage :<| restMessages) ->
do nextK <- k (ResumeWith nextMessage)
go newPid
(msgQs & at pid . _Just .~ restMessages)
(rest :|> (nextK, pid))
OnSpawn f k ->
do nextK <- k (ResumeWith newPid)
fk <- runAsCoroutine f
go (newPid + 1)
(msgQs & at newPid .~ Just Seq.empty)
( rest :|> (nextK, pid) :|> (fk, newPid))
data OnYield r where
OnSelf :: (ResumeProcess ProcessId -> Eff r (OnYield r))
-> OnYield r
OnSpawn :: Eff (Process r ': r) ()
-> (ResumeProcess ProcessId -> Eff r (OnYield r))
-> OnYield r
OnDone :: OnYield r
OnExitError :: String -> OnYield r
OnRaiseError :: String -> OnYield r
OnSend :: ProcessId -> Dynamic
-> (ResumeProcess Bool -> Eff r (OnYield r))
-> OnYield r
OnRecv :: (ResumeProcess Dynamic -> Eff r (OnYield r))
-> OnYield r
OnSendShutdown :: ProcessId -> (ResumeProcess Bool -> Eff r (OnYield r)) -> OnYield r
runAsCoroutine :: forall r v . Eff (Process r ': r) v
-> Eff r (OnYield r)
runAsCoroutine m = handle_relay (const $ return OnDone) cont m
where
cont :: Process r x -> (x -> Eff r (OnYield r)) -> Eff r (OnYield r)
cont SelfPid k = return (OnSelf k)
cont (Spawn e) k = return (OnSpawn e k)
cont Shutdown _k = return OnDone
cont (ExitWithError e) _k = return (OnExitError e)
cont (RaiseError e) _k = return (OnRaiseError e)
cont (SendMessage tp msg) k = return (OnSend tp msg k)
cont ReceiveMessage k = return (OnRecv k)
cont (SendShutdown pid) k = return (OnSendShutdown pid k)
type LoggingAndIo =
'[ Logs String
, Lift IO
]
singleThreadedIoScheduler :: SchedulerProxy LoggingAndIo
singleThreadedIoScheduler = SchedulerProxy
defaultMain
:: HasCallStack
=> Eff '[Process '[Logs String, Lift IO], Logs String, Lift IO] ()
-> IO ()
defaultMain go = runLift $ handleLogsWith (schedule go) ($ putStrLn)
_example :: IO ()
_example = defaultMain go
where
px :: SchedulerProxy '[Logs String, Lift IO]
px = SchedulerProxy
go :: Eff (ConsProcess '[Logs String, Lift IO]) ()
go = do
p1 <- spawn $ do
logMsg "Hello World!"
me <- self px
logMsg ("I am: " ++ show me)
d <- receiveMessage px
let m = fromDyn @String d ""
logMsg ("Got: " ++ m)
void $ sendMessage px p1 (toDyn "huhu")
return ()