-- | Run a process scheduler in a background thread and feed it
-- @Eff (Processes r)@ actions from plain 'IO'.
--
-- A session is started with 'forkInteractiveScheduler', actions are sent to
-- it with 'submit', 'submitCast' and 'submitCall', and it is ended with
-- 'killInteractiveScheduler'.
module Control.Eff.Concurrent.Process.Interactive
( SchedulerSession()
, forkInteractiveScheduler
, killInteractiveScheduler
, submit
, submitCast
, submitCall
)
where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Eff
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.Concurrent.Process
import Control.Monad
import Data.Foldable
import System.Timeout
-- | Handle to an interactive scheduler session.
--
-- It wraps a 'TMVar' that holds the session's 'SchedulerQueue'. The 'TMVar'
-- is created empty by 'forkInteractiveScheduler', filled from inside the
-- scheduler once the read-eval-print loop process starts, and emptied again
-- by 'killInteractiveScheduler' or when the scheduler returns.
newtype SchedulerSession r = SchedulerSession (TMVar (SchedulerQueue r))
-- | Channel of actions waiting to be run by the read-eval-print loop.
--
-- Every queued action yields a @Maybe String@; a 'Just' value is printed by
-- the loop, prefixed with @\">>> \"@.
newtype SchedulerQueue r =
SchedulerQueue (TChan (Eff (Processes r) (Maybe String)))
-- | Fork a thread that runs the given scheduler with a read-eval-print loop
-- as its process, and return a 'SchedulerSession' for talking to that loop.
--
-- The first argument is the function that actually runs the scheduler in
-- 'IO'. The session's 'TMVar' is only filled from inside the scheduler, so
-- the returned session has no queue until the loop process has started.
--
-- The loop repeatedly takes one action from the queue, runs it, prints a
-- 'Just' result prefixed with @\">>> \"@, calls 'yieldProcess' and starts
-- over. It exits as soon as it finds the session's 'TMVar' empty, which is
-- what 'killInteractiveScheduler' causes.
forkInteractiveScheduler
  :: forall r
   . (SetMember Lift (Lift IO) r)
  => (Eff (Processes r) () -> IO ())
  -> IO (SchedulerSession r)
forkInteractiveScheduler ioScheduler = do
  inQueue  <- newTChanIO
  queueVar <- newEmptyTMVarIO
  void $ forkIO
    (do
      ioScheduler
        (do
          lift (atomically (putTMVar queueVar (SchedulerQueue inQueue)))
          readEvalPrintLoop queueVar
        )
      -- The scheduler is done: make sure the session is marked as gone.
      -- This must not block, because 'killInteractiveScheduler' may already
      -- have emptied the variable (that is how the loop is told to exit); a
      -- blocking 'takeTMVar' would then leave this thread stuck forever.
      atomically (void (tryTakeTMVar queueVar))
    )
  return (SchedulerSession queueVar)
 where
  -- Run queued actions until the session variable is found empty.
  readEvalPrintLoop
    :: TMVar (SchedulerQueue r) -> Eff (Processes r) ()
  readEvalPrintLoop queueVar = do
    nextActionOrExit <- readAction
    case nextActionOrExit of
      -- The session variable is empty: stop the loop.
      Left True        -> return ()
      -- Nothing queued right now: poll again.
      -- NOTE(review): this branch neither blocks nor yields, so an idle
      -- session polls the queue continuously.
      Left False       -> readEvalPrintLoop queueVar
      Right nextAction -> do
        res <- nextAction
        traverse_ (lift . putStrLn . (">>> " ++)) res
        yieldProcess
        readEvalPrintLoop queueVar
   where
    -- One non-blocking look at the session variable and the queue:
    -- @Left True@ = session ended, @Left False@ = queue empty,
    -- @Right action@ = next action to run.
    readAction = lift $ atomically $ do
      mInQueue <- tryReadTMVar queueVar
      case mInQueue of
        Nothing -> return (Left True)
        Just (SchedulerQueue inQueue) ->
          maybe (Left False) Right <$> tryReadTChan inQueue
-- | End an interactive scheduler session.
--
-- Empties the session's 'TMVar' without blocking; if it is already empty
-- nothing happens. The read-eval-print loop started by
-- 'forkInteractiveScheduler' exits the next time it finds the variable empty.
killInteractiveScheduler :: SchedulerSession r -> IO ()
killInteractiveScheduler (SchedulerSession sessionVar) =
  void (atomically (tryTakeTMVar sessionVar))
-- | Send an action to the session's read-eval-print loop and wait for its
-- result.
--
-- Waits up to five seconds (@5000000@ microseconds) for the session's queue
-- to become available and fails with @\"ERROR: No Scheduler\"@ if it does
-- not. Once the action is enqueued, this blocks without a timeout until the
-- action has stored its result, which is forced to weak head normal form
-- before being handed over.
submit
  :: forall r a
   . (SetMember Lift (Lift IO) r)
  => SchedulerSession r
  -> Eff (Processes r) a
  -> IO a
submit (SchedulerSession qVar) theAction =
  timeout 5000000 (atomically enqueue)
    >>= maybe (fail "ERROR: No Scheduler") (atomically . takeTMVar)
 where
  -- Queue the wrapped action and hand back the variable its result will
  -- be written to.
  enqueue = do
    SchedulerQueue inQueue <- readTMVar qVar
    resultVar <- newEmptyTMVar
    writeTChan inQueue (deliverTo resultVar)
    return resultVar
  -- Run the caller's action and publish its result; 'Nothing' means there
  -- is no text for the loop to print.
  deliverTo resultVar = do
    result <- theAction
    lift (atomically (putTMVar resultVar $! result))
    return Nothing
-- | Run 'cast' for the given endpoint and asynchronous message inside the
-- session, using 'submit'.
submitCast
  :: forall o r
   . ( SetMember Lift (Lift IO) r
     , HasPdu o
     , Tangible (Pdu o 'Asynchronous)
     , Member Interrupts r)
  => SchedulerSession r
  -> Endpoint o
  -> Pdu o 'Asynchronous
  -> IO ()
submitCast session endpoint message =
  submit session $ cast endpoint message
-- | Run 'call' for the given endpoint and synchronous message inside the
-- session, using 'submit', and return the result of that 'call'.
submitCall
  :: forall o q r
   . ( SetMember Lift (Lift IO) r
     , Member Interrupts r
     , Tangible (Pdu o ('Synchronous q))
     , HasPdu o
     , Tangible q
     )
  => SchedulerSession r
  -> Endpoint o
  -> Pdu o ( 'Synchronous q)
  -> IO q
submitCall session endpoint message =
  submit session $ call endpoint message