-- | Functions for spawning processes that serve 'Api' requests.
--
-- The @spawnApiServer*@ functions start a server process from a
-- 'MessageCallback' and an 'InterruptCallback'. The @handle*@ functions
-- build message callbacks, and '^:' as well as the 'Semigroup' instance
-- combine them.
module Control.Eff.Concurrent.Api.Server2
(
spawnApiServer
, spawnApiServerStateful
, spawnApiServerEffectful
, CallbackResult(..)
, MessageCallback(..)
, handleCasts
, handleCalls
, handleCastsAndCalls
, handleMessages
, handleSelectedMessages
, handleAnyMessages
, handleProcessDowns
, dropUnhandledMessages
, exitOnUnhandled
, logUnhandledMessages
, (^:)
, fallbackHandler
, ToServerPids(..)
, InterruptCallback(..)
, stopServerOnInterrupt
)
where
import Control.Eff
import Control.Eff.Extend
import Control.Eff.Log
import Control.Eff.State.Lazy
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Process
import Control.Monad ( (>=>) )
import Data.Proxy
import Data.Dynamic
import Control.Applicative
import GHC.Stack
import GHC.Generics
import Control.DeepSeq
import Data.Kind
import Data.Default
-- | Spawn a process running a message loop and return its 'ServerPids'.
--
-- The loop is 'receiveSelectedLoop' using the selector stored in the
-- 'MessageCallback'.
--
-- * A selected message is passed to the message callback, which is run
--   under 'provideInterrupts'.
-- * An interrupt delivered by the loop, an 'InterruptReason' reported by
--   'provideInterrupts', and a 'StopServer' result of the message callback
--   are all forwarded to the 'InterruptCallback'.
-- * When the interrupt callback answers 'HandleNext' the loop continues;
--   when it answers @'StopServer' r@ the process calls 'exitBecause' with
--   @'NotRecovered' r@.
spawnApiServer
  :: forall api eff
   . (ToServerPids api, HasCallStack)
  => MessageCallback api (InterruptableProcess eff)
  -> InterruptCallback (ConsProcess eff)
  -> Eff (InterruptableProcess eff) (ServerPids api)
spawnApiServer (MessageCallback selector onMessage) (InterruptCallback onInterrupt)
  = fmap
    (toServerPids (Proxy @api))
    (spawnRaw (receiveSelectedLoop (SP @eff) selector dispatch))
 where
  -- Entry point of each loop iteration: interrupt or selected message.
  dispatch (Left reason) = onInterrupt reason >>= afterInterrupt
  dispatch (Right msg) = provideInterrupts (onMessage msg) >>= afterMessage

  -- Interpret the outcome of the message callback.
  afterMessage (Right HandleNext) = return Nothing
  afterMessage (Right (StopServer reason)) =
    onInterrupt reason >>= afterInterrupt
  afterMessage (Left reason) = onInterrupt reason >>= afterInterrupt

  -- Interpret the outcome of the interrupt callback.
  afterInterrupt :: CallbackResult -> Eff (ConsProcess eff) (Maybe ())
  afterInterrupt HandleNext = return Nothing
  afterInterrupt (StopServer reason) = exitBecause SP (NotRecovered reason)
-- | Spawn a server process for @api@ whose callbacks share a 'State' value.
--
-- @initEffect@ is the first action of the spawned computation (run under
-- 'provideInterruptsShutdown') and produces the initial state.
--
-- The message callback is executed with 'runState', starting from the
-- current state. When it completes, the state it leaves behind is written
-- back with 'put', so modifications made by a message callback are visible
-- to all later callbacks. If 'provideInterrupts' reports an
-- 'InterruptReason' instead, no final state is available and the state from
-- before the message is kept.
--
-- Interrupts delivered by the loop, interrupts reported by
-- 'provideInterrupts' and 'StopServer' results of the message callback are
-- passed to the 'InterruptCallback', whose result is handled like this:
--
-- * 'HandleNext': continue with the next message.
-- * @'StopServer' 'ProcessFinished'@: leave the loop by returning @Just ()@.
-- * @'StopServer' k@: call 'exitBecause' with @'NotRecovered' k@.
spawnApiServerStateful
  :: forall api eff state
   . (HasCallStack)
  => Eff (InterruptableProcess eff) state
  -> MessageCallback api (State state ': InterruptableProcess eff)
  -> InterruptCallback (State state ': ConsProcess eff)
  -> Eff (InterruptableProcess eff) (Server api)
spawnApiServerStateful initEffect (MessageCallback sel cb) (InterruptCallback intCb)
  = fmap asServer $ spawnRaw $ do
    state <- provideInterruptsShutdown initEffect
    evalState state $ receiveSelectedLoop (SP @eff) sel $ \msg -> case msg of
      Left  m -> invokeIntCb m
      Right m -> do
        s <- get
        -- The 'Interrupts' effect sits below 'State' in the callback's
        -- effect list, so 'State' must be discharged before
        -- 'provideInterrupts' can run. 'runState' (rather than 'evalState')
        -- keeps the final state so that it can be restored afterwards.
        r <- raise (provideInterrupts (runState s (cb m)))
        case r of
          Left i               -> invokeIntCb i
          Right (result, newS) -> do
            -- Publish the callback's state changes before acting on the
            -- result; the interrupt callback then sees the updated state.
            put newS
            case result of
              StopServer i -> invokeIntCb i
              HandleNext   -> return Nothing
 where
  -- Run the interrupt callback and translate its result for the loop.
  invokeIntCb j = do
    l <- intCb j
    case l of
      HandleNext                 -> return Nothing
      StopServer ProcessFinished -> return (Just ())
      StopServer k               -> exitBecause SP (NotRecovered k)
-- | Spawn a server process whose callbacks run in a custom effect list
-- @serverEff@.
--
-- The first argument interprets the server-internal effects, turning an
-- @Eff serverEff@ computation into an @Eff (InterruptableProcess eff)@
-- computation that is then passed to 'spawn'.
--
-- Inside, 'receiveSelectedLoop' is run with the selector of the
-- 'MessageCallback'. Selected messages go to the message callback wrapped
-- in 'tryUninterrupted'. Interrupts delivered by the loop, interrupts
-- reported by 'tryUninterrupted' and 'StopServer' results of the message
-- callback go to the 'InterruptCallback'. If that returns 'HandleNext' the
-- loop continues; if it returns @'StopServer' r@ the process calls
-- 'exitBecause' with @'NotRecovered' r@.
spawnApiServerEffectful
  :: forall api eff serverEff
   . ( HasCallStack
     , Member Interrupts serverEff
     , SetMember Process (Process eff) serverEff
     )
  => (forall b . Eff serverEff b -> Eff (InterruptableProcess eff) b)
  -> MessageCallback api serverEff
  -> InterruptCallback serverEff
  -> Eff (InterruptableProcess eff) (Server api)
spawnApiServerEffectful runServerEffects messageCallback (InterruptCallback onInterrupt)
  = fmap asServer (spawn (runServerEffects (serverLoop messageCallback)))
 where
  serverLoop (MessageCallback selector onMessage) = receiveSelectedLoop
    (SP @eff)
    selector
    dispatch
   where
    -- Entry point of each loop iteration: interrupt or selected message.
    dispatch (Left reason) = onInterrupt reason >>= afterInterrupt
    dispatch (Right msg) = tryUninterrupted (onMessage msg) >>= afterMessage

    -- Interpret the outcome of the message callback.
    afterMessage (Right HandleNext) = return Nothing
    afterMessage (Right (StopServer reason)) =
      onInterrupt reason >>= afterInterrupt
    afterMessage (Left reason) = onInterrupt reason >>= afterInterrupt

    -- Interpret the outcome of the interrupt callback.
    afterInterrupt :: CallbackResult -> Eff serverEff (Maybe ())
    afterInterrupt HandleNext = return Nothing
    afterInterrupt (StopServer reason) = exitBecause SP (NotRecovered reason)
-- | The result a message or interrupt callback hands back to the server
-- loop.
data CallbackResult
  = HandleNext
    -- ^ The server loops in this module answer this by returning 'Nothing'
    -- to 'receiveSelectedLoop', i.e. they go on to the next message.
  | StopServer InterruptReason
    -- ^ Request to stop, carrying the reason. From a message callback the
    -- reason is forwarded to the 'InterruptCallback'; from an interrupt
    -- callback the server loop ends.
  deriving (Show, Typeable, Generic)

instance NFData CallbackResult
-- | A 'MessageSelector' paired with the handler for the values it selects.
--
-- The selected message type @a@ is hidden inside the constructor. @api@ is a
-- phantom parameter that does not occur in the fields, and @eff@ is the
-- effect list the handler runs in.
data MessageCallback api eff where
  MessageCallback :: MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback api eff
-- | Two callbacks are merged by combining their selectors with '<|>'
-- (tagging the left one with 'Left' and the right one with 'Right') and
-- dispatching the selected value to the handler it belongs to.
instance Semigroup (MessageCallback api eff) where
  (MessageCallback leftSel leftRun) <> (MessageCallback rightSel rightRun) =
    MessageCallback (fmap Left leftSel <|> fmap Right rightSel) dispatch
   where
    dispatch (Left l)  = leftRun l
    dispatch (Right r) = rightRun r
-- | 'mempty' uses 'selectAnyMessageLazy' and answers every selected message
-- with 'HandleNext'; 'mappend' is '<>'.
--
-- NOTE(review): because 'mempty' uses 'selectAnyMessageLazy' and '<>' puts
-- the left selector first in the '<|>' chain, @mempty <> x@ presumably never
-- reaches @x@. Confirm whether the 'Monoid' identity law is meant to hold
-- here.
instance Monoid (MessageCallback api eff) where
  mempty = MessageCallback selectAnyMessageLazy (\_ -> return HandleNext)
  mappend l r = l <> r
-- | The default message callback is 'mempty'.
instance Default (MessageCallback api eff) where
  def = mempty
-- | Build a 'MessageCallback' for messages of type @a@, selected with
-- 'selectMessage', and handled by the given function.
handleMessages
  :: forall eff a
   . (HasCallStack, NFData a, Typeable a)
  => (a -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleMessages onMessage = MessageCallback selectMessage onMessage
-- | Build a 'MessageCallback' from an explicit 'MessageSelector' and a
-- handler for the values it selects.
handleSelectedMessages
  :: forall eff a
   . HasCallStack
  => MessageSelector a
  -> (a -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleSelectedMessages selector onMessage = MessageCallback selector onMessage
-- | Build a 'MessageCallback' that uses 'selectAnyMessageLazy' and passes
-- the selected 'Dynamic' value to the given handler.
handleAnyMessages
  :: forall eff
   . HasCallStack
  => (Dynamic -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleAnyMessages onMessage = MessageCallback selectAnyMessageLazy onMessage
-- | Build a 'MessageCallback' for the 'Asynchronous' requests of @api@,
-- selected with 'selectMessage'.
handleCasts
  :: forall api eff
   . ( HasCallStack
     , NFData (Api api 'Asynchronous)
     , Typeable (Api api 'Asynchronous)
     )
  => (Api api 'Asynchronous -> Eff eff CallbackResult)
  -> MessageCallback api eff
handleCasts onCast = MessageCallback selectMessage onCast
-- | Build a 'MessageCallback' for the 'Synchronous' requests of @api@ with
-- reply type @reply@, selected with 'selectMessage'.
handleCalls
  :: forall api eff reply
   . ( HasCallStack
     , NFData (Api api ( 'Synchronous reply))
     , Typeable (Api api ( 'Synchronous reply))
     )
  => (Api api ( 'Synchronous reply) -> Eff eff CallbackResult)
  -> MessageCallback api eff
handleCalls onCall = MessageCallback selectMessage onCall
-- | Build a 'MessageCallback' for both the 'Asynchronous' and the
-- 'Synchronous' requests of @api@.
--
-- The result is @'handleCalls' onCall '<>' 'handleCasts' onCast@, i.e. the
-- call handler is the left operand of '<>'.
handleCastsAndCalls
  :: forall api eff reply
   . ( HasCallStack
     , NFData (Api api ( 'Synchronous reply))
     , Typeable (Api api ( 'Synchronous reply))
     , NFData (Api api 'Asynchronous)
     , Typeable (Api api 'Asynchronous)
     )
  => (Api api 'Asynchronous -> Eff eff CallbackResult)
  -> (Api api ( 'Synchronous reply) -> Eff eff CallbackResult)
  -> MessageCallback api eff
handleCastsAndCalls onCast onCall = callHandler <> castHandler
 where
  callHandler = handleCalls onCall
  castHandler = handleCasts onCast
-- | Build a 'MessageCallback' that selects (with 'selectMessage') the
-- messages 'downReference' can be applied to, and passes the resulting
-- 'MonitorReference' to the given handler.
handleProcessDowns
  :: forall eff
   . HasCallStack
  => (MonitorReference -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleProcessDowns k =
  MessageCallback selectMessage (\down -> k (downReference down))
-- | Prepend a callback for @api1@ to a callback for the list @apis2@,
-- giving a callback for @api1 ': apis2@.
--
-- The selectors are combined with '<|>' (left operand first), and the
-- selected value is dispatched to the handler of the side that selected it.
(^:)
  :: forall (api1 :: Type) (apis2 :: [Type]) eff
   . HasCallStack
  => MessageCallback api1 eff
  -> MessageCallback apis2 eff
  -> MessageCallback (api1 ': apis2) eff
(^:) (MessageCallback headSel headRun) (MessageCallback tailSel tailRun) =
  MessageCallback (fmap Left headSel <|> fmap Right tailSel) dispatch
 where
  dispatch (Left h)  = headRun h
  dispatch (Right t) = tailRun t
infixr 5 ^:
-- | Re-tag a 'MessageCallback' of any @api@ as a callback for the empty api
-- list. Selector and handler are reused unchanged; only the phantom @api@
-- parameter differs.
fallbackHandler
  :: forall api eff
   . HasCallStack
  => MessageCallback api eff
  -> MessageCallback '[] eff
fallbackHandler (MessageCallback selector handler) =
  MessageCallback selector handler
-- | A callback that uses 'selectAnyMessageLazy' and answers every selected
-- message with 'HandleNext', without looking at it.
dropUnhandledMessages :: forall eff . HasCallStack => MessageCallback '[] eff
dropUnhandledMessages =
  MessageCallback selectAnyMessageLazy (\_ -> pure HandleNext)
-- | A callback that uses 'selectAnyMessageLazy' and answers every selected
-- message with 'StopServer', carrying a 'ProcessError' whose text contains
-- the shown message.
exitOnUnhandled :: forall eff . HasCallStack => MessageCallback '[] eff
exitOnUnhandled = MessageCallback selectAnyMessageLazy stopWithError
 where
  stopWithError msg =
    pure (StopServer (ProcessError ("unhandled message " ++ show msg)))
-- | A callback that uses 'selectAnyMessageLazy', logs a warning containing
-- the shown message via 'logWarning', and then answers 'HandleNext'.
logUnhandledMessages
  :: forall eff
   . (Member (Logs LogMessage) eff, HasCallStack)
  => MessageCallback '[] eff
logUnhandledMessages = MessageCallback
  selectAnyMessageLazy
  (\msg ->
    logWarning ("ignoring unhandled message " ++ show msg) >> return HandleNext
  )
-- | Describes what 'spawnApiServer' returns for an api description @t@:
-- the raw 'ProcessId' of the spawned process is converted to a
-- @'ServerPids' t@.
class ToServerPids (t :: k) where
  -- | The type handed back to the caller for the api description @t@.
  type ServerPids t
  -- | Convert the 'ProcessId' of a server process into @'ServerPids' t@.
  toServerPids :: proxy t -> ProcessId -> ServerPids t
-- | For the empty api list the 'ProcessId' is returned unchanged.
instance ToServerPids '[] where
  type ServerPids '[] = ProcessId
  toServerPids _ pid = pid
-- | For a non-empty api list the result is a pair: the 'ServerPids' of the
-- head api and the 'ServerPids' of the remaining list, both derived from
-- the same 'ProcessId'.
instance
  forall (api1 :: Type) (api2 :: [Type])
  . (ToServerPids api1, ToServerPids api2)
  => ToServerPids (api1 ': api2) where
  type ServerPids (api1 ': api2) = (ServerPids api1, ServerPids api2)
  toServerPids _ pid = (headPids, tailPids)
   where
    headPids = toServerPids (Proxy @api1) pid
    tailPids = toServerPids (Proxy @api2) pid
-- | For a single api of kind 'Type' the 'ProcessId' is wrapped with
-- 'asServer', giving a @'Server' api1@.
instance
  forall (api1 :: Type)
  . (ToServerPids api1)
  => ToServerPids api1 where
  type ServerPids api1 = Server api1
  toServerPids _ pid = asServer pid
-- | The handler a server invokes with an 'InterruptReason'. Its
-- 'CallbackResult' decides whether the server loop continues
-- ('HandleNext') or ends ('StopServer').
data InterruptCallback eff
  = InterruptCallback (InterruptReason -> Eff eff CallbackResult)
-- | The default interrupt callback is 'stopServerOnInterrupt'.
instance Default (InterruptCallback eff) where
  def = stopServerOnInterrupt
-- | An 'InterruptCallback' that answers every 'InterruptReason' with
-- 'StopServer', carrying that same reason.
stopServerOnInterrupt :: forall eff . HasCallStack => InterruptCallback eff
stopServerOnInterrupt =
  InterruptCallback (\reason -> pure (StopServer reason))