-- | Observers and observer registries.
--
-- An 'Observer' wraps a 'Server' that receives observations of some type @o@.
-- 'registerObserver' and 'forgetObserver' cast registration messages to a
-- @'Server' ('ObserverRegistry' o)@, 'handleObserverRegistration' maintains the
-- registered set in an 'ObserverState', and 'observed' casts an observation to
-- every observer in that set.
module Control.Eff.Concurrent.Api.Observer
( Observer(..)
, Api(RegisterObserver, ForgetObserver, Observed)
, registerObserver
, forgetObserver
, handleObservations
, toObserver
, toObserverFor
, ObserverRegistry
, ObserverState
, handleObserverRegistration
, manageObservers
, observed
)
where
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.State.Strict
import Control.Eff.Log
import Control.Lens
import Data.Data (typeOf)
import Data.Dynamic
import Data.Foldable
import Data.Proxy
import Data.Set ( Set )
import qualified Data.Set as Set
import Data.Text ( pack )
import Data.Typeable ( typeRep )
import GHC.Stack
import Control.DeepSeq (NFData(rnf))
-- | A receiver of observations of type @o@.
--
-- The constructor hides the API type @p@ of the receiving 'Server'. It pairs
-- that server with a function turning an observation into an asynchronous
-- message for it. 'observed' sends nothing for an observation that this
-- function maps to 'Nothing'.
data Observer o where
  Observer
    :: ( Show (Server p)
       , Typeable p
       , Typeable o
       , NFData o
       , NFData (Api p 'Asynchronous)
       )
    => (o -> Maybe (Api p 'Asynchronous))
    -> Server p
    -> Observer o
-- | Applies 'rnf' to the message function first and to the receiving
-- 'Server' second.
instance (NFData o) => NFData (Observer o) where
  rnf (Observer messageFilter receiver) =
    case rnf messageFilter of
      () -> rnf receiver
-- | Renders the type of the observations, the text @" observer: "@ and the
-- receiving 'Server'. The message function is not shown.
instance Show (Observer o) where
  showsPrec d (Observer _ receiver) = showParen needsParens rendered
   where
    -- NOTE(review): derived 'Show' instances parenthesise for @d > 10@;
    -- this instance also does so at exactly 10. Confirm that is intended.
    needsParens = d >= 10
    rendered =
      shows (typeRep (Proxy :: Proxy o))
        . showString " observer: "
        . shows receiver
-- | Observers are ordered solely by the 'fromServer' value of their
-- receiving 'Server'; the message function plays no role.
instance Ord (Observer o) where
  Observer _ lhs `compare` Observer _ rhs =
    (lhs ^. fromServer) `compare` (rhs ^. fromServer)
-- | Two observers are equal when the 'fromServer' values of their receiving
-- 'Server's are equal; the message function is ignored.
instance Eq (Observer o) where
  Observer _ lhs == Observer _ rhs =
    (lhs ^. fromServer) == (rhs ^. fromServer)
-- | Ask an observer registry to add an 'Observer'.
--
-- This 'cast's a 'RegisterObserver' message carrying the observer to the
-- given registry server.
registerObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     , NFData o
     )
  => Observer o
  -> Server (ObserverRegistry o)
  -> Eff r ()
registerObserver observer observerRegistry =
  observerRegistry `cast` RegisterObserver observer
-- | Ask an observer registry to drop an 'Observer'.
--
-- This 'cast's a 'ForgetObserver' message carrying the observer to the
-- given registry server.
forgetObserver
  :: ( SetMember Process (Process q) r
     , HasCallStack
     , Member Interrupts r
     , Typeable o
     , NFData o
     )
  => Observer o
  -> Server (ObserverRegistry o)
  -> Eff r ()
forgetObserver observer observerRegistry =
  observerRegistry `cast` ForgetObserver observer
-- | The messages of the 'Observer' API: a single asynchronous message that
-- carries one observation.
data instance Api (Observer o) r where
  -- | An observation of type @o@.
  Observed :: o -> Api (Observer o) 'Asynchronous
  deriving Typeable
-- | Applies 'rnf' to the observation carried by an 'Observed' message.
instance NFData o => NFData (Api (Observer o) 'Asynchronous) where
  rnf (Observed observation) = rnf observation
-- | Build a 'MessageCallback' for the 'Observer' API from a function over
-- observations.
--
-- The callback is built with 'handleCasts'; the payload of each 'Observed'
-- message is passed to the given function, whose result is returned as is.
handleObservations
  :: (HasCallStack, Typeable o, SetMember Process (Process q) r, NFData (Observer o))
  => (o -> Eff r CallbackResult)
  -> MessageCallback (Observer o) r
handleObservations k =
  handleCasts (\(Observed observation) -> k observation)
-- | Turn a 'Server' of the 'Observer' API into an 'Observer' that wraps
-- every observation in an 'Observed' message.
toObserver
  :: (NFData o, Typeable o, NFData (Api (Observer o) 'Asynchronous))
  => Server (Observer o)
  -> Observer o
toObserver receiver = toObserverFor Observed receiver
-- | Turn a 'Server' of an arbitrary API into an 'Observer', given a function
-- that wraps an observation in an asynchronous message of that API.
--
-- The resulting observer produces a message for every observation; its
-- message function never returns 'Nothing'.
toObserverFor
  :: (Typeable a, NFData (Api a 'Asynchronous), Typeable o, NFData o)
  => (o -> Api a 'Asynchronous)
  -> Server a
  -> Observer o
toObserverFor wrapper receiver =
  Observer (\observation -> Just (wrapper observation)) receiver
-- | Constructor-less type used as the index of the 'Api' for registering and
-- forgetting 'Observer's of observations of type @o@.
data ObserverRegistry o
-- | The messages of the 'ObserverRegistry' API. Both are asynchronous and
-- carry the 'Observer' they refer to.
data instance Api (ObserverRegistry o) r where
  -- | Request to add the observer; see 'registerObserver'.
  RegisterObserver
    :: NFData o
    => Observer o
    -> Api (ObserverRegistry o) 'Asynchronous
  -- | Request to remove the observer; see 'forgetObserver'.
  ForgetObserver
    :: NFData o
    => Observer o
    -> Api (ObserverRegistry o) 'Asynchronous
  deriving Typeable
-- | Applies 'rnf' to the 'Observer' carried by either registry message.
instance NFData (Api (ObserverRegistry o) r) where
  rnf = \case
    RegisterObserver observer -> rnf observer
    ForgetObserver observer   -> rnf observer
-- | A 'MessageCallback' for the 'ObserverRegistry' API that keeps the set of
-- registered observers in the 'ObserverState'.
--
-- For 'RegisterObserver' the observer is inserted into the set, for
-- 'ForgetObserver' it is deleted. In both cases a debug message is logged
-- first, containing the type of the observer and the size of the set
-- /before/ the change, and the callback answers with 'AwaitNext'.
handleObserverRegistration
  :: forall o q r
   . ( HasCallStack
     , Typeable o
     , SetMember Process (Process q) r
     , Member (ObserverState o) r
     , Member Logs r
     )
  => MessageCallback (ObserverRegistry o) r
handleObserverRegistration = handleCasts $ \case
  RegisterObserver ob -> do
    os <- get @(Observers o)
    logDebug (describe "registering " ob os)
    put (over observers (Set.insert ob) os)
    pure AwaitNext
  ForgetObserver ob -> do
    os <- get @(Observers o)
    logDebug (describe "forgetting " ob os)
    put (over observers (Set.delete ob) os)
    pure AwaitNext
 where
  -- Log text shared by both branches: the action prefix, the type of the
  -- observer and the number of observers in the given state.
  describe action subject current =
    action
      <> pack (show (typeOf subject))
      <> " current number of observers: "
      <> pack (show (Set.size (view observers current)))
-- | Run the 'ObserverState' effect of an action, starting with an empty set
-- of observers and returning only the result of the action.
manageObservers :: Eff (ObserverState o ': r) a -> Eff r a
manageObservers action = evalState noObservers action
 where
  noObservers = Observers Set.empty
-- | The set of 'Observer's for observations of type @o@; this is the state
-- type behind 'ObserverState'.
newtype Observers o = Observers
  { _observers :: Set (Observer o)
  }
-- | The 'State' effect whose state is the 'Observers' for observations of
-- type @o@.
type ObserverState o = State (Observers o)
-- | Isomorphism between 'Observers' and the 'Set' it wraps.
observers :: Iso' (Observers o) (Set (Observer o))
observers = iso unwrap wrap
 where
  unwrap (Observers registered) = registered
  wrap = Observers
-- | Report an observation to every 'Observer' currently held in the
-- 'ObserverState'.
--
-- Each observer's message function is applied to the observation. When it
-- yields a message, that message is 'cast' to the observer's 'Server'; when
-- it yields 'Nothing', nothing is sent to that observer.
observed
  :: forall o r q
   . ( SetMember Process (Process q) r
     , Member (ObserverState o) r
     , Member Interrupts r
     )
  => o
  -> Eff r ()
observed observation = do
  registered <- fmap (view observers) get
  forM_ registered $ \(Observer messageFilter receiver) ->
    for_ (messageFilter observation) (cast receiver)