{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE GADTs #-}
-- | The 'Process' effect: a request type for process operations (asking for
-- the own 'ProcessId', spawning, sending and receiving 'Dynamic' messages,
-- shutting down, raising errors) together with the 'ResumeProcess' answers
-- that come back for those requests, and helper functions that issue the
-- requests from inside an 'Eff' computation.
--
-- The interpreter (scheduler) that actually answers the requests is not
-- defined in this module.
module Control.Eff.Concurrent.Process
( ProcessId(..)
, fromProcessId
, Process(..)
, ConsProcess
, ResumeProcess(..)
, SchedulerProxy(..)
, thisSchedulerProxy
, yieldAndCatchProcess
, sendMessage
, spawn
, receiveMessage
, receiveMessageAs
, receiveLoop
, self
, sendShutdown
, exitWithError
, exitNormally
, raiseError
, catchRaisedError
, ignoreProcessError
)
where
import Control.Eff
import Control.Lens
import Control.Monad (forever)
import Data.Char (isDigit, isSpace)
import Data.Dynamic
import Data.Kind
import GHC.Stack
import Text.Printf
-- | The requests a process can issue. The index @r@ is the list of effects
-- underneath the 'Process' effect; the second parameter is the type of the
-- answer the request is resumed with.
--
-- Requests whose answer is wrapped in 'ResumeProcess' can be answered with
-- a value, an error, a shutdown request or a retry request; the remaining
-- requests are polymorphic in their answer type.
data Process (r :: [Type -> Type]) b where
  -- | Ask for a 'ProcessId'; this is the request issued by 'self'.
  SelfPid
    :: Process r (ResumeProcess ProcessId)
  -- | Carries a computation over @'Process' r ': r@; answered with a
  -- 'ProcessId'. This is the request issued by 'spawn'.
  Spawn
    :: Eff (Process r ': r) ()
    -> Process r (ResumeProcess ProcessId)
  -- | Issued by 'exitNormally', and by the helpers in this module whenever a
  -- request is answered with 'ShutdownRequested'.
  Shutdown
    :: Process r b
  -- | Issued by 'exitWithError', and whenever a request made through the
  -- internal @yieldProcess@ helper is answered with 'OnError'.
  ExitWithError
    :: String
    -> Process r b
  -- | Issued by 'raiseError'; 'catchRaisedError' intercepts this request.
  RaiseError
    :: String
    -> Process r b
  -- | Issued by 'sendShutdown' for the given 'ProcessId'; answered with a
  -- 'Bool' (its meaning is decided by the scheduler, not visible here).
  SendShutdown
    :: ProcessId
    -> Process r (ResumeProcess Bool)
  -- | Issued by 'sendMessage' with the target 'ProcessId' and the payload;
  -- answered with a 'Bool' (its meaning is decided by the scheduler).
  SendMessage
    :: ProcessId
    -> Dynamic
    -> Process r (ResumeProcess Bool)
  -- | Issued by 'receiveMessage' and 'receiveLoop'; answered with a
  -- 'Dynamic' message.
  ReceiveMessage
    :: Process r (ResumeProcess Dynamic)
-- | The answer a process gets back for a 'Process' request.
--
-- The constructor order is kept as it is on purpose: the derived 'Ord'
-- instance depends on it.
data ResumeProcess v
  = ShutdownRequested
  -- ^ No value is delivered; the helpers in this module react by sending
  -- 'Shutdown' (or, in 'receiveLoop', by calling the handler with
  -- @Left Nothing@).
  | OnError String
  -- ^ No value is delivered, only an error message.
  | ResumeWith v
  -- ^ The request succeeded with this value.
  | RetryLastAction
  -- ^ No value is delivered; the helpers in this module issue the same
  -- request again.
  deriving (Typeable, Foldable, Functor, Show, Eq, Ord, Traversable)
-- | The effect list @r@ with the matching 'Process' effect put in front,
-- i.e. the effect list a process body runs in (compare the argument of
-- 'Spawn').
type ConsProcess (r :: [Type -> Type]) = Process r ': r
-- | Send a 'Process' request and unwrap its 'ResumeProcess' answer.
--
-- * 'ResumeWith' yields the carried value.
-- * 'RetryLastAction' sends the very same request again.
-- * 'ShutdownRequested' sends 'Shutdown'.
-- * 'OnError' sends 'ExitWithError' with the reported message.
yieldProcess
  :: forall r q v
   . (SetMember Process (Process q) r, HasCallStack)
  => Process q (ResumeProcess v)
  -> Eff r v
yieldProcess processAction = send processAction >>= resume
  where
    -- Decide how to continue from the answer to the request.
    resume :: ResumeProcess v -> Eff r v
    resume (ResumeWith value) = return value
    resume RetryLastAction    = yieldProcess processAction
    resume ShutdownRequested  = send (Shutdown @q)
    resume (OnError e)        = send (ExitWithError @q e)
-- | Run an action that produces a 'ResumeProcess' answer and turn the answer
-- into an 'Either'.
--
-- * 'ResumeWith' becomes 'Right'.
-- * 'OnError' becomes 'Left' with the error message (the process is /not/
--   terminated here).
-- * 'RetryLastAction' runs the given action again.
-- * 'ShutdownRequested' sends 'Shutdown'.
yieldAndCatchProcess
  :: forall q r v
   . (SetMember Process (Process q) r, HasCallStack)
  => SchedulerProxy q
  -> Eff r (ResumeProcess v)
  -> Eff r (Either String v)
yieldAndCatchProcess px processAction = processAction >>= resume
  where
    -- Translate the answer; only the retry case loops.
    resume :: ResumeProcess v -> Eff r (Either String v)
    resume (ResumeWith value) = return (Right value)
    resume RetryLastAction    = yieldAndCatchProcess px processAction
    resume ShutdownRequested  = send (Shutdown @q)
    resume (OnError e)        = return (Left e)
-- | A value-less proxy whose only purpose is to carry the effect list @q@
-- at the type level, so that the functions taking it can fix the @q@ in
-- @'Process' q@.
data SchedulerProxy (q :: [Type -> Type]) = SchedulerProxy
-- | Obtain the 'SchedulerProxy' for the effect list @r@ from inside a
-- computation running in @'Process' r ': r@. Performs no effect.
thisSchedulerProxy :: Eff (Process r ': r) (SchedulerProxy r)
thisSchedulerProxy = pure SchedulerProxy
-- | Issue a 'SendMessage' request for the given 'ProcessId' and payload and
-- return the 'Bool' the request is answered with.
--
-- The proxy argument is only used to fix the type @q@. Non-value answers are
-- handled as in every request made through the internal @yieldProcess@
-- helper (retry, shutdown, or exit with error).
sendMessage
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> ProcessId
  -> Dynamic
  -> Eff r Bool
sendMessage _ pid = yieldProcess . SendMessage pid
-- | Issue a 'SendShutdown' request for the given 'ProcessId' and return the
-- 'Bool' the request is answered with.
--
-- The proxy argument is only used to fix the type @q@.
sendShutdown
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> ProcessId
  -> Eff r Bool
sendShutdown _ = yieldProcess . SendShutdown
-- | Issue a 'Spawn' request carrying the given computation and return the
-- 'ProcessId' the request is answered with.
spawn
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => Eff (Process q ': q) ()
  -> Eff r ProcessId
spawn = yieldProcess . Spawn @q
-- | Issue a 'ReceiveMessage' request and return the 'Dynamic' message the
-- request is answered with.
--
-- The proxy argument is only used to fix the type @q@.
receiveMessage
  :: forall r q
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r Dynamic
receiveMessage _ = yieldProcess ReceiveMessage
-- | Receive a message with 'receiveMessage' and cast it to the expected type
-- with 'fromDynamic'.
--
-- If the cast fails, an error is raised through 'raiseError' with a message
-- that contains the 'show'n 'Dynamic'.
receiveMessageAs
  :: forall a r q
   . (HasCallStack, Typeable a, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r a
receiveMessageAs px = receiveMessage px >>= castOrRaise
  where
    -- Either hand out the typed message or raise a process error.
    castOrRaise :: Dynamic -> Eff r a
    castOrRaise received =
      maybe
        (raiseError px ("Invalid message type received: " ++ show received))
        return
        (fromDynamic received)
-- | Endlessly issue 'ReceiveMessage' requests and pass every answer to the
-- given handler.
--
-- * A message is passed as @Right message@.
-- * 'OnError' is passed as @Left (Just reason)@.
-- * 'ShutdownRequested' is passed as @Left Nothing@.
-- * 'RetryLastAction' is not passed on; the loop just asks again.
--
-- The loop itself never returns ('forever'); it only ends if the handler or
-- the interpreter of the effect ends the computation.
receiveLoop
  :: forall r q
   . (SetMember Process (Process q) r, HasCallStack)
  => SchedulerProxy q
  -> (Either (Maybe String) Dynamic -> Eff r ())
  -> Eff r ()
receiveLoop _ handlers = forever (send (ReceiveMessage @q) >>= dispatch)
  where
    -- Map one answer of the receive request onto the handler's input.
    dispatch :: ResumeProcess Dynamic -> Eff r ()
    dispatch RetryLastAction      = return ()
    dispatch ShutdownRequested    = handlers (Left Nothing)
    dispatch (OnError reason)     = handlers (Left (Just reason))
    dispatch (ResumeWith message) = handlers (Right message)
-- | Issue a 'SelfPid' request and return the 'ProcessId' it is answered
-- with. The proxy argument is only used to fix the type @q@.
self
  :: (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r ProcessId
self _ = yieldProcess SelfPid
-- | Send the 'Shutdown' request. The result type is fully polymorphic, as
-- the request itself is; the proxy argument is only used to fix the type
-- @q@.
exitNormally
  :: forall r q a
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r a
exitNormally _proxy = send (Shutdown @q)
-- | Send the 'ExitWithError' request with the given message. The result type
-- is fully polymorphic, as the request itself is; the proxy argument is only
-- used to fix the type @q@.
exitWithError
  :: forall r q a
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> String
  -> Eff r a
exitWithError _proxy reason = send (ExitWithError @q reason)
-- | Send the 'RaiseError' request with the given message. Such a request can
-- be intercepted with 'catchRaisedError'. The proxy argument is only used to
-- fix the type @q@.
raiseError
  :: forall r q b
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> String
  -> Eff r b
raiseError _proxy reason = send (RaiseError @q reason)
-- | Run an action and intercept the 'RaiseError' requests it sends.
--
-- When the action sends 'RaiseError', its continuation is dropped and the
-- given handler is run with the error message instead. Every other 'Process'
-- request is sent on unchanged and its answer is fed back to the action.
catchRaisedError
  :: forall r q w
   . (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> (String -> Eff r w)
  -> Eff r w
  -> Eff r w
catchRaisedError _ onErr = interpose return intercept
  where
    -- Handler for each intercepted request and its continuation.
    intercept :: forall b . Process q b -> (b -> Eff r w) -> Eff r w
    intercept request continue =
      case request of
        RaiseError emsg -> onErr emsg
        _               -> send request >>= continue
-- | Run an action and report a raised error as a value: the result is
-- wrapped in 'Right', and an error intercepted by 'catchRaisedError' is
-- returned as 'Left' with its message.
ignoreProcessError
  :: (HasCallStack, SetMember Process (Process q) r)
  => SchedulerProxy q
  -> Eff r a
  -> Eff r (Either String a)
ignoreProcessError px action =
  catchRaisedError px (return . Left) (Right <$> action)
-- | Identifier of a process: a wrapper around an 'Int'.
--
-- The numeric classes are derived from the underlying 'Int', so a
-- 'ProcessId' can be written as a literal and used in arithmetic.
newtype ProcessId = ProcessId
  { _fromProcessId :: Int -- ^ The wrapped number.
  }
  deriving ( Eq
           , Ord
           , Typeable
           , Bounded
           , Num
           , Enum
           , Integral
           , Real
           )
-- | Parses the textual form produced by the 'Show' instance: @\<0.N.0\>@,
-- where @N@ is the decimal number of the process (optionally negative).
--
-- The number is cut out by hand instead of calling 'reads' on the rest of
-- the input: the standard lexer would read @N.0@ as a single fractional
-- literal, which is not a valid 'Int', so such a parse never succeeds.
--
-- Leading white space is skipped, as the derived 'Read' instances of
-- enclosing types (records, constructors with arguments, lists) hand over
-- the input without stripping it first.
instance Read ProcessId where
  readsPrec _ input =
    case dropWhile isSpace input of
      '<' : '0' : '.' : afterPrefix ->
        let (numeral, afterNumeral) = spanNumeral afterPrefix
        in  case (reads numeral, afterNumeral) of
              -- The numeral must be consumed completely and be followed by
              -- the fixed suffix.
              ([(c, "")], '.' : '0' : '>' : remaining) ->
                [(ProcessId c, remaining)]
              _ -> []
      _ -> []
    where
      -- Split off an optionally negative run of decimal digits.
      spanNumeral ('-' : afterSign) =
        let (digits, rest) = span isDigit afterSign
        in  ('-' : digits, rest)
      spanNumeral str = span isDigit str
-- | Renders a 'ProcessId' as @\<0.N.0\>@, where @N@ is the wrapped number
-- in decimal.
instance Show ProcessId where
  show pid = printf "<0.%d.0>" (_fromProcessId pid)
-- Template Haskell splice: derives an optic from the underscore-prefixed
-- record field '_fromProcessId'. Presumably this is what defines the
-- 'fromProcessId' named in the export list (confirm against the lens
-- documentation for 'makeLenses').
makeLenses ''ProcessId