-- | Functions to implement 'Api' servers.
--
-- This module exports functions that spawn server processes
-- ('spawnApiServer' and friends), the server loop itself ('apiServerLoop'),
-- the callback types the loop is driven by ('MessageCallback',
-- 'InterruptCallback', 'CallbackResult'), and combinators to build and
-- compose message callbacks ('handleCasts', 'handleCalls', '^:', ...).
module Control.Eff.Concurrent.Api.Server
(
spawnApiServer
, spawnLinkApiServer
, spawnApiServerStateful
, spawnApiServerEffectful
, spawnLinkApiServerEffectful
, apiServerLoop
, CallbackResult(..)
, MessageCallback(..)
, handleCasts
, handleCalls
, handleCastsAndCalls
, handleCallsDeferred
, handleMessages
, handleSelectedMessages
, handleAnyMessages
, handleProcessDowns
, dropUnhandledMessages
, exitOnUnhandled
, logUnhandledMessages
, (^:)
, fallbackHandler
, ToServerPids(..)
, InterruptCallback(..)
, stopServerOnInterrupt
)
where
import Control.Eff
import Control.Eff.Extend
import Control.Eff.Log
import Control.Eff.State.Lazy
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Request
import Control.Eff.Concurrent.Process
import Control.Monad ( (>=>) )
import Data.Proxy
import Data.Dynamic
import Control.Applicative
import GHC.Stack
import Control.DeepSeq
import Data.Kind
import Data.Foldable
import Data.Default
-- | Start a new process (via 'spawn') that runs 'apiServerLoop' with the
-- given message callback, and return the server pid(s) for @api@.
--
-- The interrupt callback is written against @'ConsProcess' eff@; it is
-- lifted with 'raise' so that it fits the effects the server loop runs in.
spawnApiServer
  :: forall api eff
   . (ToServerPids api, HasCallStack)
  => MessageCallback api (InterruptableProcess eff)
  -> InterruptCallback (ConsProcess eff)
  -> Eff (InterruptableProcess eff) (ServerPids api)
spawnApiServer scb (InterruptCallback icb) = do
  serverPid <- spawn (apiServerLoop scb liftedInterruptCallback)
  return (toServerPids (Proxy @api) serverPid)
 where
  -- The same interrupt handler, lifted by one effect layer.
  liftedInterruptCallback = InterruptCallback (\reason -> raise (icb reason))
-- | Like starting a server with 'spawn', but the new process is started
-- with 'spawnLink'. The process runs 'apiServerLoop' with the given message
-- callback; the result is the server pid(s) for @api@.
--
-- The interrupt callback is written against @'ConsProcess' eff@; it is
-- lifted with 'raise' so that it fits the effects the server loop runs in.
spawnLinkApiServer
  :: forall api eff
   . (ToServerPids api, HasCallStack)
  => MessageCallback api (InterruptableProcess eff)
  -> InterruptCallback (ConsProcess eff)
  -> Eff (InterruptableProcess eff) (ServerPids api)
spawnLinkApiServer scb (InterruptCallback icb) = do
  serverPid <- spawnLink (apiServerLoop scb liftedInterruptCallback)
  return (toServerPids (Proxy @api) serverPid)
 where
  -- The same interrupt handler, lifted by one effect layer.
  liftedInterruptCallback = InterruptCallback (\reason -> raise (icb reason))
-- | Start a new process (via 'spawnRaw') whose callbacks share a 'State'.
--
-- * @initEffect@ is run first, inside the new process, to obtain the
--   initial state.
-- * The message callback runs on top of @'State' state@ and the
--   interruptable process effects. Its state modifications are written
--   back after every successfully handled message, so the next message
--   (and the interrupt callback) observe them.
-- * The interrupt callback is invoked for interrupts received while
--   waiting, for interrupts raised by the message callback, and when the
--   message callback returns 'StopServer'.
--
-- If the message callback itself is interrupted, no result state is
-- available; the state from before that message is kept.
--
-- When the interrupt callback returns @'StopServer' 'ProcessFinished'@ the
-- receive loop ends with @Just ()@; any other 'StopServer' reason exits the
-- process via 'exitBecause'.
spawnApiServerStateful
  :: forall api eff state
   . (HasCallStack, ToServerPids api)
  => Eff (InterruptableProcess eff) state
  -> MessageCallback api (State state ': InterruptableProcess eff)
  -> InterruptCallback (State state ': ConsProcess eff)
  -> Eff (InterruptableProcess eff) (ServerPids api)
spawnApiServerStateful initEffect (MessageCallback sel cb) (InterruptCallback intCb)
  = fmap (toServerPids (Proxy @api)) $ spawnRaw $ do
    initialState <- provideInterruptsShutdown initEffect
    evalState initialState $ receiveSelectedLoop sel $ \msg -> case msg of
      Left  m -> invokeIntCb m
      Right m -> do
        s <- get
        -- 'runState' (rather than 'evalState') so that the state produced
        -- by the callback can be threaded into the next iteration.
        r <- raise (provideInterrupts (runState s (cb m)))
        case r of
          Left i          -> invokeIntCb i
          Right (res, s') -> do
            put s'
            case res of
              StopServer i -> invokeIntCb i
              AwaitNext    -> return Nothing
 where
  -- Run the interrupt callback and translate its verdict for the loop.
  invokeIntCb j = do
    l <- intCb j
    case l of
      AwaitNext                  -> return Nothing
      StopServer ProcessFinished -> return (Just ())
      StopServer k               -> exitBecause (NotRecovered k)
-- | Start a new process (via 'spawn') that runs 'apiServerLoop' in a custom
-- effect set @serverEff@.
--
-- The first argument interprets the server-internal effects, turning a
-- @serverEff@ computation into one that 'spawn' accepts.
spawnApiServerEffectful
  :: forall api eff serverEff
   . ( HasCallStack
     , ToServerPids api
     , Member Interrupts serverEff
     , SetMember Process (Process eff) serverEff
     )
  => (forall b . Eff serverEff b -> Eff (InterruptableProcess eff) b)
  -> MessageCallback api serverEff
  -> InterruptCallback serverEff
  -> Eff (InterruptableProcess eff) (ServerPids api)
spawnApiServerEffectful handleServerInternalEffects scb icb =
  fmap (toServerPids (Proxy @api))
    $ spawn
    $ handleServerInternalEffects
    $ apiServerLoop scb icb
-- | Start a new process (via 'spawnLink') that runs 'apiServerLoop' in a
-- custom effect set @serverEff@.
--
-- The first argument interprets the server-internal effects, turning a
-- @serverEff@ computation into one that 'spawnLink' accepts.
spawnLinkApiServerEffectful
  :: forall api eff serverEff
   . ( HasCallStack
     , ToServerPids api
     , Member Interrupts serverEff
     , SetMember Process (Process eff) serverEff
     )
  => (forall b . Eff serverEff b -> Eff (InterruptableProcess eff) b)
  -> MessageCallback api serverEff
  -> InterruptCallback serverEff
  -> Eff (InterruptableProcess eff) (ServerPids api)
spawnLinkApiServerEffectful handleServerInternalEffects scb icb =
  fmap (toServerPids (Proxy @api))
    $ spawnLink
    $ handleServerInternalEffects
    $ apiServerLoop scb icb
-- | The server's receive loop, built on 'receiveSelectedLoop'.
--
-- For every selected message the message callback is run under
-- 'tryUninterrupted'. Interrupts (received instead of a message, or raised
-- by the message callback) and 'StopServer' results of the message callback
-- are handed to the interrupt callback.
--
-- The interrupt callback has the final say: 'AwaitNext' keeps the loop
-- going, @'StopServer' r@ exits the process with @'NotRecovered' r@.
apiServerLoop
  :: forall api eff serverEff
   . ( HasCallStack
     , ToServerPids api
     , Member Interrupts serverEff
     , SetMember Process (Process eff) serverEff
     )
  => MessageCallback api serverEff
  -> InterruptCallback serverEff
  -> Eff serverEff ()
apiServerLoop (MessageCallback sel cb) (InterruptCallback intCb) =
  receiveSelectedLoop sel $ either
    (intCb >=> afterInterruptCallback)
    (tryUninterrupted . cb >=> afterMessageCallback)
 where
  -- Act on the verdict of the interrupt callback.
  afterInterruptCallback :: CallbackResult -> Eff serverEff (Maybe ())
  afterInterruptCallback AwaitNext      = return Nothing
  afterInterruptCallback (StopServer r) = exitBecause (NotRecovered r)

  -- Act on the outcome of the message callback; everything except a plain
  -- 'AwaitNext' is delegated to the interrupt callback.
  afterMessageCallback
    :: Either InterruptReason CallbackResult -> Eff serverEff (Maybe ())
  afterMessageCallback (Right AwaitNext) = return Nothing
  afterMessageCallback (Right (StopServer r)) =
    (intCb >=> afterInterruptCallback) r
  afterMessageCallback (Left r) = (intCb >=> afterInterruptCallback) r
-- | The verdict a callback returns to the server loop.
--
-- * 'AwaitNext': 'apiServerLoop' answers this with @Nothing@ to
--   'receiveSelectedLoop' (presumably: keep receiving messages).
-- * 'StopServer': carries an 'InterruptReason'. When returned by a message
--   callback, 'apiServerLoop' passes the reason to the interrupt callback;
--   when returned by the interrupt callback, 'apiServerLoop' exits the
--   process with that reason.
data CallbackResult where
AwaitNext :: CallbackResult
StopServer :: InterruptReason -> CallbackResult
deriving ( Typeable )
-- | A 'MessageSelector' bundled with the handler for the messages it
-- selects. The type of the selected message is hidden (existential).
--
-- The @api@ parameter does not occur in the constructor's fields; it only
-- tags the callback at the type level.
data MessageCallback api eff where
MessageCallback :: MessageSelector a -> (a -> Eff eff CallbackResult) -> MessageCallback api eff
-- | Combine two callbacks for the same @api@: the selectors are joined
-- with '<|>' (left operand first) and the message is dispatched to the
-- handler belonging to the selector that produced it.
instance Semigroup (MessageCallback api eff) where
  (MessageCallback firstSel firstRun) <> (MessageCallback secondSel secondRun) =
    MessageCallback
      (fmap Left firstSel <|> fmap Right secondSel)
      (\case
        Left  fromFirst  -> firstRun fromFirst
        Right fromSecond -> secondRun fromSecond
      )
-- | The neutral callback selects any message and answers 'AwaitNext'
-- without inspecting it.
--
-- NOTE(review): because 'mempty' uses 'selectAnyMessageLazy', a callback
-- appended to the right of it is presumably never reached
-- (@mempty <> x@ vs. @x@) - confirm whether that is intended.
instance Monoid (MessageCallback api eff) where
  mempty  = MessageCallback selectAnyMessageLazy (\_ -> pure AwaitNext)
  mappend = (<>)

-- | The default callback is 'mempty'.
instance Default (MessageCallback api eff) where
  def = mempty
-- | Build a callback for plain messages of type @a@, selected with
-- 'selectMessage'.
handleMessages
  :: forall eff a
   . (HasCallStack, NFData a, Typeable a)
  => (a -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleMessages onMessage = MessageCallback selectMessage onMessage
-- | Build a callback from an explicit 'MessageSelector' and a handler for
-- the values it selects.
handleSelectedMessages
  :: forall eff a
   . HasCallStack
  => MessageSelector a
  -> (a -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleSelectedMessages selector onMessage = MessageCallback selector onMessage
-- | Build a callback that receives every message as a 'Dynamic', selected
-- with 'selectAnyMessageLazy'.
handleAnyMessages
  :: forall eff
   . HasCallStack
  => (Dynamic -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleAnyMessages onMessage = MessageCallback selectAnyMessageLazy onMessage
-- | Build a callback that handles only the 'Cast' requests of @api@.
--
-- The selector keeps 'Cast' values of type @'Request' api@ and rejects
-- every other request, so the handler is only ever given a 'Cast'.
handleCasts
  :: forall api eff
   . (HasCallStack, Typeable api, Typeable (Api api 'Asynchronous))
  => (Api api 'Asynchronous -> Eff eff CallbackResult)
  -> MessageCallback api eff
handleCasts h = MessageCallback
  (selectMessageWithLazy keepCasts)
  (\(Cast castBody :: Request api) -> h castBody)
 where
  -- Accept casts, reject everything else.
  keepCasts :: Request api -> Maybe (Request api)
  keepCasts = \case
    castReq@(Cast _) -> Just castReq
    _otherRequest    -> Nothing
-- | Build a callback that handles only the 'Call' requests of @api@.
--
-- The handler is given the request and a continuation. The continuation
-- takes an action producing an optional reply and a 'CallbackResult'; it
-- runs that action, sends the reply (if there is one) to the request's
-- origin with 'sendReply', and yields the 'CallbackResult'.
handleCalls
  :: forall api eff effScheduler
   . ( HasCallStack
     , Typeable api
     , SetMember Process (Process effScheduler) eff
     , Member Interrupts eff
     )
  => (  forall secret reply
      . (Typeable reply, Typeable (Api api ( 'Synchronous reply)))
     => Api api ( 'Synchronous reply)
     -> (Eff eff (Maybe reply, CallbackResult) -> secret)
     -> secret
     )
  -> MessageCallback api eff
handleCalls h = MessageCallback
  (selectMessageWithLazy rejectCasts)
  (\(Call callRef fromPid req :: Request api) ->
    h req $ \resAction -> do
      (mReply, cbResult) <- resAction
      let origin = mkRequestOrigin req fromPid callRef
      -- 'traverse_' over the 'Maybe': reply only when there is one.
      cbResult <$ traverse_ (sendReply origin) mReply
  )
 where
  -- Reject casts, accept everything else.
  rejectCasts :: Request api -> Maybe (Request api)
  rejectCasts = \case
    Cast _  -> Nothing
    callReq -> Just callReq
-- | Build a callback for both the casts and the calls of @api@: the result
-- of 'handleCalls' combined (with '<>', calls first) with the result of
-- 'handleCasts'.
handleCastsAndCalls
  :: forall api eff effScheduler
   . ( HasCallStack
     , Typeable api
     , Typeable (Api api 'Asynchronous)
     , SetMember Process (Process effScheduler) eff
     , Member Interrupts eff
     )
  => (Api api 'Asynchronous -> Eff eff CallbackResult)
  -> (  forall secret reply
      . (Typeable reply, Typeable (Api api ( 'Synchronous reply)))
     => Api api ( 'Synchronous reply)
     -> (Eff eff (Maybe reply, CallbackResult) -> secret)
     -> secret
     )
  -> MessageCallback api eff
handleCastsAndCalls onCast onCall = callsCallback <> castsCallback
 where
  callsCallback :: MessageCallback api eff
  callsCallback = handleCalls onCall

  castsCallback :: MessageCallback api eff
  castsCallback = handleCasts onCast
-- | Build a callback that handles only the 'Call' requests of @api@,
-- without sending a reply itself.
--
-- The handler is given the 'RequestOrigin' of the call together with the
-- request; no reply is sent by this function.
handleCallsDeferred
  :: forall api eff effScheduler
   . ( HasCallStack
     , Typeable api
     , SetMember Process (Process effScheduler) eff
     , Member Interrupts eff
     )
  => (  forall reply
      . (Typeable reply, Typeable (Api api ( 'Synchronous reply)))
     => RequestOrigin (Api api ( 'Synchronous reply))
     -> Api api ( 'Synchronous reply)
     -> Eff eff CallbackResult
     )
  -> MessageCallback api eff
handleCallsDeferred h = MessageCallback
  (selectMessageWithLazy rejectCasts)
  (\(Call callRef fromPid req :: Request api) ->
    let origin = RequestOrigin fromPid callRef in h origin req
  )
 where
  -- Reject casts, accept everything else.
  rejectCasts :: Request api -> Maybe (Request api)
  rejectCasts = \case
    Cast _  -> Nothing
    callReq -> Just callReq
-- | Map the type of a synchronous 'Api' request to the type of its reply.
--
-- NOTE(review): not in the export list and not referenced by the rest of
-- the visible module - confirm whether it is still needed.
type family ResponseType request where
ResponseType (Api a ('Synchronous r)) = r
-- | Build a callback for process-down notifications: the message is
-- selected with 'selectMessage' and the handler is given the message's
-- 'downReference'.
handleProcessDowns
  :: forall eff
   . HasCallStack
  => (MonitorReference -> Eff eff CallbackResult)
  -> MessageCallback '[] eff
handleProcessDowns k =
  MessageCallback selectMessage (\downMsg -> k (downReference downMsg))
-- | Prepend a callback for one api to a callback for a list of apis.
--
-- The selectors are joined with '<|>' (left operand first) and a selected
-- message is dispatched to the handler of the selector that produced it.
(^:)
  :: forall (api1 :: Type) (apis2 :: [Type]) eff
   . HasCallStack
  => MessageCallback api1 eff
  -> MessageCallback apis2 eff
  -> MessageCallback (api1 ': apis2) eff
(^:) (MessageCallback headSel headRun) (MessageCallback tailSel tailRun) =
  MessageCallback
    (fmap Left headSel <|> fmap Right tailSel)
    (either headRun tailRun)

infixr 5 ^:
-- | Re-tag a callback for any @api@ as a callback for the empty api list,
-- keeping its selector and handler unchanged.
fallbackHandler
  :: forall api eff
   . HasCallStack
  => MessageCallback api eff
  -> MessageCallback '[] eff
fallbackHandler callback = case callback of
  MessageCallback selector handler -> MessageCallback selector handler
-- | A callback that selects any message ('selectAnyMessageLazy') and
-- answers 'AwaitNext' without looking at it.
dropUnhandledMessages :: forall eff . HasCallStack => MessageCallback '[] eff
dropUnhandledMessages =
  MessageCallback selectAnyMessageLazy (\_ignored -> return AwaitNext)
-- | A callback that selects any message ('selectAnyMessageLazy') and
-- answers 'StopServer' with a 'ProcessError' that mentions the message.
exitOnUnhandled :: forall eff . HasCallStack => MessageCallback '[] eff
exitOnUnhandled = MessageCallback
  selectAnyMessageLazy
  (\unhandled ->
    return . StopServer . ProcessError $ "unhandled message " ++ show unhandled
  )
-- | A callback that selects any message ('selectAnyMessageLazy'), logs a
-- warning that mentions the message, and answers 'AwaitNext'.
logUnhandledMessages
  :: forall eff
   . (Member Logs eff, HasCallStack)
  => MessageCallback '[] eff
logUnhandledMessages = MessageCallback
  selectAnyMessageLazy
  (\unhandled ->
    logWarning ("ignoring unhandled message " ++ show unhandled)
      >> return AwaitNext
  )
-- | Relate a type-level description of a server @t@ to the value a caller
-- gets back after spawning it.
--
-- 'ServerPids' is the result type for @t@; 'toServerPids' builds that
-- result from the 'ProcessId' of the server process.
class ToServerPids (t :: k) where
type ServerPids t
toServerPids :: proxy t -> ProcessId -> ServerPids t
-- | For the empty api list the result is the plain 'ProcessId'.
instance ToServerPids '[] where
  type ServerPids '[] = ProcessId
  toServerPids _ pid = pid
-- | For a non-empty api list the result is a pair: the result for the head
-- api and the result for the remaining apis, both built from the same
-- 'ProcessId'.
instance
  forall (api1 :: Type) (api2 :: [Type])
  . (ToServerPids api1, ToServerPids api2)
  => ToServerPids (api1 ': api2) where
  type ServerPids (api1 ': api2) = (ServerPids api1, ServerPids api2)
  toServerPids _ pid = (headPids, tailPids)
   where
    headPids = toServerPids (Proxy @api1) pid
    tailPids = toServerPids (Proxy @api2) pid
-- | For a single api (of kind 'Type') the result is a @'Server' api1@,
-- obtained from the 'ProcessId' with 'asServer'.
instance
  forall (api1 :: Type)
  . (ToServerPids api1)
  => ToServerPids api1 where
  type ServerPids api1 = Server api1
  toServerPids _ pid = asServer pid
-- | A handler for 'InterruptReason's. It decides, by returning a
-- 'CallbackResult', how the server proceeds after an interrupt.
data InterruptCallback eff where
InterruptCallback ::
(InterruptReason -> Eff eff CallbackResult) -> InterruptCallback eff
-- | The default interrupt callback is 'stopServerOnInterrupt'.
instance Default (InterruptCallback eff) where
def = stopServerOnInterrupt
-- | An interrupt callback that answers every interrupt with 'StopServer',
-- passing the interrupt reason along unchanged.
stopServerOnInterrupt :: forall eff . HasCallStack => InterruptCallback eff
stopServerOnInterrupt =
  InterruptCallback (\reason -> pure (StopServer reason))