{-# LANGUAGE CPP                        #-}
{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE GADTs                      #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Control
-- Copyright : (C) 2014 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Control
  (
    -- * Control
    EventStore
  , runEventStore
  , getSettings
  , freshUUID
    -- * Messaging
  , Pub(..)
  , Sub(..)
  , Hub
  , Subscribe
  , Publish
  , asPub
  , asSub
  , asHub
  , Bus
  , newBus
  , busStop
  , Message
  , toMsg
  , fromMsg
  , busProcessedEverything
  , publish
  , publishWith
  , subscribe
  , stopBus
  , publisher
    -- * Monitoring
  , monitorIncrPkgCount
  , monitorIncrConnectionDrop
  , monitorAddDataTransmitted
  , monitorIncrForceReconnect
  , monitorIncrHeartbeatTimeouts
    -- * Re-export
  , module Database.EventStore.Internal.Settings
  ) where

--------------------------------------------------------------------------------
#if __GLASGOW_HASKELL__ > 710
import Control.Monad.Fail
#endif
import Data.Typeable
#if __GLASGOW_HASKELL__ < 802
import Data.Typeable.Internal
#else
import GHC.Fingerprint
#endif

--------------------------------------------------------------------------------
import Control.Monad.Reader
import Data.UUID
import Data.UUID.V4

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Settings

--------------------------------------------------------------------------------
-- | Read-only environment carried by every 'EventStore' computation.
--   All fields are strict.
data Env =
  Env
    { __logRef :: !LoggerRef
      -- ^ Logger handle used by the 'MonadLogger' / 'MonadLoggerIO' instances.
    , __settings :: !Settings
      -- ^ Settings returned by 'getSettings'.
    , __bus :: !Bus
      -- ^ Bus used by 'publish', 'publisher' and 'stopBus'.
    , __monitor :: !MonitoringBackend
      -- ^ Monitoring backend; 'runEventStore' takes it from the bus.
    }

--------------------------------------------------------------------------------
-- | Application monad: a 'ReaderT' over 'IO' that carries an 'Env'.
--   Every instance below is obtained through newtype deriving; 'MonadFail'
--   is only derived on GHC versions newer than 7.10.
newtype EventStore a =
  EventStore { unEventStore :: ReaderT Env IO a }
  deriving
    ( Functor
    , Applicative
    , Monad
#if __GLASGOW_HASKELL__ > 710
    , MonadFail
#endif
    , MonadThrow
    , MonadCatch
    , MonadIO
    , MonadFix
    )

--------------------------------------------------------------------------------
-- | Fetch the whole 'Env' of the running computation.
getEnv :: EventStore Env
getEnv = EventStore (asks id)

--------------------------------------------------------------------------------
-- | Fetch the 'Settings' stored in the environment.
getSettings :: EventStore Settings
getSettings = fmap __settings getEnv

--------------------------------------------------------------------------------
-- | Produce a new 'UUID' by running 'nextRandom' in the underlying 'IO'.
freshUUID :: MonadIO m => m UUID
freshUUID = liftIO nextRandom

--------------------------------------------------------------------------------
-- | Expose the environment's bus as an opaque 'Publish' handle.
publisher :: EventStore Publish
publisher = do
  env <- getEnv
  return (asPub (__bus env))

--------------------------------------------------------------------------------
-- | Call 'busStop' on the bus held by the environment.
stopBus :: EventStore ()
stopBus = do
  env <- getEnv
  busStop (__bus env)

--------------------------------------------------------------------------------
-- | Lift an 'IO' action by delegating to the 'MonadBase' instance of the
--   wrapped 'ReaderT'.
instance MonadBase IO EventStore where
  liftBase = EventStore . liftBase

--------------------------------------------------------------------------------
-- | 'EventStore' carries no monadic state beyond its environment, so the
--   saved state of a computation is simply its result.
instance MonadBaseControl IO EventStore where
    type StM EventStore a = a

    -- The runner handed to the callback re-runs an 'EventStore' action
    -- against the environment captured at the call site.
    liftBaseWith run = EventStore $
      ask >>= \env ->
        liftIO (run (\m -> runReaderT (unEventStore m) env)) >>= restoreM

    restoreM = return

--------------------------------------------------------------------------------
-- | Log through the callback of the 'LoggerRef' stored in the environment,
--   after converting the message with 'toLogStr'.
instance MonadLogger EventStore where
  monadLoggerLog loc src lvl msg =
    getEnv >>= \env ->
      liftIO (loggerCallback (__logRef env) loc src lvl (toLogStr msg))

--------------------------------------------------------------------------------
-- | Hand out the logging callback of the environment's 'LoggerRef'.
instance MonadLoggerIO EventStore where
  askLoggerIO = fmap (loggerCallback . __logRef) getEnv

--------------------------------------------------------------------------------
-- | Run an 'EventStore' computation in 'IO'.
--
--   The environment is assembled from the given logger, settings and bus;
--   its monitoring backend is the one held by the bus.
runEventStore :: LoggerRef -> Settings -> Bus -> EventStore a -> IO a
runEventStore ref setts bus action = runReaderT (unEventStore action) env
  where
    env =
      Env
        { __logRef   = ref
        , __settings = setts
        , __bus      = bus
        , __monitor  = _monitoring bus
        }

--------------------------------------------------------------------------------
-- Messaging
--------------------------------------------------------------------------------
-- | Existential wrapper around any 'Typeable' value. Build one with 'toMsg'
--   and recover the payload with 'fromMsg'.
data Message where
  Message :: Typeable a => a -> Message
  deriving Typeable

--------------------------------------------------------------------------------
-- | Renders only the type of the payload, never the payload itself.
instance Show Message where
  show (Message payload) = "Message: " <> show (typeOf payload)

--------------------------------------------------------------------------------
-- | Wrap any 'Typeable' value into a 'Message'.
toMsg :: Typeable a => a -> Message
toMsg payload = Message payload

--------------------------------------------------------------------------------
-- | Recover the payload of a 'Message' at the requested type. The result is
--   whatever 'cast' yields for the payload and that type.
fromMsg :: Typeable a => Message -> Maybe a
fromMsg msg =
  case msg of
    Message payload -> cast payload

--------------------------------------------------------------------------------
-- | Things a 'Typeable' value can be published to from within 'STM'.
--   NOTE(review): the meaning of the returned 'Bool' is defined by the
--   instances; 'publishWith' discards it.
class Pub p where
  publishSTM :: Typeable a => p -> a -> STM Bool

--------------------------------------------------------------------------------
-- | Publish a value on the bus held by the environment.
publish :: Typeable a => a -> EventStore ()
publish a = getEnv >>= \env -> publishWith (__bus env) a

--------------------------------------------------------------------------------
-- | Publish a value through any 'Pub' in a single STM transaction. The
--   'Bool' returned by 'publishSTM' is discarded.
publishWith :: (Pub p, Typeable a, MonadIO m) => p -> a -> m ()
publishWith p a = atomically (publishSTM p a >> return ())

--------------------------------------------------------------------------------
-- | Things that accept an 'EventHandler'. 'subscribe' builds the handler
--   from a plain callback.
class Sub s where
  subscribeEventHandler :: s -> EventHandler -> IO ()

--------------------------------------------------------------------------------
-- | Wrap a callback over values of type @a@ into an 'EventHandler' and hand
--   it to the given 'Sub'.
subscribe :: (Sub s, Typeable a) => s -> (a -> EventStore ()) -> IO ()
subscribe s k = subscribeEventHandler s handler
  where
    -- The 'Proxy' carries the callback's argument type alongside it.
    handler = EventHandler Proxy k

--------------------------------------------------------------------------------
-- | Any 'Pub' with its concrete type hidden.
data Publish where
  Publish :: Pub p => p -> Publish

--------------------------------------------------------------------------------
-- | Forward to the wrapped publisher.
instance Pub Publish where
  publishSTM (Publish inner) value = publishSTM inner value

--------------------------------------------------------------------------------
-- | Any 'Sub' with its concrete type hidden.
data Subscribe where
  Subscribe :: Sub p => p -> Subscribe

--------------------------------------------------------------------------------
-- | Forward to the wrapped subscriber.
instance Sub Subscribe where
  subscribeEventHandler (Subscribe inner) handler =
    subscribeEventHandler inner handler

--------------------------------------------------------------------------------
-- | Anything that is both a 'Sub' and a 'Pub', with its concrete type hidden.
data Hub where
  Hub :: (Sub h, Pub h) => h -> Hub

--------------------------------------------------------------------------------
-- | Forward subscriptions to the wrapped value.
instance Sub Hub where
  subscribeEventHandler (Hub inner) = subscribeEventHandler inner

--------------------------------------------------------------------------------
-- | Forward publications to the wrapped value.
instance Pub Hub where
  publishSTM (Hub inner) = publishSTM inner

--------------------------------------------------------------------------------
-- | Hide the concrete type of a 'Sub' behind 'Subscribe'.
asSub :: Sub s => s -> Subscribe
asSub s = Subscribe s

--------------------------------------------------------------------------------
-- | Hide the concrete type of a 'Pub' behind 'Publish'.
asPub :: Pub p => p -> Publish
asPub p = Publish p

--------------------------------------------------------------------------------
-- | Hide the concrete type of something that is both 'Sub' and 'Pub'
--   behind 'Hub'.
asHub :: (Sub h, Pub h) => h -> Hub
asHub h = Hub h

--------------------------------------------------------------------------------
-- | A type representation paired with its fingerprint. The 'Eq', 'Ord' and
--   'Hashable' instances look only at the 'Fingerprint'; 'Show' uses the
--   'TypeRep'.
data Type = Type TypeRep Fingerprint

--------------------------------------------------------------------------------
-- | Shows the 'TypeRep'; the fingerprint is not displayed.
instance Show Type where
  show (Type tyRep _) = "type " <> show tyRep

--------------------------------------------------------------------------------
-- | Two 'Type's are equal when their fingerprints are equal.
instance Eq Type where
  (Type _ lhs) == (Type _ rhs) = lhs == rhs

--------------------------------------------------------------------------------
-- | 'Type's are ordered by fingerprint.
instance Ord Type where
  compare (Type _ lhs) (Type _ rhs) = compare lhs rhs

--------------------------------------------------------------------------------
-- | Hashes the two words of the fingerprint as a pair; the 'TypeRep' is
--   ignored.
instance Hashable Type where
  hashWithSalt salt (Type _ (Fingerprint w1 w2)) = hashWithSalt salt (w1, w2)

--------------------------------------------------------------------------------
-- | The two ways 'getType' can be told which type to describe: from a value
--   or from a proxy-like wrapper.
data GetType where
  FromTypeable :: Typeable a => a -> GetType
  FromProxy    :: Typeable a => prx a -> GetType

--------------------------------------------------------------------------------
-- | Extract the 'Fingerprint' of a 'TypeRep'. On GHC 7.8 it is read from the
--   'TypeRep' constructor directly; otherwise 'typeRepFingerprint' is used.
getFingerprint :: TypeRep -> Fingerprint
#if __GLASGOW_HASKELL__ == 708
getFingerprint (TypeRep fp _ _) = fp
#else
getFingerprint rep = typeRepFingerprint rep
#endif

--------------------------------------------------------------------------------
-- | Build the 'Type' (representation plus fingerprint) described by a
--   'GetType' request.
getType :: GetType -> Type
getType op = Type rep (getFingerprint rep)
  where
    rep = toRep op

    toRep (FromTypeable a) = typeOf a
    toRep (FromProxy prx)  = typeRep prx

--------------------------------------------------------------------------------
-- | Handlers grouped by 'Type', kept as a sequence per type.
type EventHandlers = HashMap Type (Seq EventHandler)

--------------------------------------------------------------------------------
-- | Run every handler of the sequence on the given value.
--
--   The value is 'cast' to the type each handler expects. A handler whose
--   type does not match is skipped and the mismatch is logged, instead of
--   relying on a partial pattern binding. Synchronous exceptions raised by a
--   handler are caught with 'tryAny' and logged, so one failing handler does
--   not prevent the remaining ones from running.
propagate :: Typeable a => a -> Seq EventHandler -> EventStore ()
propagate a = traverse_ $ \(EventHandler prx k) ->
  case cast a of
    Nothing -> do
      let actual   = typeOf a
          expected = typeRep prx
      $(logError) [i|Cannot propagate #{actual} to a handler expecting #{expected}.|]
    Just b -> do
      let tpe = typeOf b
      outcome <- tryAny (k b)
      case outcome of
        Right _ -> return ()
        Left e  -> $(logError) [i|Exception when propagating #{tpe}: #{e}.|]

--------------------------------------------------------------------------------
-- | A callback over some hidden 'Typeable' type @a@, stored together with a
--   'Proxy' for that type.
data EventHandler where
  EventHandler :: Typeable a
               => Proxy a
               -> (a -> EventStore ())
               -> EventHandler

--------------------------------------------------------------------------------
-- | Shows the type the handler accepts, taken from its 'Proxy'.
instance Show EventHandler where
  show (EventHandler proxy _) = "Handle " <> show (typeRep proxy)

--------------------------------------------------------------------------------
-- | Message bus state. Fields are deliberately left lazy: 'newBus' builds
--   the record inside 'mfix'.
data Bus =
  Bus
    { _busLoggerRef :: LoggerRef
      -- ^ Logger handle given to 'newBus'.
    , _busSettings :: Settings
      -- ^ Settings given to 'newBus'.
    , _busEventHandlers :: IORef EventHandlers
      -- ^ Mutable handler table; starts out empty.
    , _busQueue :: TBMQueue Message
      -- ^ Message queue; closed by 'busStop'.
    , _workerAsync :: Async ()
      -- ^ Handle of the worker thread; 'busProcessedEverything' waits on it.
    , _monitoring :: MonitoringBackend
      -- ^ Monitoring backend taken from the settings.
    }

--------------------------------------------------------------------------------
-- | Close the bus queue in a single STM transaction.
busStop :: MonadIO m => Bus -> m ()
busStop Bus{ _busQueue = queue } = atomically (closeTBMQueue queue)

--------------------------------------------------------------------------------
-- | Block until the bus worker thread has terminated.
busProcessedEverything :: Bus -> IO ()
busProcessedEverything Bus{ _workerAsync = workerThread } = wait workerThread

--------------------------------------------------------------------------------
-- | The 'Type' describing 'Message' itself.
messageType :: Type
messageType = getType (FromProxy proxy)
  where
    proxy :: Proxy Message
    proxy = Proxy

--------------------------------------------------------------------------------
-- | Create a bus and start its worker thread.
--
--   The handler table starts empty, the queue is created with
--   @newTBMQueueIO 500@ and the monitoring backend comes from the settings.
--   'mfix' ties the knot so that the worker can be handed the very 'Bus'
--   that is being constructed.
newBus :: LoggerRef -> Settings -> IO Bus
newBus ref setts =
  mfix $ \self ->
    Bus ref setts
      <$> newIORef mempty
      <*> newTBMQueueIO 500
      <*> async (worker self)
      <*> pure (s_monitoring setts)

--------------------------------------------------------------------------------
-- | Message loop of a 'Bus'. Reads messages from '_busQueue' one at a time
--   and hands each of them to 'publishing', until 'readTBMQueue' returns
--   'Nothing'.
worker :: Bus -> IO ()
worker self@Bus{..} = loop
  where
    -- Dispatches a single message against the handlers registered at the
    -- time the message is processed, then resumes the loop.
    handleMsg :: Message -> IO ()
    handleMsg (Message a) = do
      callbacks <- readIORef _busEventHandlers
      publishing self callbacks a
      loop

    -- The recursion goes through a plain 'case' so that the call to
    -- 'handleMsg' (and, from there, back to 'loop') is in tail position.
    -- Going through 'traverse_' would leave a pending @*> pure ()@
    -- continuation behind for every processed message.
    loop :: IO ()
    loop = do
      next <- atomically (readTBMQueue _busQueue)
      case next of
        Nothing  -> return ()
        Just msg -> handleMsg msg

--------------------------------------------------------------------------------
-- | Subscribing to a 'Bus' appends the handler to the sequence of handlers
--   stored under the 'Type' of the message the handler accepts. The handler
--   table is updated atomically with 'atomicModifyIORef''.
instance Sub Bus where
  subscribeEventHandler Bus{..} hdl@(EventHandler prx _) =
    atomicModifyIORef' _busEventHandlers update
    where
      update :: EventHandlers -> (EventHandlers, ())
      update callbacks = (alterMap append tpe callbacks, ())

      -- Key of the handler table: the type carried by the handler's proxy.
      tpe = getType (FromProxy prx)

      -- First handler for that type creates the sequence; later handlers
      -- are added at the end, preserving subscription order.
      append Nothing   = Just (singleton hdl)
      append (Just hs) = Just (snoc hs hdl)

--------------------------------------------------------------------------------
-- | Publishing on a 'Bus' wraps the value with 'toMsg' and writes it to
--   '_busQueue'.
--
--   The result is 'True' when the queue was still open at the time of the
--   call and 'False' when it was already closed. The closed state is
--   sampled /before/ the write, within the same 'STM' transaction.
instance Pub Bus where
  publishSTM Bus{..} a = do
    wasClosed <- isClosedTBMQueue _busQueue
    writeTBMQueue _busQueue (toMsg a)
    return (not wasClosed)

--------------------------------------------------------------------------------
-- | Delivers a value to the handlers found in the given handler table.
--
--   Runs inside 'runEventStore', using the bus' own logger reference and
--   settings. The value is first propagated to the handlers registered for
--   its own runtime type. Then, unless the value's type already is
--   'messageType', it is wrapped with 'toMsg' and propagated to the handlers
--   registered for 'messageType'.
publishing :: Typeable a => Bus -> EventHandlers -> a -> IO ()
publishing self@Bus{..} callbacks a =
  runEventStore _busLoggerRef _busSettings self $ do
    $(logDebug) [i|Publishing message #{tpe}.|]
    traverse_ (propagate a) (lookup tpe callbacks)
    $(logDebug) [i|Message #{tpe} propagated.|]

    -- Skipped when the payload type is 'messageType' itself: those handlers
    -- were already served by the lookup above.
    unless (tpe == messageType) $
      traverse_ (propagate (toMsg a)) (lookup messageType callbacks)
  where
    -- Runtime type of the published value; key into the handler table.
    tpe = getType (FromTypeable a)

--------------------------------------------------------------------------------
-- Monitoring
--------------------------------------------------------------------------------

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrPkgCount' on the monitoring backend held by
--   the current environment.
monitorIncrPkgCount :: EventStore ()
monitorIncrPkgCount = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrPkgCount __monitor

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrConnectionDrop' on the monitoring backend
--   held by the current environment.
monitorIncrConnectionDrop :: EventStore ()
monitorIncrConnectionDrop = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrConnectionDrop __monitor

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendAddDataTransmitted' with the given size on the
--   monitoring backend held by the current environment.
monitorAddDataTransmitted :: Int -> EventStore ()
monitorAddDataTransmitted siz = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendAddDataTransmitted __monitor siz

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrForceReconnect' on the monitoring backend
--   held by the current environment.
monitorIncrForceReconnect :: EventStore ()
monitorIncrForceReconnect = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrForceReconnect __monitor

--------------------------------------------------------------------------------
-- | Calls 'monitoringBackendIncrHeartbeatTimeouts' on the monitoring backend
--   held by the current environment.
monitorIncrHeartbeatTimeouts :: EventStore ()
monitorIncrHeartbeatTimeouts = do
  Env{..} <- getEnv
  liftIO $ monitoringBackendIncrHeartbeatTimeouts __monitor