-- | Observer Effects -- -- This module supports the implementation of observers and observables. One -- more concrete perspective might be to understand observers as event listeners -- and observables as event sources. The tools in this module are tailored -- towards 'Control.Eff.Concurrent.Api.Api' endpoints {-# LANGUAGE IncoherentInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE KindSignatures #-} {-# LANGUAGE DataKinds #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeOperators #-} {-# LANGUAGE TypeApplications #-} {-# LANGUAGE GADTs #-} module Control.Eff.Concurrent.Observer ( Observer(..) , Observable(..) , notifyObserver , registerObserver , forgetObserver , SomeObserver(..) , notifySomeObserver , Observers() , manageObservers , addObserver , removeObserver , notifyObservers , CallbackObserver , spawnCallbackObserver ) where import GHC.Stack import Data.Dynamic import Data.Set (Set) import qualified Data.Set as Set import Control.Eff import Control.Eff.Concurrent.MessagePassing import Control.Eff.Concurrent.Api import Control.Eff.Concurrent.Api.Client import Control.Eff.Concurrent.Api.Server import Control.Eff.Concurrent.Dispatcher import Control.Eff.Log import Control.Eff.State.Lazy import Control.Lens import Control.Monad -- * Observation API -- | An 'Api' index that support observation of the -- another 'Api' that is 'Observable'. class (Typeable p, Observable o) => Observer p o where -- | Wrap the 'Observation' and the 'ProcessId' (i.e. the 'Server') -- that caused the observation into a 'Api' value that the -- 'Observable' understands. observationMessage :: Server o -> Observation o -> Api p 'Asynchronous -- | An 'Api' index that supports registration and de-registration of -- 'Observer's. class (Typeable o, Typeable (Observation o)) => Observable o where -- | Type of observations visible on this observable data Observation o -- | Return the 'Api' value for the 'cast_' that registeres an observer registerObserverMessage :: SomeObserver o -> Api o 'Asynchronous -- | Return the 'Api' value for the 'cast_' that de-registeres an observer forgetObserverMessage :: SomeObserver o -> Api o 'Asynchronous -- | Send an 'Observation' to an 'Observer' notifyObserver :: ( Member Process r , Member MessagePassing r , Observable o , Observer p o , HasCallStack ) => Server p -> Server o -> Observation o -> Eff r () notifyObserver observer observed observation = cast observer (observationMessage observed observation) -- | Send the 'registerObserverMessage' registerObserver :: ( Member Process r , Member MessagePassing r , Observable o , Observer p o , HasCallStack ) => Server p -> Server o -> Eff r () registerObserver observer observed = cast observed (registerObserverMessage (SomeObserver observer)) -- | Send the 'forgetObserverMessage' forgetObserver :: ( Member Process r , Member MessagePassing r , Observable o , Observer p o) => Server p -> Server o -> Eff r () forgetObserver observer observed = cast observed (forgetObserverMessage (SomeObserver observer)) -- ** Generalized observation -- | An existential wrapper around a 'Server' of an 'Observer'. -- Needed to support different types of observers to observe the -- same 'Observable' in a general fashion. data SomeObserver o where SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o deriving instance Show (SomeObserver o) instance Ord (SomeObserver o) where compare (SomeObserver (Server o1)) (SomeObserver (Server o2)) = compare o1 o2 instance Eq (SomeObserver o) where (==) (SomeObserver (Server o1)) (SomeObserver (Server o2)) = o1 == o2 -- | Send an 'Observation' to 'SomeObserver'. notifySomeObserver :: ( Member Process r , Member MessagePassing r , Observable o , HasCallStack ) => Server o -> Observation o -> SomeObserver o -> Eff r () notifySomeObserver observed observation (SomeObserver observer) = notifyObserver observer observed observation -- ** Manage 'Observers's -- | Internal state for 'manageobservers' data Observers o = Observers { _observers :: Set (SomeObserver o) } observers :: Iso' (Observers o) (Set (SomeObserver o)) observers = iso _observers Observers -- | Keep track of registered 'Observer's Observers can be added and removed, -- and an 'Observation' can be sent to all registerd observers at once. manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a manageObservers = flip evalState (Observers Set.empty) -- | Add an 'Observer' to the 'Observers' managed by 'manageObservers'. addObserver :: ( Member MessagePassing r , Member (State (Observers o)) r , Observable o) => SomeObserver o -> Eff r () addObserver = modify . over observers . Set.insert -- | Delete an 'Observer' from the 'Observers' managed by 'manageObservers'. removeObserver :: ( Member MessagePassing r , Member (State (Observers o)) r , Observable o) => SomeObserver o -> Eff r () removeObserver = modify . over observers . Set.delete -- | Send an 'Observation' to all 'SomeObserver's in the 'Observers' state. notifyObservers :: forall o r . ( Observable o , Member MessagePassing r , Member Process r , Member (State (Observers o)) r) => Observation o -> Eff r () notifyObservers observation = do me <- asServer @o <$> self os <- view observers <$> get mapM_ (notifySomeObserver me observation) os -- * Callback 'Observer' -- | An 'Observer' that dispatches the observations to an effectful callback. data CallbackObserver o deriving Typeable data instance Api (CallbackObserver o) r where CbObserved :: (Typeable o, Typeable (Observation o)) => Server o -> Observation o -> Api (CallbackObserver o) 'Asynchronous deriving Typeable deriving instance Show (Observation o) => Show (Api (CallbackObserver o) r) instance (Observable o) => Observer (CallbackObserver o) o where observationMessage = CbObserved -- | Start a new process for an 'Observer' that dispatches -- all observations to an effectful callback. spawnCallbackObserver :: forall o r . (HasDispatcherIO r, Typeable o, Show (Observation o), Observable o) => (Server o -> Observation o -> Eff ProcIO Bool) -> Eff r (Server (CallbackObserver o)) spawnCallbackObserver onObserve = asServer @(CallbackObserver o) <$> (spawn $ do trapExit True me <- asServer @(CallbackObserver o) <$> self let loopUntil = serve_ (ApiHandler @(CallbackObserver o) (handleCast loopUntil) unhandledCallError (logMsg . ((show me ++ " observer terminating ") ++))) loopUntil ) where handleCast :: Eff ProcIO () -> Api (CallbackObserver o) 'Asynchronous -> Eff ProcIO () handleCast k (CbObserved fromSvr v) = onObserve fromSvr v >>= flip when k