{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Database.EventStore.Internal.Control
(
EventStore
, runEventStore
, getSettings
, freshUUID
, Pub(..)
, Sub(..)
, Hub
, Subscribe
, Publish
, asPub
, asSub
, asHub
, Bus
, newBus
, busStop
, Message
, toMsg
, fromMsg
, busProcessedEverything
, publish
, publishWith
, subscribe
, stopBus
, publisher
, monitorIncrPkgCount
, monitorIncrConnectionDrop
, monitorAddDataTransmitted
, monitorIncrForceReconnect
, monitorIncrHeartbeatTimeouts
, module Database.EventStore.Internal.Settings
) where
#if __GLASGOW_HASKELL__ > 710
import Control.Monad.Fail
#endif
import Data.Typeable
#if __GLASGOW_HASKELL__ < 802
import Data.Typeable.Internal
#else
import GHC.Fingerprint
#endif
import Control.Monad.Reader
import Data.UUID
import Data.UUID.V4
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings
-- | Read-only environment available to every 'EventStore' computation.
-- Every field is strict.
data Env = Env
  { __logRef   :: !LoggerRef
    -- ^ Logger handle used by the 'MonadLogger' / 'MonadLoggerIO' instances.
  , __settings :: !Settings
    -- ^ Settings handed back by 'getSettings'.
  , __bus      :: !Bus
    -- ^ Bus used by 'publish', 'publisher' and 'stopBus'.
  , __monitor  :: !MonitoringBackend
    -- ^ Backend targeted by the @monitor*@ helpers of this module.
  }
-- | Computation with access to an 'Env', implemented as a 'ReaderT' over
-- 'IO'. All listed instances are derived from the underlying transformer
-- stack ('MonadFail' only on compilers newer than GHC 7.10).
newtype EventStore a = EventStore
  { unEventStore :: ReaderT Env IO a
    -- ^ Unwraps the underlying reader computation.
  }
  deriving
    ( Functor
    , Applicative
    , Monad
#if __GLASGOW_HASKELL__ > 710
    , MonadFail
#endif
    , MonadThrow
    , MonadCatch
    , MonadIO
    , MonadFix
    )
-- | Returns the complete 'Env' of the running computation.
getEnv :: EventStore Env
getEnv = EventStore ask
-- | Returns the 'Settings' stored in the current 'Env'.
getSettings :: EventStore Settings
getSettings = fmap __settings getEnv
-- | Produces a new 'UUID' by lifting 'nextRandom' into any 'MonadIO'.
freshUUID :: MonadIO m => m UUID
freshUUID = liftIO nextRandom
-- | Returns the environment's 'Bus' wrapped as an opaque 'Publish' handle.
publisher :: EventStore Publish
publisher = asPub . __bus <$> getEnv
-- | Applies 'busStop' to the 'Bus' held in the current 'Env'.
stopBus :: EventStore ()
stopBus = getEnv >>= busStop . __bus
-- | Lifts an 'IO' action by delegating to the instance of the wrapped
-- 'ReaderT' stack.
instance MonadBase IO EventStore where
  liftBase = EventStore . liftBase
-- | 'EventStore' carries no monadic state besides its read-only 'Env', so the
-- saved state of a computation is simply its result.
instance MonadBaseControl IO EventStore where
  type StM EventStore a = a

  -- The runner handed to the callback executes an 'EventStore' action
  -- against the environment captured at the point of the call.
  liftBaseWith inBase = EventStore $ do
    env <- ask
    liftIO (inBase (\action -> runReaderT (unEventStore action) env))

  restoreM = return
-- | Log entries are rendered with 'toLogStr' and passed to the callback of
-- the 'LoggerRef' stored in the environment.
instance MonadLogger EventStore where
  monadLoggerLog loc src lvl msg =
    getEnv >>= \env ->
      liftIO (loggerCallback (__logRef env) loc src lvl (toLogStr msg))
-- | Exposes the logging callback of the environment's 'LoggerRef'.
instance MonadLoggerIO EventStore where
  askLoggerIO = loggerCallback . __logRef <$> getEnv
-- | Runs an 'EventStore' computation in 'IO'.
--
-- The environment is assembled from the given logger, settings and bus; the
-- monitoring backend is the one stored in the bus ('_monitoring').
runEventStore :: LoggerRef -> Settings -> Bus -> EventStore a -> IO a
runEventStore ref setts bus action = runReaderT (unEventStore action) env
  where
    env =
      Env
        { __logRef   = ref
        , __settings = setts
        , __bus      = bus
        , __monitor  = _monitoring bus
        }
-- | A value of any 'Typeable' type with its static type hidden. Use 'toMsg'
-- to build one and 'fromMsg' to recover the payload.
data Message where
  Message :: Typeable a => a -> Message
  deriving Typeable

-- | Shows only the type of the payload, never the payload itself.
instance Show Message where
  show (Message payload) = "Message: " <> show payloadType
    where
      payloadType = typeOf payload
-- | Wraps any 'Typeable' value into a 'Message'.
toMsg :: Typeable a => a -> Message
toMsg payload = Message payload
-- | Recovers the payload of a 'Message'. Yields 'Nothing' when the payload
-- is not of the requested type (see 'cast').
fromMsg :: Typeable a => Message -> Maybe a
fromMsg (Message payload) = cast payload
-- | Things a value can be published to.
class Pub p where
  -- | Publishes a value from within an 'STM' transaction. The meaning of the
  -- 'Bool' is defined by the instance; for 'Bus' it is 'False' when the
  -- queue was already closed at the time of the call.
  publishSTM
    :: Typeable a
    => p
    -> a
    -> STM Bool
-- | Publishes a value on the 'Bus' of the current environment (see
-- 'publishWith').
publish :: Typeable a => a -> EventStore ()
publish a = getEnv >>= \env -> publishWith (__bus env) a
-- | Publishes a value on the given publisher in a single atomic transaction.
-- The 'Bool' returned by 'publishSTM' is discarded.
publishWith :: (Pub p, Typeable a, MonadIO m) => p -> a -> m ()
publishWith p a = atomically (() <$ publishSTM p a)
-- | Things an 'EventHandler' can be registered on.
class Sub s where
  -- | Registers an already packaged handler. 'subscribe' is the typed
  -- front-end that builds the 'EventHandler' for the caller.
  subscribeEventHandler
    :: s
    -> EventHandler
    -> IO ()
-- | Registers a callback for values of type @a@ on the given subscriber.
--
-- The callback is packaged, together with a 'Proxy' recording its argument
-- type, into an 'EventHandler' and handed to 'subscribeEventHandler'.
subscribe :: (Sub s, Typeable a)
          => s
          -> (a -> EventStore ())
          -> IO ()
subscribe s k = subscribeEventHandler s handler
  where
    handler = EventHandler Proxy k
-- | Opaque publishing handle: hides the concrete type of a 'Pub' instance.
data Publish where
  Publish :: Pub p => p -> Publish

-- | Forwards to the wrapped publisher.
instance Pub Publish where
  publishSTM (Publish inner) = publishSTM inner
-- | Opaque subscription handle: hides the concrete type of a 'Sub' instance.
data Subscribe where
  Subscribe :: Sub s => s -> Subscribe

-- | Forwards to the wrapped subscriber.
instance Sub Subscribe where
  subscribeEventHandler (Subscribe inner) = subscribeEventHandler inner
-- | Opaque handle over something that is both a 'Sub' and a 'Pub'.
data Hub where
  Hub :: (Sub h, Pub h) => h -> Hub

-- | Forwards to the wrapped value.
instance Sub Hub where
  subscribeEventHandler (Hub inner) handler =
    subscribeEventHandler inner handler

-- | Forwards to the wrapped value.
instance Pub Hub where
  publishSTM (Hub inner) payload = publishSTM inner payload
-- | Hides a 'Sub' instance behind the opaque 'Subscribe' handle.
asSub :: Sub s => s -> Subscribe
asSub s = Subscribe s
-- | Hides a 'Pub' instance behind the opaque 'Publish' handle.
asPub :: Pub p => p -> Publish
asPub p = Publish p
-- | Hides a value that is both 'Sub' and 'Pub' behind the opaque 'Hub'
-- handle.
asHub :: (Sub h, Pub h) => h -> Hub
asHub h = Hub h
-- | Runtime type identity: a 'TypeRep' (used for display) paired with a
-- 'Fingerprint' (used for equality and ordering).
data Type
  = Type TypeRep Fingerprint

-- | Renders the 'TypeRep'; the fingerprint is not shown.
instance Show Type where
  show (Type rep _) = "type " <> show rep

-- | Equality looks at the fingerprints only.
instance Eq Type where
  (==) (Type _ lhs) (Type _ rhs) = lhs == rhs

-- | Ordering looks at the fingerprints only, in agreement with 'Eq'.
instance Ord Type where
  Type _ lhs `compare` Type _ rhs = lhs `compare` rhs
-- | Hashes the two words of the fingerprint as a pair; the 'TypeRep' does
-- not take part.
instance Hashable Type where
  hashWithSalt salt (Type _ (Fingerprint w1 w2)) = hashWithSalt salt (w1, w2)
-- | The two ways 'getType' can be told which type to describe.
data GetType where
  -- | Describe the type of the given value.
  FromTypeable :: Typeable a => a -> GetType
  -- | Describe the type argument of the given proxy-like value.
  FromProxy :: Typeable a => prx a -> GetType
-- | Extracts the 'Fingerprint' of a 'TypeRep'. On GHC 7.8 it is read
-- straight from the 'TypeRep' constructor; every other compiler uses
-- 'typeRepFingerprint'.
getFingerprint :: TypeRep -> Fingerprint
#if __GLASGOW_HASKELL__ == 708
getFingerprint (TypeRep fingerprint _ _) = fingerprint
#else
getFingerprint rep = typeRepFingerprint rep
#endif
-- | Builds the 'Type' described by a 'GetType' request: the 'TypeRep' comes
-- from 'typeOf' (value) or 'typeRep' (proxy), the fingerprint from
-- 'getFingerprint'.
getType :: GetType -> Type
getType op = Type rep (getFingerprint rep)
  where
    rep = repOf op

    repOf (FromTypeable value) = typeOf value
    repOf (FromProxy proxy)    = typeRep proxy
-- | Registered handlers, grouped by the 'Type' they were subscribed for.
-- Handlers of one type are kept in a 'Seq'; the 'Bus' instance of 'Sub'
-- appends new ones at the end.
type EventHandlers = HashMap Type (Seq EventHandler)
-- | Delivers a value to every handler of the sequence, in order.
--
-- * A handler whose argument type differs from the type of the value is
--   skipped and the mismatch is logged. The value is never passed on as an
--   unchecked (bottom) argument.
-- * A synchronous exception raised by a handler is caught with 'tryAny' and
--   logged, so the remaining handlers still run.
propagate :: Typeable a => a -> Seq EventHandler -> EventStore ()
propagate a = traverse_ $ \(EventHandler prx k) ->
  case cast a of
    Nothing -> do
      -- Total replacement for the former partial @let Just b = cast a@.
      let expected = typeRep prx
          actual   = typeOf a
      $(logError) [i|Handler expecting #{expected} cannot receive #{actual}, skipped.|]
    Just b -> do
      let tpe = typeOf b
      outcome <- tryAny $ k b
      case outcome of
        Right _ -> return ()
        Left e  -> $(logError) [i|Exception when propagating #{tpe}: #{e}.|]
-- | A callback together with a 'Proxy' recording the type of value it
-- accepts; the accepted type itself is hidden.
data EventHandler where
  EventHandler
    :: Typeable a
    => Proxy a
    -> (a -> EventStore ())
    -> EventHandler

-- | Shows only the type the handler accepts.
instance Show EventHandler where
  show (EventHandler prx _) = "Handle " <> show handled
    where
      handled = typeRep prx
-- | In-process message bus. Values are published onto a queue and a worker
-- thread delivers them to the registered handlers (see 'newBus' and
-- 'worker').
data Bus = Bus
  { _busLoggerRef     :: LoggerRef
    -- ^ Logger passed to 'runEventStore' when messages are delivered.
  , _busSettings      :: Settings
    -- ^ Settings passed to 'runEventStore' when messages are delivered.
  , _busEventHandlers :: IORef EventHandlers
    -- ^ Handlers registered so far.
  , _busQueue         :: TBMQueue Message
    -- ^ Queue of published messages waiting for the worker.
  , _workerAsync      :: Async ()
    -- ^ Handle on the worker thread.
  , _monitoring       :: MonitoringBackend
    -- ^ Monitoring backend; 'newBus' takes it from the settings.
  }
-- | Closes the queue of the bus. Use 'busProcessedEverything' to wait for
-- the worker thread to finish.
busStop :: MonadIO m => Bus -> m ()
busStop bus = atomically (closeTBMQueue (_busQueue bus))
-- | Blocks until the worker thread of the bus has terminated.
-- NOTE(review): the worker loop ends when reading the queue yields
-- 'Nothing', presumably once the queue is closed and drained — confirm
-- against the TBMQueue documentation.
busProcessedEverything :: Bus -> IO ()
busProcessedEverything = wait . _workerAsync
-- | The 'Type' of 'Message' itself. Handlers registered under it receive
-- every published value, wrapped with 'toMsg' (see 'publishing').
messageType :: Type
messageType = getType (FromProxy proxy)
  where
    proxy :: Proxy Message
    proxy = Proxy
-- | Creates a bus with no handlers, a queue bounded to 500 messages and a
-- running worker thread. The monitoring backend is taken from the settings.
--
-- 'mfix' ties the knot so the worker thread is handed the very 'Bus' under
-- construction.
newBus :: LoggerRef -> Settings -> IO Bus
newBus ref setts =
  mfix $ \self -> do
    handlers <- newIORef mempty
    queue    <- newTBMQueueIO 500
    thread   <- async (worker self)
    return (Bus ref setts handlers queue thread (s_monitoring setts))
-- | Body of the worker thread: repeatedly takes the next message off the
-- queue and delivers it through 'publishing', using the handlers registered
-- at that moment. Stops as soon as reading the queue yields 'Nothing'.
worker :: Bus -> IO ()
worker self@Bus{..} = go
  where
    go = do
      next <- atomically (readTBMQueue _busQueue)
      case next of
        Nothing -> return ()
        Just (Message payload) -> do
          -- Read the handlers afresh for every message so that
          -- subscriptions made in the meantime are taken into account.
          handlers <- readIORef _busEventHandlers
          publishing self handlers payload
          go
-- | Registers the handler under the 'Type' recorded by its proxy. The first
-- handler of a type starts a new sequence; later ones are appended at the
-- end. The handler table is updated atomically.
instance Sub Bus where
  subscribeEventHandler bus hdl@(EventHandler prx _) =
    atomicModifyIORef' (_busEventHandlers bus) register
    where
      key = getType (FromProxy prx)

      register :: EventHandlers -> (EventHandlers, ())
      register handlers =
        (alterMap (Just . maybe (singleton hdl) (`snoc` hdl)) key handlers, ())
-- | Wraps the value with 'toMsg' and writes it to the queue of the bus. The
-- result is 'False' when the queue was already closed before the write.
instance Pub Bus where
  publishSTM bus a = do
    wasOpen <- not <$> isClosedTBMQueue queue
    writeTBMQueue queue (toMsg a)
    return wasOpen
    where
      queue = _busQueue bus
-- | Delivers one published value, running inside 'runEventStore' with the
-- logger and settings of the bus.
--
-- The value first goes to the handlers registered for its own type. Unless
-- that type is 'Message' itself, it is then wrapped with 'toMsg' and also
-- delivered to the handlers registered for 'Message'.
publishing :: Typeable a => Bus -> EventHandlers -> a -> IO ()
publishing self@Bus{..} callbacks a =
  runEventStore _busLoggerRef _busSettings self $ do
    $(logDebug) [i|Publishing message #{tpe}.|]
    traverse_ (propagate a) (handlersFor tpe)
    $(logDebug) [i|Message #{tpe} propagated.|]

    unless (tpe == messageType) $
      traverse_ (propagate (toMsg a)) (handlersFor messageType)
  where
    tpe :: Type
    tpe = getType (FromTypeable a)

    handlersFor :: Type -> Maybe (Seq EventHandler)
    handlersFor key = lookup key callbacks
-- | Calls 'monitoringBackendIncrPkgCount' on the monitoring backend of the
-- current environment.
monitorIncrPkgCount :: EventStore ()
monitorIncrPkgCount =
  getEnv >>= liftIO . monitoringBackendIncrPkgCount . __monitor
-- | Calls 'monitoringBackendIncrConnectionDrop' on the monitoring backend of
-- the current environment.
monitorIncrConnectionDrop :: EventStore ()
monitorIncrConnectionDrop =
  getEnv >>= liftIO . monitoringBackendIncrConnectionDrop . __monitor
-- | Calls 'monitoringBackendAddDataTransmitted' with the given amount on the
-- monitoring backend of the current environment.
monitorAddDataTransmitted :: Int -> EventStore ()
monitorAddDataTransmitted siz = do
  backend <- fmap __monitor getEnv
  liftIO (monitoringBackendAddDataTransmitted backend siz)
-- | Calls 'monitoringBackendIncrForceReconnect' on the monitoring backend of
-- the current environment.
monitorIncrForceReconnect :: EventStore ()
monitorIncrForceReconnect =
  getEnv >>= liftIO . monitoringBackendIncrForceReconnect . __monitor
-- | Calls 'monitoringBackendIncrHeartbeatTimeouts' on the monitoring backend
-- of the current environment.
monitorIncrHeartbeatTimeouts :: EventStore ()
monitorIncrHeartbeatTimeouts =
  getEnv >>= liftIO . monitoringBackendIncrHeartbeatTimeouts . __monitor