{-# LANGUAGE CPP                        #-}
{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE GADTs                      #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Control
-- Copyright : (C) 2014 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Control
  (
    -- * Control
    EventStore
  , runEventStore
  , getSettings
  , freshUUID
    -- * Messaging
  , Pub(..)
  , Sub(..)
  , Hub
  , Subscribe
  , Publish
  , asPub
  , asSub
  , asHub
  , Bus
  , newBus
  , busStop
  , Message
  , toMsg
  , fromMsg
  , busProcessedEverything
  , publish
  , publishWith
  , subscribe
  , stopBus
  , publisher
    -- * Monitoring
  , monitorIncrPkgCount
  , monitorIncrConnectionDrop
  , monitorAddDataTransmitted
  , monitorIncrForceReconnect
  , monitorIncrHeartbeatTimeouts
    -- * Re-export
  , module Database.EventStore.Internal.Settings
  ) where

--------------------------------------------------------------------------------
#if __GLASGOW_HASKELL__ > 710
import Control.Monad.Fail
#endif
import Data.Typeable
#if __GLASGOW_HASKELL__ < 802
import Data.Typeable.Internal
#else
import GHC.Fingerprint
#endif

--------------------------------------------------------------------------------
import Control.Monad.Reader
import Data.UUID
import Data.UUID.V4

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings

--------------------------------------------------------------------------------
-- | Read-only environment carried by every 'EventStore' computation.
data Env =
  Env { __logRef   :: !LoggerRef
        -- ^ Logger handle; the 'MonadLogger' instances route through it.
      , __settings :: !Settings
        -- ^ Settings returned by 'getSettings'.
      , __bus      :: !Bus
        -- ^ Bus used by 'publish', 'publisher' and 'stopBus'.
      , __monitor  :: !MonitoringBackend
        -- ^ Backend the @monitor*@ functions report to.
      }

--------------------------------------------------------------------------------
-- | Main computation type of the driver internals: a 'ReaderT' over 'IO'
--   carrying an 'Env'. All instances are obtained through
--   @GeneralizedNewtypeDeriving@; 'MonadFail' is only derived on GHC
--   versions newer than 7.10.
newtype EventStore a =
  EventStore { unEventStore :: ReaderT Env IO a }
  deriving ( Functor
           , Applicative
           , Monad
#if __GLASGOW_HASKELL__ > 710
           , MonadFail
#endif
           , MonadThrow
           , MonadCatch
           , MonadIO
           , MonadFix
           )

--------------------------------------------------------------------------------
-- | Returns the whole 'Env' of the running computation.
getEnv :: EventStore Env
getEnv = EventStore ask

--------------------------------------------------------------------------------
-- | Returns the 'Settings' stored in the environment.
getSettings :: EventStore Settings
getSettings = __settings <$> getEnv

--------------------------------------------------------------------------------
-- | Generates a new 'UUID' by lifting 'nextRandom' into any 'MonadIO'.
freshUUID :: MonadIO m => m UUID
freshUUID = liftIO nextRandom

--------------------------------------------------------------------------------
-- | Returns the environment's bus wrapped as a publish-only handle.
publisher :: EventStore Publish
publisher = fmap (asPub . __bus) getEnv

--------------------------------------------------------------------------------
-- | Stops the bus of the current environment (see 'busStop').
stopBus :: EventStore ()
stopBus = busStop . __bus =<< getEnv

--------------------------------------------------------------------------------
-- | Lifts 'IO' actions by delegating to the underlying 'ReaderT' instance.
instance MonadBase IO EventStore where
  liftBase m = EventStore $ liftBase m

--------------------------------------------------------------------------------
-- | 'EventStore' carries no monadic state besides its read-only 'Env', so
--   the captured state @StM@ is the bare result and 'restoreM' is 'return'.
instance MonadBaseControl IO EventStore where
    type StM EventStore a = a
    liftBaseWith run = EventStore $ do
      env <- ask
      -- The runner handed to @run@ re-enters 'EventStore' with the
      -- environment captured above.
      s   <- liftIO $ run (\m -> runReaderT (unEventStore m) env)
      restoreM s
    restoreM = return

--------------------------------------------------------------------------------
-- | Logs through the callback of the 'LoggerRef' stored in the environment.
instance MonadLogger EventStore where
  monadLoggerLog loc src lvl msg  = do
    loggerRef <- __logRef <$> getEnv
    liftIO $ loggerCallback loggerRef loc src lvl (toLogStr msg)

--------------------------------------------------------------------------------
-- | Exposes the logging callback of the environment's 'LoggerRef'.
instance MonadLoggerIO EventStore where
  askLoggerIO = do
    loggerRef <- __logRef <$> getEnv
    return (loggerCallback loggerRef)

--------------------------------------------------------------------------------
-- | Runs an 'EventStore' computation in 'IO'.
--
--   The environment is built from the given logger, settings and bus; the
--   monitoring backend is taken from the bus itself ('_monitoring').
runEventStore :: LoggerRef -> Settings -> Bus -> EventStore a -> IO a
runEventStore ref setts bus (EventStore action) =
  runReaderT action (Env ref setts bus (_monitoring bus))

--------------------------------------------------------------------------------
-- Messaging
--------------------------------------------------------------------------------
-- | Dynamically-typed envelope: wraps any 'Typeable' value so that values of
--   different types can travel through the same queue. Use 'toMsg' to wrap
--   and 'fromMsg' to recover the payload.
data Message = forall a. Typeable a => Message a
  deriving Typeable

--------------------------------------------------------------------------------
-- | Shows only the type of the wrapped payload, never its value.
instance Show Message where
  show (Message a) = "Message: " <> show (typeOf a)

--------------------------------------------------------------------------------
-- | Wraps a 'Typeable' value into a 'Message'.
toMsg :: Typeable a => a -> Message
toMsg = Message

--------------------------------------------------------------------------------
-- | Tries to recover a payload of the requested type from a 'Message'.
--   Yields 'Nothing' when the wrapped value has a different type.
fromMsg :: Typeable a => Message -> Maybe a
fromMsg (Message a) = cast a

--------------------------------------------------------------------------------
-- | Things a 'Typeable' value can be published to.
class Pub p where
  -- | Publishes a value within an 'STM' transaction. For the 'Bus' instance
  --   the result is 'False' when the underlying queue was already closed.
  publishSTM :: Typeable a => p -> a -> STM Bool

--------------------------------------------------------------------------------
-- | Publishes a value on the bus of the current environment. The 'Bool'
--   reported by 'publishSTM' is discarded (see 'publishWith').
publish :: Typeable a => a -> EventStore ()
publish a = do
  bus <- __bus <$> getEnv
  publishWith bus a

--------------------------------------------------------------------------------
-- | Publishes a value to the given publisher in its own 'STM' transaction.
--   The success flag returned by 'publishSTM' is deliberately ignored.
publishWith :: (Pub p, Typeable a, MonadIO m) => p -> a -> m ()
publishWith p a = atomically $ do
  _ <- publishSTM p a
  return ()

--------------------------------------------------------------------------------
-- | Things an 'EventHandler' can be registered on.
class Sub s where
  -- | Registers an already-packed 'EventHandler'. Prefer 'subscribe', which
  --   builds the handler from a plain callback.
  subscribeEventHandler :: s -> EventHandler -> IO ()

--------------------------------------------------------------------------------
-- | Registers a callback for every published value of type @a@. The
--   callback is packed, together with a 'Proxy' of its argument type, into
--   an 'EventHandler' and handed to 'subscribeEventHandler'.
subscribe :: (Sub s, Typeable a)
          => s
          -> (a -> EventStore ())
          -> IO ()
subscribe s k = subscribeEventHandler s (EventHandler Proxy k)

--------------------------------------------------------------------------------
-- | Existential wrapper hiding the concrete type of a publisher.
data Publish where
  Publish :: Pub p => p -> Publish

--------------------------------------------------------------------------------
-- | Forwards to the wrapped publisher.
instance Pub Publish where
  publishSTM (Publish p) a = publishSTM p a

--------------------------------------------------------------------------------
-- | Existential wrapper hiding the concrete type of a subscription target.
data Subscribe where
  Subscribe :: Sub p => p -> Subscribe

--------------------------------------------------------------------------------
-- | Forwards to the wrapped subscription target.
instance Sub Subscribe where
  subscribeEventHandler (Subscribe p) a = subscribeEventHandler p a

--------------------------------------------------------------------------------
-- | Existential wrapper around something that is both a 'Sub' and a 'Pub'.
data Hub where
  Hub :: (Sub h, Pub h) => h -> Hub

--------------------------------------------------------------------------------
-- | Forwards to the wrapped hub.
instance Sub Hub where
  subscribeEventHandler (Hub h) = subscribeEventHandler h

--------------------------------------------------------------------------------
-- | Forwards to the wrapped hub.
instance Pub Hub where
  publishSTM (Hub h) = publishSTM h

--------------------------------------------------------------------------------
-- | Hides a subscription target behind 'Subscribe'.
asSub :: Sub s => s -> Subscribe
asSub = Subscribe

--------------------------------------------------------------------------------
-- | Hides a publisher behind 'Publish'.
asPub :: Pub p => p -> Publish
asPub = Publish

--------------------------------------------------------------------------------
-- | Hides something that can both publish and subscribe behind 'Hub'.
asHub :: (Sub h, Pub h) => h -> Hub
asHub = Hub

--------------------------------------------------------------------------------
-- | Key identifying a message type in 'EventHandlers'. The 'TypeRep' is used
--   for display ('Show'); the 'Eq', 'Ord' and 'Hashable' instances look at
--   the 'Fingerprint' only.
data Type = Type TypeRep Fingerprint

--------------------------------------------------------------------------------
-- | Renders the 'TypeRep'; the fingerprint is not shown.
instance Show Type where
  show (Type rep _) = "type " <> show rep

--------------------------------------------------------------------------------
-- | Two 'Type's are equal when their fingerprints are equal.
instance Eq Type where
  Type _ a == Type _ b = a == b

--------------------------------------------------------------------------------
-- | Orders 'Type's by fingerprint, consistently with the 'Eq' instance.
instance Ord Type where
  compare (Type _ a) (Type _ b) = compare a b

--------------------------------------------------------------------------------
-- | Hashes the two words of the fingerprint; the 'TypeRep' is ignored,
--   consistently with the 'Eq' instance.
instance Hashable Type where
  hashWithSalt s (Type _ (Fingerprint b l)) = hashWithSalt s (b, l)

--------------------------------------------------------------------------------
-- | Two ways of designating the type a 'Type' key is computed from (see
--   'getType'): an actual value, or any proxy-like wrapper around the type.
data GetType
  = forall a. Typeable a => FromTypeable a
  | forall prx a. Typeable a => FromProxy (prx a)

--------------------------------------------------------------------------------
-- | Extracts the 'Fingerprint' of a 'TypeRep'. On GHC 7.8 the
--   'TypeRep' constructor is matched directly; every other compiler
--   version goes through 'typeRepFingerprint'.
getFingerprint :: TypeRep -> Fingerprint
#if __GLASGOW_HASKELL__ == 708
getFingerprint (TypeRep fp _ _) = fp
#else
getFingerprint = typeRepFingerprint
#endif

--------------------------------------------------------------------------------
-- | Computes the 'Type' key (type representation plus its fingerprint) of
--   the type designated by a 'GetType'.
getType :: GetType -> Type
getType op = Type t (getFingerprint t)
  where
    t = case op of
          FromTypeable a -> typeOf a
          FromProxy prx  -> typeRep prx

--------------------------------------------------------------------------------
-- | Registered handlers, grouped by the 'Type' of message they accept.
type EventHandlers = HashMap Type (Seq EventHandler)

--------------------------------------------------------------------------------
-- | Runs every handler of the sequence on the given value, in order.
--
--   An exception raised by a handler is caught and logged, so the remaining
--   handlers still run. A handler whose argument type doesn't match the
--   value is skipped with an error log instead of relying on a partial
--   pattern match.
propagate :: Typeable a => a -> Seq EventHandler -> EventStore ()
propagate a = traverse_ $ \(EventHandler prx k) ->
  case cast a of
    Nothing -> do
      -- Not expected when handlers are looked up by the value's type; kept
      -- total so a mismatch is reported explicitly.
      let valueType   = typeOf a
          handlerType = typeRep prx
      $(logError) [i|Type mismatch when propagating #{valueType} to a handler of #{handlerType}.|]
    Just b -> do
      let tpe = typeOf b
      outcome <- tryAny $ k b
      case outcome of
        Right _ -> return ()
        Left e  -> $(logError) [i|Exception when propagating #{tpe}: #{e}.|]

--------------------------------------------------------------------------------
-- | A callback packed with a 'Proxy' of its argument type, so handlers for
--   different message types can be stored in the same collection.
data EventHandler =
  forall a. Typeable a => EventHandler (Proxy a) (a -> EventStore ())

--------------------------------------------------------------------------------
-- | Shows the type of message the handler accepts.
instance Show EventHandler where
  show (EventHandler prx _) = "Handle " <> show (typeRep prx)

--------------------------------------------------------------------------------
-- | In-process message bus: a bounded queue drained by a worker thread that
--   dispatches each message to the handlers registered for its type.
--
--   Fields are left non-strict: 'newBus' builds the record through 'mfix'
--   and passes the not-yet-built value to the worker.
data Bus =
  Bus { _busLoggerRef      :: LoggerRef
        -- ^ Logger used when running handlers.
      , _busSettings       :: Settings
        -- ^ Settings used when running handlers.
      , _busEventHandlers  :: IORef EventHandlers
        -- ^ Registered handlers, by message type.
      , _busQueue          :: TBMQueue Message
        -- ^ Pending messages.
      , _workerAsync       :: Async ()
        -- ^ Handle on the worker thread.
      , _monitoring        :: MonitoringBackend
        -- ^ Monitoring backend, taken from the settings by 'newBus'.
      }

--------------------------------------------------------------------------------
-- | Closes the bus queue. The call itself doesn't wait for the worker; use
--   'busProcessedEverything' for that.
busStop :: MonadIO m => Bus -> m ()
busStop Bus{..} = atomically $ closeTBMQueue _busQueue

--------------------------------------------------------------------------------
-- | Blocks until the bus worker thread has terminated.
busProcessedEverything :: Bus -> IO ()
busProcessedEverything Bus{..} = wait _workerAsync

--------------------------------------------------------------------------------
-- | 'Type' key of 'Message' itself. Handlers registered under it receive
--   every published value, wrapped as a 'Message' (see 'publishing').
messageType :: Type
messageType = getType (FromProxy (Proxy :: Proxy Message))

--------------------------------------------------------------------------------
-- | Creates a bus and starts its worker thread.
--
--   The bus starts with no handlers and a queue bounded to 500 messages;
--   its monitoring backend comes from the settings. 'mfix' ties the knot
--   so the worker can be given the very 'Bus' that stores its 'Async'.
newBus :: LoggerRef -> Settings -> IO Bus
newBus ref setts =
  mfix $ \b ->
    Bus ref setts <$> newIORef mempty
                  <*> newTBMQueueIO 500
                  <*> async (worker b)
                  <*> pure (s_monitoring setts)

--------------------------------------------------------------------------------
-- | Body of the bus worker thread: reads messages off the queue one at a
--   time and dispatches each of them with 'publishing'.
--
--   The loop ends when 'readTBMQueue' yields 'Nothing' (presumably once the
--   queue is closed and drained -- see the stm-chans documentation).
worker :: Bus -> IO ()
worker self@Bus{..} = loop
  where
    handleMsg (Message a) = do
      -- Handlers are re-read for every message, so subscriptions made
      -- while the worker runs are taken into account.
      callbacks <- readIORef _busEventHandlers
      publishing self callbacks a
      loop

    loop = traverse_ handleMsg =<< atomically (readTBMQueue _busQueue)

--------------------------------------------------------------------------------
-- | Registers the handler under the 'Type' of its argument. A new handler
--   is appended after those already registered for the same type.
instance Sub Bus where
  subscribeEventHandler Bus{..} hdl@(EventHandler prx _) =
    atomicModifyIORef' _busEventHandlers update
    where
      update :: EventHandlers -> (EventHandlers, ())
      update callbacks =
        let tpe  = getType (FromProxy prx)
            next = alterMap $ \input ->
              case input of
                Nothing -> Just (singleton hdl)
                Just hs -> Just (snoc hs hdl) in
        (next tpe callbacks, ())

--------------------------------------------------------------------------------
-- | Enqueues the value, wrapped as a 'Message', on the bus queue.
--
--   Returns 'False' when the queue was already closed at the time of the
--   call; the write is attempted regardless.
instance Pub Bus where
  publishSTM Bus{..} a = do
    closed <- isClosedTBMQueue _busQueue
    -- NOTE(review): writing to a closed TBMQueue is presumably a no-op --
    -- confirm against the stm-chans documentation.
    writeTBMQueue _busQueue (toMsg a)
    return $ not closed

--------------------------------------------------------------------------------
-- | Dispatches one value to the given handlers, inside an 'EventStore'
--   computation run with the bus' own logger and settings.
--
--   Handlers registered for the value's exact type run first. Unless the
--   value is itself a 'Message', handlers registered for 'Message' then
--   receive it wrapped with 'toMsg'.
publishing :: Typeable a => Bus -> EventHandlers -> a -> IO ()
publishing self@Bus{..} callbacks a = do
  let tpe = getType (FromTypeable a)
  runEventStore _busLoggerRef _busSettings self $ do
    $(logDebug) [i|Publishing message #{tpe}.|]
    traverse_ (propagate a) (lookup tpe callbacks)
    $(logDebug) [i|Message #{tpe} propagated.|]

    -- A published 'Message' already reached the 'Message' handlers above;
    -- skipping here avoids delivering it twice.
    unless (tpe == messageType) $
      traverse_ (propagate (toMsg a)) (lookup messageType callbacks)

--------------------------------------------------------------------------------
-- Monitoring
--------------------------------------------------------------------------------

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrPkgCount' on the environment's monitoring
--   backend.
monitorIncrPkgCount :: EventStore ()
monitorIncrPkgCount = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrPkgCount __monitor

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrConnectionDrop' on the environment's
--   monitoring backend.
monitorIncrConnectionDrop :: EventStore ()
monitorIncrConnectionDrop = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrConnectionDrop __monitor

--------------------------------------------------------------------------------
-- | Passes the given amount to 'monitoringBackendAddDataTransmitted' on the
--   environment's monitoring backend.
monitorAddDataTransmitted :: Int -> EventStore ()
monitorAddDataTransmitted siz = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendAddDataTransmitted __monitor siz

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrForceReconnect' on the environment's
--   monitoring backend.
monitorIncrForceReconnect :: EventStore ()
monitorIncrForceReconnect = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrForceReconnect __monitor

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrHeartbeatTimeouts' on the environment's
--   monitoring backend.
monitorIncrHeartbeatTimeouts :: EventStore ()
monitorIncrHeartbeatTimeouts = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrHeartbeatTimeouts __monitor