{-# LANGUAGE IncoherentInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE GADTs #-}
module Control.Eff.Concurrent.Api.Observer
( Observer(..)
, Observable(..)
, notifyObserver
, registerObserver
, forgetObserver
, SomeObserver(..)
, notifySomeObserver
, Observers()
, manageObservers
, addObserver
, removeObserver
, notifyObservers
, CallbackObserver
, spawnCallbackObserver
) where
import GHC.Stack
import Data.Dynamic
import Data.Maybe
import Data.Set (Set)
import qualified Data.Set as Set
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Log
import Control.Eff.State.Lazy
import Control.Lens
import Control.Monad
-- | Relates an observing 'Api' @p@ to an 'Observable' 'Api' @o@.
--
-- An instance tells how an 'Observation' made on a @'Server' o@ is turned
-- into an asynchronous 'Api' message for the observer @p@.
class (Typeable p, Observable o) => Observer p o where
  -- | Build the asynchronous message for the observer from the observed
  -- 'Server' and the 'Observation' it produced.
  observationMessage
    :: Server o
    -> Observation o
    -> Api p 'Asynchronous
-- | An 'Api' @o@ that can be observed.
--
-- Instances define the type of the observations and the asynchronous 'Api'
-- messages used to register and to forget a 'SomeObserver'.
class (Typeable o, Typeable (Observation o)) => Observable o where
  -- | The type of the observations belonging to @o@.
  data Observation o

  -- | Build the asynchronous message that asks @o@ to register the given
  -- observer; 'registerObserver' sends this message via 'cast'.
  registerObserverMessage
    :: SomeObserver o
    -> Api o 'Asynchronous

  -- | Build the asynchronous message that asks @o@ to forget the given
  -- observer; 'forgetObserver' sends this message via 'cast'.
  forgetObserverMessage
    :: SomeObserver o
    -> Api o 'Asynchronous
-- | Send an 'Observation' to a single observer.
--
-- The observation is wrapped with 'observationMessage' and delivered to the
-- observer with 'cast'.
notifyObserver
  :: ( SetMember Process (Process q) r
     , Observable o
     , Observer p o
     , HasCallStack
     )
  => SchedulerProxy q
  -> Server p        -- ^ the observer that receives the message
  -> Server o        -- ^ the server on which the observation was made
  -> Observation o   -- ^ the observation to deliver
  -> Eff r ()
notifyObserver px receiver source event =
  cast px receiver $ observationMessage source event
-- | Ask an observable server to register an observer.
--
-- The observer is wrapped in 'SomeObserver', turned into a message with
-- 'registerObserverMessage' and sent to the observed server with 'cast'.
registerObserver
  :: ( SetMember Process (Process q) r
     , Observable o
     , Observer p o
     , HasCallStack
     )
  => SchedulerProxy q
  -> Server p   -- ^ the observer to register
  -> Server o   -- ^ the server that shall be observed
  -> Eff r ()
registerObserver px newObserver target =
  cast px target $ registerObserverMessage (SomeObserver newObserver)
-- | Ask an observable server to forget an observer.
--
-- The observer is wrapped in 'SomeObserver', turned into a message with
-- 'forgetObserverMessage' and sent to the observed server with 'cast'.
--
-- The 'HasCallStack' constraint mirrors 'registerObserver' and
-- 'notifyObserver', so all three functions pass the caller's call stack on
-- to whatever they invoke that asks for one.
forgetObserver
  :: ( SetMember Process (Process q) r
     , Observable o
     , Observer p o
     , HasCallStack
     )
  => SchedulerProxy q
  -> Server p   -- ^ the observer to remove
  -> Server o   -- ^ the server that is being observed
  -> Eff r ()
forgetObserver px observer observed =
  cast px observed (forgetObserverMessage (SomeObserver observer))
-- | An observer of @o@ whose own 'Api' type is hidden.
--
-- The constructor captures the 'Show', 'Typeable' and 'Observer' evidence
-- for the hidden type, so observers with different 'Api's can be handled
-- uniformly, e.g. stored together in one collection.
data SomeObserver o where
  SomeObserver
    :: (Show (Server p), Typeable p, Observer p o)
    => Server p
    -> SomeObserver o

-- The 'Show' instance relies on the @Show (Server p)@ evidence stored in
-- the constructor.
deriving instance Show (SomeObserver o)
-- | Observers are ordered by the value wrapped in their 'Server'
-- constructor; the hidden 'Api' type takes no part in the comparison.
instance Ord (SomeObserver o) where
  compare (SomeObserver (Server lhs)) (SomeObserver (Server rhs)) =
    lhs `compare` rhs
-- | Two observers are equal when the values wrapped in their 'Server'
-- constructors are equal; the hidden 'Api' type is not compared.
instance Eq (SomeObserver o) where
  SomeObserver (Server lhs) == SomeObserver (Server rhs) =
    lhs == rhs
-- | Send an 'Observation' to an observer wrapped in 'SomeObserver'.
--
-- Unpacks the wrapper and delegates to 'notifyObserver'.
notifySomeObserver
  :: ( SetMember Process (Process q) r
     , Observable o
     , HasCallStack
     )
  => SchedulerProxy q
  -> Server o         -- ^ the server on which the observation was made
  -> Observation o    -- ^ the observation to deliver
  -> SomeObserver o   -- ^ the observer that receives the observation
  -> Eff r ()
notifySomeObserver px source event wrapped =
  case wrapped of
    SomeObserver receiver -> notifyObserver px receiver source event
-- | The set of observers currently registered for @o@.
--
-- Used as the state manipulated by 'addObserver', 'removeObserver' and
-- read by 'notifyObservers'.
data Observers o = Observers
  { _observers :: Set (SomeObserver o)
    -- ^ the registered observers
  }
-- | Isomorphism between 'Observers' and the 'Set' of 'SomeObserver's that
-- it wraps.
observers :: Iso' (Observers o) (Set (SomeObserver o))
observers = iso (\(Observers registered) -> registered) Observers
-- | Run an effect that keeps 'Observers' state, starting with an empty set
-- of observers. Only the result of the action is returned; the final state
-- is dropped by 'evalState'.
manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a
manageObservers action = evalState action noObservers
  where
    noObservers = Observers Set.empty
-- | Insert an observer into the 'Observers' state.
addObserver
  :: ( SetMember Process (Process q) r
     , Member (State (Observers o)) r
     , Observable o
     )
  => SomeObserver o
  -> Eff r ()
addObserver newObserver =
  modify (observers %~ Set.insert newObserver)
-- | Delete an observer from the 'Observers' state.
removeObserver
  :: ( SetMember Process (Process q) r
     , Member (State (Observers o)) r
     , Observable o
     )
  => SomeObserver o
  -> Eff r ()
removeObserver oldObserver =
  modify (observers %~ Set.delete oldObserver)
-- | Send an 'Observation' to every observer in the 'Observers' state.
--
-- The calling process (obtained with 'self') is passed along as the
-- observed server.
notifyObservers
  :: forall o r q
   . ( Observable o
     , SetMember Process (Process q) r
     , Member (State (Observers o)) r
     )
  => SchedulerProxy q
  -> Observation o
  -> Eff r ()
notifyObservers px observation = do
  ownPid <- self px
  registry <- get
  let observed = asServer @o ownPid
  forM_ (registry ^. observers) (notifySomeObserver px observed observation)
-- | Type-level tag (it has no constructors) for the 'Api' of an observer
-- of @o@ that reacts to observations by running a callback; see
-- 'spawnCallbackObserver'.
data CallbackObserver o deriving Typeable
-- | The 'Api' of a 'CallbackObserver': a single asynchronous message
-- carrying the observed 'Server' and the 'Observation'.
data instance Api (CallbackObserver o) r where
  CbObserved
    :: (Typeable o, Typeable (Observation o))
    => Server o
    -> Observation o
    -> Api (CallbackObserver o) 'Asynchronous
  deriving Typeable

-- Showing a message requires that the carried 'Observation' can be shown.
deriving instance
  Show (Observation o) => Show (Api (CallbackObserver o) r)
-- | A 'CallbackObserver' for @o@ observes @o@: every observation is
-- delivered as a 'CbObserved' message.
instance (Observable o) => Observer (CallbackObserver o) o where
  observationMessage observed observation =
    CbObserved observed observation
-- | Spawn a process that observes @o@ by running a callback.
--
-- The spawned process handles 'CbObserved' messages through 'serve'. For
-- every such message the callback receives the observed 'Server' and the
-- 'Observation'. If the callback returns 'True' the serve loop is entered
-- again; if it returns 'False' the handler returns without re-entering it.
--
-- On termination a log message is written before 'defaultTermination' is
-- invoked. The result is the spawned process as a 'Server' of
-- 'CallbackObserver'.
spawnCallbackObserver
  :: forall o r q
   . ( SetMember Process (Process q) r
     , Typeable o
     , Show (Observation o)
     , Observable o
     , Member (Logs String) q
     )
  => SchedulerProxy q
  -> (Server o -> Observation o -> Eff (Process q ': q) Bool)
     -- ^ callback; its result decides whether observing continues
  -> Eff r (Server (CallbackObserver o))
spawnCallbackObserver px onObserve =
  asServer @(CallbackObserver o)
    <$> spawn @r @q
          ( do
              ownPid <- self px
              let me = asServer @(CallbackObserver o) ownPid
                  -- The loop hands itself to the cast handler so that the
                  -- handler can decide to continue.
                  serveLoop =
                    serve px $
                      ApiHandler @(CallbackObserver o)
                        (onObservation serveLoop)
                        (unhandledCallError px)
                        ( \reason ->
                            logMsg
                              ( show me
                                  ++ " observer terminating "
                                  ++ fromMaybe "normally" reason
                              )
                              >> defaultTermination px reason
                        )
              serveLoop
          )
  where
    -- Run the callback, then re-enter the loop only if it returned 'True'.
    onObservation continue (CbObserved fromSvr observation) = do
      keepGoing <- onObserve fromSvr observation
      when keepGoing continue