{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings  #-}
{-# LANGUAGE LambdaCase         #-}
{-# LANGUAGE RecordWildCards    #-}
{-# LANGUAGE ViewPatterns       #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.ConnectionManager
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.ConnectionManager
  ( connectionManager ) where

--------------------------------------------------------------------------------
import Data.Typeable

--------------------------------------------------------------------------------
import Control.Monad.Reader
import Data.Time

--------------------------------------------------------------------------------
import           Database.EventStore.Internal.Command
import           Database.EventStore.Internal.Communication
import           Database.EventStore.Internal.Connection
import           Database.EventStore.Internal.Control
import           Database.EventStore.Internal.Discovery
import           Database.EventStore.Internal.EndPoint
import           Database.EventStore.Internal.Logger
import           Database.EventStore.Internal.Operation
import           Database.EventStore.Internal.Operation.Authenticate (newAuthenticatePkg)
import           Database.EventStore.Internal.Operation.Identify (newIdentifyPkg)
import qualified Database.EventStore.Internal.Manager.Operation.Registry as Operation
import           Database.EventStore.Internal.Prelude
import           Database.EventStore.Internal.Stopwatch
import           Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Lifecycle stage of the connection manager, stored in the '_stage'
--   reference of 'Internal'.
data Stage
  = Init
    -- ^ Value the stage reference is created with; 'startConnect' only acts
    --   while the manager is in this stage.
  | Connecting Attempts ConnectingState
    -- ^ A connection attempt is under way; the 'ConnectingState' records which
    --   step of the attempt has been reached.
  | Connected Connection
  | Closed

--------------------------------------------------------------------------------
-- | Textual rendering of a 'Stage'.
instance Show Stage where
  show Init             = "Init"
  show (Connecting a s) = "Connecting: " <> show (a, s)
  -- The trailing space keeps the rendered connection from being glued to
  -- the word "on".
  show (Connected c)    = "Connected on " <> show c
  show Closed           = "Closed"

--------------------------------------------------------------------------------
-- | Step reached by an ongoing connection attempt (payload of 'Connecting').
data ConnectingState
  = Reconnecting
    -- ^ Set by 'startConnect' when leaving 'Init'; 'discover' only acts in
    --   this state.
  | EndpointDiscovery
    -- ^ Set by 'discover' right before the endpoint discovery is forked;
    --   'establish' only acts in this state.
  | ConnectionEstablishing Connection
    -- ^ Set by 'establish' with the connection it has just created.
  | Authentication UUID NominalDiffTime Connection
    -- ^ NOTE(review): presumably the id of the authentication package and
    --   the time it was sent - confirm against the code that builds it.
  | Identification UUID NominalDiffTime Connection
    -- ^ NOTE(review): presumably the id of the identification package and
    --   the time it was sent - confirm against the code that builds it.
  deriving Show

--------------------------------------------------------------------------------
-- | Tells whether the manager has progressed beyond endpoint discovery.
--
--   'False' for 'Init' and for a 'Connecting' stage that is still
--   'Reconnecting' or in 'EndpointDiscovery'; 'True' for every other
--   'Connecting' sub-state as well as for 'Connected' and 'Closed'.
atLeastEstablishingState :: Stage -> Bool
atLeastEstablishingState = \case
  Init                           -> False
  Connecting _ Reconnecting      -> False
  Connecting _ EndpointDiscovery -> False
  Connecting _ _                 -> True
  Connected{}                    -> True
  Closed                         -> True

--------------------------------------------------------------------------------
-- | Bookkeeping attached to a 'Connecting' stage.
data Attempts =
  Attempts { attemptCount     :: !Int
             -- ^ Attempt counter; 'freshAttempt' starts it at 1.
           , attemptLastStart :: !NominalDiffTime
             -- ^ Stopwatch reading associated with the attempt;
             --   'freshAttempt' fills it with the elapsed time at creation.
           } deriving Show

--------------------------------------------------------------------------------
-- | Builds the 'Attempts' value for a first attempt: the counter is 1 and the
--   start time is the current elapsed reading of the given stopwatch.
freshAttempt :: Stopwatch -> EventStore Attempts
freshAttempt stopwatch = Attempts 1 <$> stopwatchElapsed stopwatch

--------------------------------------------------------------------------------
-- | Exception value whose 'Show' instance reads
--   @Reconnection limit reached.@. The code raising it is not part of this
--   excerpt.
data ConnectionMaxAttemptReached = ConnectionMaxAttemptReached
  deriving Typeable

--------------------------------------------------------------------------------
-- | Fixed, human-readable message for 'ConnectionMaxAttemptReached'.
instance Show ConnectionMaxAttemptReached where
  show _ = "Reconnection limit reached."

--------------------------------------------------------------------------------
-- | Makes 'ConnectionMaxAttemptReached' usable as an exception; no method is
--   overridden.
instance Exception ConnectionMaxAttemptReached

--------------------------------------------------------------------------------
-- | Exception value whose 'Show' instance reads
--   @Timed out waiting for client to be identified.@. The code raising it is
--   not part of this excerpt.
data IdentificationTimeout = IdentificationTimeout deriving Typeable

--------------------------------------------------------------------------------
-- | Fixed, human-readable message for 'IdentificationTimeout'.
instance Show IdentificationTimeout where
  show _ = "Timed out waiting for client to be identified."

--------------------------------------------------------------------------------
-- | Makes 'IdentificationTimeout' usable as an exception; no method is
--   overridden.
instance Exception IdentificationTimeout

--------------------------------------------------------------------------------
-- | Bus message carrying the endpoint to connect to. 'discover' publishes it
--   when discovery yields an endpoint; 'connectionManager' subscribes
--   @onEstablish@ to it.
data EstablishConnection = EstablishConnection EndPoint deriving Typeable

--------------------------------------------------------------------------------
-- | Bus message (and exception) wrapping the error that triggers a close.
--   'discover' publishes it when endpoint discovery throws;
--   'connectionManager' subscribes @onCloseConnection@ to it.
newtype CloseConnection = CloseConnection SomeException
  deriving (Show, Typeable)

--------------------------------------------------------------------------------
-- | Makes 'CloseConnection' usable as an exception; no method is overridden.
instance Exception CloseConnection

--------------------------------------------------------------------------------
-- | Timer message. 'onInit' registers it through 'NewTimer' with
--   'timerPeriod', and 'connectionManager' subscribes @onTick@ to it.
data Tick = Tick deriving Typeable

--------------------------------------------------------------------------------
-- | Period handed to 'NewTimer' for the 'Tick' message.
--   NOTE(review): presumably 200 milliseconds, going by the name of
--   'msDuration' - confirm against its definition.
timerPeriod :: Duration
timerPeriod = msDuration 200

--------------------------------------------------------------------------------
-- | Phase recorded in a 'HeartbeatTracker'. Trackers built by
--   'newHeartbeatTracker' and 'initHeartbeatTracker' start in 'Interval'; the
--   code that switches to 'Timeout' is not part of this excerpt.
data HeartbeatStage = Interval | Timeout

--------------------------------------------------------------------------------
-- | Snapshot used to track heartbeats.
data HeartbeatTracker =
  HeartbeatTracker { _pkgNum         :: !Integer
                     -- ^ Package counter value captured when the tracker was
                     --   built (0 for a brand new tracker).
                   , _heartbeatStage :: !HeartbeatStage
                     -- ^ Current heartbeat phase.
                   , _startedSince   :: !NominalDiffTime
                     -- ^ Stopwatch reading captured when the tracker was
                     --   built.
                   }

--------------------------------------------------------------------------------
-- | Allocates a reference holding a fresh 'HeartbeatTracker': package number
--   0, stage 'Interval', started at the current elapsed reading of the given
--   stopwatch.
newHeartbeatTracker :: MonadBaseControl IO m
                    => Stopwatch
                    -> m (IORef HeartbeatTracker)
newHeartbeatTracker stopwatch = do
  elapsed <- stopwatchElapsed stopwatch
  newIORef (HeartbeatTracker 0 Interval elapsed)

--------------------------------------------------------------------------------
-- | Resets the heartbeat tracker stored in '_tracker': it restarts in the
--   'Interval' stage with the current value of '_lastPkgNum' and the current
--   elapsed reading of '_stopwatch'.
initHeartbeatTracker :: Internal -> EventStore ()
initHeartbeatTracker Internal{..} = do
  elapsed <- stopwatchElapsed _stopwatch
  pkgNum  <- readIORef _lastPkgNum
  atomicWriteIORef _tracker (HeartbeatTracker pkgNum Interval elapsed)

--------------------------------------------------------------------------------
-- | Mutable state shared by every handler of the connection manager. Built
--   once by 'connectionManager'.
data Internal =
  Internal { _disc          :: Discovery
             -- ^ Discovery process run by 'discover'.
           , _builder       :: ConnectionBuilder
             -- ^ Used by 'establish' to open a connection to an endpoint.
           , _stage         :: IORef Stage
             -- ^ Current lifecycle stage; starts at 'Init'.
           , _last          :: IORef (Maybe EndPoint)
             -- ^ Starts at 'Nothing'; its content is handed to the discovery
             --   process by 'discover'.
           , _sending       :: TVar Bool
             -- ^ Starts at 'False'. NOTE(review): its readers and writers are
             --   outside this excerpt.
           , _opMgr         :: Operation.Registry
             -- ^ Operation registry created from the operation timeout and
             --   retry settings.
           , _stopwatch     :: Stopwatch
             -- ^ Source of every elapsed-time reading taken by the manager.
           , _lastCheck     :: IORef NominalDiffTime
             -- ^ Initialised with the stopwatch reading taken at start-up.
             --   NOTE(review): its readers and writers are outside this
             --   excerpt.
           , _lastConnected :: IORef Bool
             -- ^ Starts at 'False'; 'establish' sets it to 'True' and uses the
             --   previous value to publish 'Initialized' only once.
           , _tracker       :: IORef HeartbeatTracker
             -- ^ Heartbeat tracking snapshot, reset by
             --   'initHeartbeatTracker'.
           , _lastPkgNum    :: IORef Integer
             -- ^ Package counter; starts at 0 and is bumped by
             --   'incrPackageNumber'.
           }

--------------------------------------------------------------------------------
-- | Atomically adds one to the package counter kept in '_lastPkgNum', then
--   runs 'monitorIncrPkgCount'.
incrPackageNumber :: Internal -> EventStore ()
incrPackageNumber Internal{..} = do
  atomicModifyIORef' _lastPkgNum $ \n -> (n + 1, ())
  monitorIncrPkgCount

--------------------------------------------------------------------------------
-- | Creates the connection manager state and subscribes all of its handlers
--   on the given bus.
--
--   The state starts in the 'Init' stage with no previous endpoint, a package
--   counter of 0 and a fresh heartbeat tracker; nothing else happens until
--   one of the subscribed messages is delivered.
connectionManager :: Settings
                  -> ConnectionBuilder
                  -> Discovery
                  -> Hub
                  -> IO ()
connectionManager setts builder disc mainBus = do
  stageRef <- newIORef Init
  let mkInternal = Internal disc builder stageRef

  stopwatch    <- newStopwatch
  timeoutCheck <- stopwatchElapsed stopwatch
  -- The arguments below follow the field order of 'Internal', starting at
  -- '_last'.
  internal <- mkInternal <$> newIORef Nothing                -- _last
                         <*> newTVarIO False                 -- _sending
                         <*> Operation.registryNew           -- _opMgr
                               (s_operationTimeout setts)
                               (s_operationRetry setts)
                         <*> return stopwatch                -- _stopwatch
                         <*> newIORef timeoutCheck           -- _lastCheck
                         <*> newIORef False                  -- _lastConnected
                         <*> newHeartbeatTracker stopwatch   -- _tracker
                         <*> newIORef 0                      -- _lastPkgNum

  subscribe mainBus (onInit internal)
  subscribe mainBus (onEstablish internal)
  subscribe mainBus (onEstablished internal)
  subscribe mainBus (onArrived internal)
  subscribe mainBus (onTransmit internal)
  subscribe mainBus (onConnectionError internal)
  subscribe mainBus (onConnectionClosed internal)
  subscribe mainBus (onCloseConnection internal)
  subscribe mainBus (onShutdown internal)
  subscribe mainBus (onTick internal)
  subscribe mainBus (onSendPackage internal)

--------------------------------------------------------------------------------
-- | Handler for 'SystemInit': registers the 'Tick' timer with 'timerPeriod',
--   then starts the first connection attempt through 'startConnect'.
onInit :: Internal -> SystemInit -> EventStore ()
onInit self _ = do
  publish (NewTimer Tick timerPeriod False)
  startConnect self

--------------------------------------------------------------------------------
-- | Starts a connection attempt, but only when the manager is still in the
--   'Init' stage: the stage becomes @Connecting attempts Reconnecting@ with a
--   fresh 'Attempts' value, then 'discover' is run. In any other stage this
--   is a no-op.
startConnect :: Internal -> EventStore ()
startConnect self@Internal{..} =
  readIORef _stage >>= \case
    Init -> do
      atts <- freshAttempt _stopwatch
      atomicWriteIORef _stage (Connecting atts Reconnecting)
      discover self
    _ -> return ()

--------------------------------------------------------------------------------
-- | Launches endpoint discovery when the stage is
--   @Connecting _ Reconnecting@; a no-op in any other stage.
--
--   The sub-state is switched to 'EndpointDiscovery' first, then the
--   discovery process is run on a forked thread with the content of '_last'.
--   Its outcome is reported on the bus:
--
--   * an exception is logged as an error and published as 'CloseConnection';
--   * no endpoint only logs a warning;
--   * an endpoint is published as 'EstablishConnection'.
discover :: Internal -> EventStore ()
discover Internal{..} =
  readIORef _stage >>= \case
    Connecting att Reconnecting -> do
      atomicWriteIORef _stage (Connecting att EndpointDiscovery)
      old <- readIORef _last
      _   <- fork $
          tryAny (runDiscovery _disc old) >>= \case
            Left e -> do
              $logError
                [i| Failed to resolve TCP endpoint to which to connect #{e}.|]
              publish (CloseConnection e)
            -- NOTE(review): the stage is left at 'EndpointDiscovery' here;
            -- presumably a handler outside this excerpt retries - confirm.
            Right Nothing -> do
              $logWarn
                "Failed to resolve TCP endpoint to which to connect."
            Right (Just ept) -> publish (EstablishConnection ept)
      return ()
    _ -> return ()

--------------------------------------------------------------------------------
-- | Opens a connection to the given endpoint when the stage is
--   @Connecting _ EndpointDiscovery@; in any other stage only the debug line
--   is logged.
--
--   After the connection is created, '_lastConnected' is set to 'True'. If it
--   was 'False' before, @Initialized ConnectionManager@ is published, so that
--   message goes out only on the first call that reaches this point. Finally
--   the sub-state becomes 'ConnectionEstablishing' with the new connection.
establish :: Internal -> EndPoint -> EventStore ()
establish Internal{..} ept = do
  $(logDebug) [i|Establish tcp connection on [#{ept}]|]
  readIORef _stage >>= \case
    Connecting att EndpointDiscovery -> do
      conn <- connect _builder ept
      -- Swap in 'True' and get back the value the flag had before.
      wasConnected <- atomicModifyIORef' _lastConnected $ \c -> (True, c)
      unless wasConnected $
        publish (Initialized ConnectionManager)
      atomicWriteIORef _stage (Connecting att (ConnectionEstablishing conn))
    _ -> return ()

--------------------------------------------------------------------------------
-- | Reacts to a TCP connection that has just been established.
--
-- The notification is only honoured while the manager is in the
-- @Connecting _ (ConnectionEstablishing known)@ stage and the reported
-- connection is the very one we were waiting for. In that case the next
-- handshake step is started: authentication when default user credentials are
-- configured in the settings, client identification otherwise. Any other
-- stage, or a notification for a different connection, is ignored.
established :: Internal -> Connection -> EventStore ()
established self@Internal{..} conn =
  readIORef _stage >>= \case
    Connecting att (ConnectionEstablishing known) ->
      -- A stale notification from a previous connection must not advance the
      -- handshake of the current one.
      when (conn == known) $ do
        $logDebug [i|TCP connection established: #{conn}.|]
        setts <- getSettings
        case s_defaultUserCredentials setts of
          Just cred -> authenticate self att conn cred
          Nothing   -> identifyClient self att conn
    _ -> return ()

--------------------------------------------------------------------------------
-- | Starts the authentication step of the connection handshake.
--
-- Builds an authentication package from the given credentials, moves the
-- manager to the @Connecting att (Authentication ..)@ stage (recording the
-- package correlation id and the stopwatch reading at which the step started)
-- and then enqueues the package on the connection.
authenticate :: Internal
             -> Attempts
             -> Connection
             -> Credentials
             -> EventStore ()
authenticate Internal{..} att conn cred = do
  pkg     <- newAuthenticatePkg cred
  elapsed <- stopwatchElapsed _stopwatch
  let authCorr = packageCorrelation pkg

  -- The stage is updated before the package is queued, so the stage already
  -- carries the correlation id by the time the request leaves.
  atomicWriteIORef _stage (Connecting att (Authentication authCorr elapsed conn))
  enqueuePackage conn pkg

--------------------------------------------------------------------------------
-- | Starts the client identification step of the connection handshake.
--
-- The connection name sent to the server is 's_defaultConnectionName' when the
-- settings provide one, and @ES-\<fresh uuid\>@ otherwise. The manager moves to
-- the @Connecting att (Identification ..)@ stage (recording the package
-- correlation id and the stopwatch reading at which the step started) and the
-- identification package is then enqueued on the connection.
identifyClient :: Internal -> Attempts -> Connection -> EventStore ()
identifyClient Internal{..} att conn = do
  setts <- getSettings
  uuid  <- newUUID
  let defName  = [i|ES-#{uuid}|]
      connName = fromMaybe defName (s_defaultConnectionName setts)

  pkg     <- newIdentifyPkg clientVersion connName
  elapsed <- stopwatchElapsed _stopwatch
  let idCorr = packageCorrelation pkg

  atomicWriteIORef _stage (Connecting att (Identification idCorr elapsed conn))
  enqueuePackage conn pkg
  where
    -- Client version number passed to 'newIdentifyPkg'.
    clientVersion = 1

--------------------------------------------------------------------------------
-- | Completes the handshake once the client has been identified.
--
-- Only acts while the manager is in the @Connecting _ (Identification ..)@
-- stage: the stage becomes @Connected conn@, the heartbeat tracker is
-- initialised, and the operation registry is immediately asked for the
-- packages to (re)send on the new connection. Any other stage is ignored.
clientIdentified :: Internal -> EventStore ()
clientIdentified self@Internal{..} =
  readIORef _stage >>= \case
    Connecting _ (Identification _ _ conn) -> do
      $logDebug [i|TCP connection identified: #{conn}.|]
      atomicWriteIORef _stage (Connected conn)
      initHeartbeatTracker self

      -- HACK: It can happen that the user submitted operations before the
      -- connection was available. Those operations are only checked every
      -- 's_operationTimeout'. This could lead the first operation to take a
      -- long time before being sent.
      -- FIXME: We might consider doing that hack only if it's the first time
      -- we connect with the server.
      pkgs <- Operation.registryCheckAndRetry _opMgr (connectionId conn)
      traverse_ (enqueuePackage conn) pkgs
    _ -> pure ()

--------------------------------------------------------------------------------
-- | Handler for the 'ConnectionEstablished' message: unwraps the connection
-- and delegates to 'established'.
onEstablished :: Internal -> ConnectionEstablished -> EventStore ()
onEstablished self (ConnectionEstablished conn) = established self conn

--------------------------------------------------------------------------------
-- | Shuts the connection manager down for good because of @cause@.
--
-- In order: the stage is switched to 'Closed' (remembering the connection that
-- was in use, if any), the operation registry is aborted, the remembered
-- connection is closed through 'closeTcpConnection', and finally a
-- 'FatalException' wrapping @cause@ is published.
closeConnection :: Exception e => Internal -> e -> EventStore ()
closeConnection self@Internal{..} cause = do
  $logDebug [i|CloseConnection: #{cause}.|]
  mConn <- lookupConnectionAndSwitchToClosed self
  Operation.registryAbort _opMgr
  -- The stage is already 'Closed' here, so 'closeTcpConnection' only disposes
  -- of the connection and does not schedule a reconnection.
  traverse_ (closeTcpConnection self cause) mConn
  $logInfo [i|CloseConnection: connection cleanup done for [#{cause}].|]
  publish (FatalException cause)

--------------------------------------------------------------------------------
-- | Returns the connection found by 'lookupConnection' (if any) and then
-- unconditionally sets the stage to 'Closed'.
--
-- The lookup runs before the stage is overwritten; the value returned is the
-- result of that lookup.
lookupConnectionAndSwitchToClosed :: Internal -> EventStore (Maybe Connection)
lookupConnectionAndSwitchToClosed self@Internal{..} =
  lookupConnection self <* atomicWriteIORef _stage Closed

--------------------------------------------------------------------------------
-- | Disposes of a TCP connection and, unless the manager is 'Closed', moves
-- the stage to @Connecting att Reconnecting@.
--
-- When the manager was already in a 'Connecting' stage its current 'Attempts'
-- value is carried over; from any other (non-'Closed') stage a fresh attempt
-- is started from the stopwatch. The @cause@ is only used for logging.
closeTcpConnection :: Exception e => Internal -> e -> Connection -> EventStore ()
closeTcpConnection Internal{..} cause conn = do
  let cid = connectionId conn
  $logDebug [i|CloseTcpConnection: connection [#{cid}]. Cause: #{cause}.|]
  dispose conn
  $logDebug [i|CloseTcpConnection: connection [#{cid}] disposed.|]

  readIORef _stage >>= \case
    -- A closed manager must stay closed: no reconnection is scheduled.
    Closed -> return ()
    stage  -> do
      att <-
        case stage of
          Connecting old _ -> return old
          _                -> freshAttempt _stopwatch
      atomicWriteIORef _stage (Connecting att Reconnecting)

--------------------------------------------------------------------------------
-- | Cause used when the current connection is dropped in order to reconnect
-- to another node. Carries the endpoint we are going to reconnect to.
data ForceReconnect = ForceReconnect EndPoint deriving (Typeable, Show)

--------------------------------------------------------------------------------
-- 'ForceReconnect' uses the default 'Exception' methods; 'forceReconnect'
-- passes it as the cause to 'closeTcpConnection'.
instance Exception ForceReconnect

--------------------------------------------------------------------------------
-- | Drops the current connection and reconnects to the given node when that
-- node's endpoint differs from the one we are connected to.
--
-- The target endpoint is the node's secure endpoint when TLS settings are
-- configured ('s_ssl'), and its plain TCP endpoint otherwise.
--
-- This function is total:
--
--   * if the manager is not in the 'Connected' stage there is no live
--     connection to replace and nothing is done;
--
--   * if TLS is configured but the node exposes no secure endpoint, the
--     request is logged and ignored.
--
-- NOTE(review): both situations previously ended in a pattern-match failure;
-- ignoring them is assumed to be what callers want -- confirm.
forceReconnect :: Internal -> NodeEndPoints -> EventStore ()
forceReconnect self@Internal{..} node = do
  setts <- getSettings
  let target
        | isJust (s_ssl setts) = secureEndPoint node
        | otherwise            = Just (tcpEndPoint node)

  stage <- readIORef _stage
  case (stage, target) of
    (Connected conn, Just ept)
      | connectionEndPoint conn /= ept -> do
          monitorIncrForceReconnect
          closeTcpConnection self (ForceReconnect ept) conn
          att <- freshAttempt _stopwatch

          -- We update the last pkg number to nullify the current heartbeat
          -- tracking.
          atomicModifyIORef' _lastPkgNum $ \cur -> (cur + 1, ())

          -- 'closeTcpConnection' left the stage at 'Reconnecting'; since the
          -- endpoint is already known we go straight to 'EndpointDiscovery'
          -- so that 'establish' accepts it.
          atomicWriteIORef _stage (Connecting att EndpointDiscovery)
          $logInfo [i|#{conn}: going to reconnect to #{ept}.|]
          establish self ept
    (Connected conn, Nothing) ->
      $logWarn [i|#{conn}: forced reconnection ignored, the node exposes no secure endpoint.|]
    _ -> pure ()

--------------------------------------------------------------------------------
-- | Handler for the 'EstablishConnection' message: unwraps the endpoint and
-- delegates to 'establish'.
onEstablish :: Internal -> EstablishConnection -> EventStore ()
onEstablish self (EstablishConnection ept) = establish self ept

--------------------------------------------------------------------------------
onTick :: Internal -> Tick -> EventStore ()
onTick :: Internal -> Tick -> EventStore ()
onTick self :: Internal
self@Internal{TVar Bool
IORef Bool
IORef Integer
IORef (Maybe EndPoint)
IORef NominalDiffTime
IORef HeartbeatTracker
IORef Stage
Discovery
Stopwatch
Registry
ConnectionBuilder
_lastPkgNum :: IORef Integer
_tracker :: IORef HeartbeatTracker
_lastConnected :: IORef Bool
_lastCheck :: IORef NominalDiffTime
_stopwatch :: Stopwatch
_opMgr :: Registry
_sending :: TVar Bool
_last :: IORef (Maybe EndPoint)
_stage :: IORef Stage
_builder :: ConnectionBuilder
_disc :: Discovery
_lastPkgNum :: Internal -> IORef Integer
_tracker :: Internal -> IORef HeartbeatTracker
_lastConnected :: Internal -> IORef Bool
_lastCheck :: Internal -> IORef NominalDiffTime
_stopwatch :: Internal -> Stopwatch
_opMgr :: Internal -> Registry
_sending :: Internal -> TVar Bool
_last :: Internal -> IORef (Maybe EndPoint)
_stage :: Internal -> IORef Stage
_builder :: Internal -> ConnectionBuilder
_disc :: Internal -> Discovery
..} Tick
_ = do
  Settings
setts <- EventStore Settings
getSettings
  IORef Stage -> EventStore Stage
forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef IORef Stage
_stage EventStore Stage -> (Stage -> EventStore ()) -> EventStore ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    (Stage -> Maybe Attempts
onGoingConnection -> Just Attempts{Int
NominalDiffTime
attemptLastStart :: NominalDiffTime
attemptCount :: Int
attemptLastStart :: Attempts -> NominalDiffTime
attemptCount :: Attempts -> Int
..}) -> do
      NominalDiffTime
elapsed <- Stopwatch -> EventStore NominalDiffTime
forall (m :: * -> *).
MonadBaseControl IO m =>
Stopwatch -> m NominalDiffTime
stopwatchElapsed Stopwatch
_stopwatch
      Bool -> EventStore () -> EventStore ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NominalDiffTime
elapsed NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
- NominalDiffTime
attemptLastStart NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
>= Settings -> NominalDiffTime
s_reconnect_delay Settings
setts) (EventStore () -> EventStore ()) -> EventStore () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ do
        let retries :: Int
retries = Int
attemptCount Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
            att :: Attempts
att     = Int -> NominalDiffTime -> Attempts
Attempts Int
retries NominalDiffTime
elapsed
        IORef Stage -> Stage -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
atomicWriteIORef IORef Stage
_stage (Attempts -> ConnectingState -> Stage
Connecting Attempts
att ConnectingState
Reconnecting)
        case Settings -> Retry
s_retry Settings
setts of
          AtMost Int
n
            | Int
attemptCount Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
n -> Int -> EventStore ()
retryConnection Int
attemptCount
            | Bool
otherwise -> EventStore ()
maxAttemptReached
          Retry
KeepRetrying -> Int -> EventStore ()
retryConnection Int
attemptCount

    (Stage -> Maybe (NominalDiffTime, Attempts, Connection)
pendingAuthenticate -> Just (NominalDiffTime
started, Attempts
att, Connection
conn)) -> do
      NominalDiffTime
elapsed <- Stopwatch -> EventStore NominalDiffTime
forall (m :: * -> *).
MonadBaseControl IO m =>
Stopwatch -> m NominalDiffTime
stopwatchElapsed Stopwatch
_stopwatch
      Bool -> EventStore () -> EventStore ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NominalDiffTime
elapsed NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
- NominalDiffTime
started NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
>= Settings -> NominalDiffTime
s_operationTimeout Settings
setts) (EventStore () -> EventStore ()) -> EventStore () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ do
        Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
$logWarn Text
"Authentication timed out."
        Internal -> Attempts -> Connection -> EventStore ()
identifyClient Internal
self Attempts
att Connection
conn

    (Stage -> Maybe NominalDiffTime
pendingIdentification -> Just NominalDiffTime
started) -> do
      NominalDiffTime
elapsed <- Stopwatch -> EventStore NominalDiffTime
forall (m :: * -> *).
MonadBaseControl IO m =>
Stopwatch -> m NominalDiffTime
stopwatchElapsed Stopwatch
_stopwatch
      Bool -> EventStore () -> EventStore ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NominalDiffTime
elapsed NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
- NominalDiffTime
started NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
>= Settings -> NominalDiffTime
s_operationTimeout Settings
setts) (EventStore () -> EventStore ()) -> EventStore () -> EventStore ()
forall a b. (a -> b) -> a -> b
$
        -- We close the current connection and let the reconnection process
        -- to take over.
        (Connection -> EventStore ()) -> Maybe Connection -> EventStore ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Internal -> IdentificationTimeout -> Connection -> EventStore ()
forall e.
Exception e =>
Internal -> e -> Connection -> EventStore ()
closeTcpConnection Internal
self IdentificationTimeout
IdentificationTimeout)
            (Maybe Connection -> EventStore ())
-> EventStore (Maybe Connection) -> EventStore ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Internal -> EventStore (Maybe Connection)
lookupConnection Internal
self

    (Stage -> Bool
defaultConnecting -> Bool
True) -> Internal -> EventStore ()
manageHeartbeats Internal
self

    Connected Connection
conn -> do
      NominalDiffTime
elapsed           <- Stopwatch -> EventStore NominalDiffTime
forall (m :: * -> *).
MonadBaseControl IO m =>
Stopwatch -> m NominalDiffTime
stopwatchElapsed Stopwatch
_stopwatch
      NominalDiffTime
timeoutCheckStart <- IORef NominalDiffTime -> EventStore NominalDiffTime
forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef IORef NominalDiffTime
_lastCheck

      Bool -> EventStore () -> EventStore ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (NominalDiffTime
elapsed NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
- NominalDiffTime
timeoutCheckStart NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
>= Settings -> NominalDiffTime
s_operationTimeout Settings
setts) (EventStore () -> EventStore ()) -> EventStore () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ do
        Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
$logDebug Text
"Start check and retry..."
        [Package]
pkgs <- Registry -> UUID -> EventStore [Package]
Operation.registryCheckAndRetry Registry
_opMgr (Connection -> UUID
connectionId Connection
conn)
        (Package -> EventStore ()) -> [Package] -> EventStore ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (Connection -> Package -> EventStore ()
enqueuePackage Connection
conn) [Package]
pkgs
        Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
$logDebug Text
"Completed check and retry"
        IORef NominalDiffTime -> NominalDiffTime -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
atomicWriteIORef IORef NominalDiffTime
_lastCheck NominalDiffTime
elapsed

    Stage
_ -> () -> EventStore ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

  -- FIXME - This `readIORef` call can be merged into the previous one.
  -- Done in 2019's refactoring code.
  Stage
stage <- IORef Stage -> EventStore Stage
forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef IORef Stage
_stage
  Bool -> EventStore () -> EventStore ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Stage -> Bool
atLeastEstablishingState Stage
stage) (EventStore () -> EventStore ()) -> EventStore () -> EventStore ()
forall a b. (a -> b) -> a -> b
$
    Internal -> EventStore ()
manageHeartbeats Internal
self
  where
    onGoingConnection :: Stage -> Maybe Attempts
onGoingConnection (Connecting Attempts
att ConnectingState
Reconnecting)             = Attempts -> Maybe Attempts
forall a. a -> Maybe a
Just Attempts
att
    onGoingConnection (Connecting Attempts
att ConnectionEstablishing{}) = Attempts -> Maybe Attempts
forall a. a -> Maybe a
Just Attempts
att
    onGoingConnection Stage
_                                         = Maybe Attempts
forall a. Maybe a
Nothing

    pendingIdentification :: Stage -> Maybe NominalDiffTime
pendingIdentification (Connecting Attempts
_ (Identification UUID
_ NominalDiffTime
started Connection
_)) = NominalDiffTime -> Maybe NominalDiffTime
forall a. a -> Maybe a
Just NominalDiffTime
started
    pendingIdentification Stage
_                                           = Maybe NominalDiffTime
forall a. Maybe a
Nothing

    pendingAuthenticate :: Stage -> Maybe (NominalDiffTime, Attempts, Connection)
pendingAuthenticate (Connecting Attempts
a (Authentication UUID
_ NominalDiffTime
started Connection
c)) = (NominalDiffTime, Attempts, Connection)
-> Maybe (NominalDiffTime, Attempts, Connection)
forall a. a -> Maybe a
Just (NominalDiffTime
started, Attempts
a, Connection
c)
    pendingAuthenticate Stage
_                                           = Maybe (NominalDiffTime, Attempts, Connection)
forall a. Maybe a
Nothing

    defaultConnecting :: Stage -> Bool
defaultConnecting Connecting{} = Bool
True
    defaultConnecting Stage
_            = Bool
False

    maxAttemptReached :: EventStore ()
maxAttemptReached = do
      Internal -> ConnectionMaxAttemptReached -> EventStore ()
forall e. Exception e => Internal -> e -> EventStore ()
closeConnection Internal
self ConnectionMaxAttemptReached
ConnectionMaxAttemptReached
      FatalException -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (ConnectionMaxAttemptReached -> FatalException
forall e. Exception e => e -> FatalException
FatalException ConnectionMaxAttemptReached
ConnectionMaxAttemptReached)

    retryConnection :: Int -> EventStore ()
retryConnection Int
cnt = do
      Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
$logDebug [i|Checking reconnection... (attempt #{cnt}).|]
      Internal -> EventStore ()
discover Internal
self

--------------------------------------------------------------------------------
-- | Exception used as the close reason when a connection is torn down because
-- a heartbeat request went unanswered for too long (see 'manageHeartbeats').
data ServerHeartbeatTimeout = ServerHeartbeatTimeout deriving Typeable

--------------------------------------------------------------------------------
instance Show ServerHeartbeatTimeout where
  show _ = "Server connection has heartbeat timeout"

--------------------------------------------------------------------------------
instance Exception ServerHeartbeatTimeout

--------------------------------------------------------------------------------
-- | Runs one step of the heartbeat state machine against the connection
-- returned by 'lookupConnection'. Does nothing when there is no connection.
--
-- The tracker stored in '_tracker' remembers the heartbeat stage, the
-- stopwatch reading at which that stage started, and the package counter
-- observed at that time:
--
--   * If the package counter read from '_lastPkgNum' differs from the one in
--     the tracker, the tracker is reset to the 'Interval' stage.
--     (Presumably the counter moves whenever a package arrives - confirm
--     against 'incrPackageNumber'.)
--
--   * In the 'Interval' stage, once 's_heartbeatInterval' has elapsed, a
--     heartbeat request is enqueued and the tracker moves to 'Timeout'.
--
--   * In the 'Timeout' stage, once 's_heartbeatTimeout' has elapsed, the
--     connection is closed with 'ServerHeartbeatTimeout'.
manageHeartbeats :: Internal -> EventStore ()
manageHeartbeats self@Internal{..} = traverse_ go =<< lookupConnection self
  where
    go :: Connection -> EventStore ()
    go conn = do
      elapsed <- stopwatchElapsed _stopwatch
      pkgNum  <- readIORef _lastPkgNum
      tracker <- readIORef _tracker
      setts   <- getSettings

      let interval        = s_heartbeatInterval setts
          timeout         = s_heartbeatTimeout setts
          -- Time spent in the current heartbeat stage.
          sinceStageStart = elapsed - _startedSince tracker
          -- The tracker as it should look when (re)entering @stage@ now.
          restartAt stage = tracker
                            { _heartbeatStage = stage
                            , _startedSince   = elapsed
                            , _pkgNum         = pkgNum
                            }

      if _pkgNum tracker /= pkgNum
        then atomicWriteIORef _tracker (restartAt Interval)
        else
          case _heartbeatStage tracker of
            Interval
              | sinceStageStart >= interval -> do
                  uuid <- freshUUID
                  enqueuePackage conn (heartbeatRequestPackage uuid)
                  atomicWriteIORef _tracker (restartAt Timeout)
              | otherwise -> return ()
            Timeout
              | sinceStageStart >= timeout -> do
                  monitorIncrHeartbeatTimeouts
                  $logInfo [i|Closing #{conn} due to HEARTBEAT TIMEOUT at pkgNum #{pkgNum}|]
                  closeTcpConnection self ServerHeartbeatTimeout conn
              | otherwise -> return ()

--------------------------------------------------------------------------------
-- | Handles a package received from a connection.
--
-- Unless the manager is in the 'Init' or 'Closed' stage, the package counter
-- is bumped first. The package is then dispatched on the current stage:
--
--   * an authentication answer (matching correlation id, either
--     'authenticatedCmd' or 'notAuthenticatedCmd') moves on to client
--     identification, logging a warning when authentication was refused;
--
--   * an identification answer (matching correlation id and
--     'clientIdentifiedCmd') completes the identification;
--
--   * any other package coming from the connection currently tracked by the
--     stage is handled by 'handlePackage';
--
--   * everything else is logged and ignored.
onArrived :: Internal -> PackageArrived -> EventStore ()
onArrived self@Internal{..} (PackageArrived conn pkg@Package{..}) = do
  -- FIXME - We can merge those two `readIORef` calls into one.
  -- It's done on 2019's refactoring.
  cur <- readIORef _stage
  unless (closedOrInit cur) $
    incrPackageNumber self
  -- /FIXME

  readIORef _stage >>= \case
    (onAuthentication -> Just att) -> do
      when (packageCmd == notAuthenticatedCmd) $
        $logWarn "Not authenticated."

      identifyClient self att conn

    (onIdentification -> True) ->
      clientIdentified self

    (runningConnection -> True) -> do
      $logDebug [i|Package received:  #{pkg}.|]
      handlePackage

    _ -> $logDebug [i|Package IGNORED: #{pkg}.|]

  where
    -- True when the stage awaits identification and this package is the
    -- matching 'clientIdentifiedCmd' answer.
    onIdentification :: Stage -> Bool
    onIdentification (Connecting _ (Identification u _ _)) =
      packageCorrelation == u && packageCmd == clientIdentifiedCmd
    onIdentification _ = False

    -- Yields the current attempts when the stage awaits authentication and
    -- this package is the matching (positive or negative) answer.
    onAuthentication :: Stage -> Maybe Attempts
    onAuthentication (Connecting a (Authentication u _ _)) =
      if packageCorrelation == u
         && (packageCmd == authenticatedCmd || packageCmd == notAuthenticatedCmd)
      then Just a
      else Nothing
    onAuthentication _ = Nothing

    -- True when the package comes from the connection the stage refers to.
    runningConnection :: Stage -> Bool
    runningConnection (Connecting _ (ConnectionEstablishing c)) = conn == c
    runningConnection (Connecting _ (Authentication _ _ c)) = conn == c
    runningConnection (Connecting _ (Identification _ _ c)) = conn == c
    runningConnection (Connected c) = conn == c
    runningConnection _ = False

    heartbeatResponse :: Package
    heartbeatResponse = heartbeatResponsePackage packageCorrelation

    -- Heartbeat responses are dropped, heartbeat requests are answered right
    -- away, and everything else goes to the operation registry, which may
    -- ask for a reconnection to another node.
    handlePackage :: EventStore ()
    handlePackage
      | packageCmd == heartbeatResponseCmd = return ()
      | packageCmd == heartbeatRequestCmd =
        enqueuePackage conn heartbeatResponse
      | otherwise =
        Operation.registryHandle _opMgr pkg >>= \case
          Nothing -> pure ()
          Just node -> forceReconnect self node

    closedOrInit :: Stage -> Bool
    closedOrInit = \case
      Init -> True
      Closed -> True
      _ -> False

--------------------------------------------------------------------------------
-- | Tells whether the given connection is the one referenced by the current
-- stage. Stages that carry no connection (e.g. a 'Connecting' stage still
-- reconnecting, 'Init' or 'Closed') always yield 'False'.
isSameConnection :: Internal -> Connection -> EventStore Bool
isSameConnection Internal{..} conn = go <$> readIORef _stage
  where
    go :: Stage -> Bool
    go (Connected known) = known == conn
    go (Connecting _ state) =
      case state of
        ConnectionEstablishing known -> known == conn
        Authentication _ _ known -> known == conn
        Identification _ _ known -> known == conn
        _ -> False
    go _ = False

--------------------------------------------------------------------------------
-- | Reacts to an error reported by a connection. Errors coming from a
-- connection other than the one currently tracked (see 'isSameConnection')
-- are ignored; otherwise the error is logged and used as the reason passed to
-- 'closeConnection'.
onConnectionError :: Internal -> ConnectionError -> EventStore ()
onConnectionError self@Internal{..} (ConnectionError conn e) =
  whenM (isSameConnection self conn) $ do
    $logError [i|TCP #{conn} error. Cause: #{e}.|]
    closeConnection self e

--------------------------------------------------------------------------------
-- | Reacts to a connection being closed. Only the connection currently
-- tracked (see 'isSameConnection') is considered: it is closed through
-- 'closeTcpConnection' with the reported cause, then the connection-drop
-- monitoring counter is incremented.
onConnectionClosed :: Internal -> ConnectionClosed -> EventStore ()
onConnectionClosed self@Internal{..} (ConnectionClosed conn cause) =
  whenM (isSameConnection self conn) $ do
    closeTcpConnection self cause conn
    monitorIncrConnectionDrop

--------------------------------------------------------------------------------
-- | Handles the system shutdown notification: takes the current connection
-- (if any) while switching the manager to its closed stage, aborts the
-- operation registry, disposes the connection and finally publishes
-- 'ServiceTerminated' for the 'ConnectionManager' service.
onShutdown :: Internal -> SystemShutdown -> EventStore ()
onShutdown self@Internal{..} _ = do
  $logDebug "Shutting down..."
  mConn <- lookupConnectionAndSwitchToClosed self
  Operation.registryAbort _opMgr
  traverse_ dispose mConn
  $logDebug "Shutdown properly."
  publish (ServiceTerminated ConnectionManager)

--------------------------------------------------------------------------------
-- | Handles a request to send a package on behalf of an operation.
--
--   * 'Closed': the mailbox is failed with 'Aborted'.
--
--   * 'Connected': the package is enqueued on the connection and registered
--     in the operation registry under that connection's id.
--
--   * Any other stage: the request is handed to
--     'Operation.registryPostpone'.
onTransmit :: Internal -> Transmit -> EventStore ()
onTransmit Internal{..} (Transmit m lifetime pkg) =
  readIORef _stage >>= \case
    Closed ->
      mailboxFail m Aborted
    Connected conn -> do
      enqueuePackage conn pkg
      Operation.registryRegister _opMgr (connectionId conn) lifetime pkg m
    _ ->
      Operation.registryPostpone _opMgr m lifetime pkg

--------------------------------------------------------------------------------
-- | Handles a 'CloseConnection' message by forwarding it, unchanged, to
--   'closeConnection' as the closing cause.
onCloseConnection :: Internal -> CloseConnection -> EventStore ()
onCloseConnection self e = closeConnection self e

--------------------------------------------------------------------------------
-- | Returns the connection carried by the manager's current stage, if any.
--   Delegates to 'lookingUpConnection' on the manager's stage reference.
lookupConnection :: Internal -> EventStore (Maybe Connection)
lookupConnection Internal{..} = lookingUpConnection _stage

--------------------------------------------------------------------------------
-- | Reads the stage reference and extracts the connection it carries.
--
--   Yields 'Just' for a 'Connected' stage and for a 'Connecting' stage whose
--   state is 'ConnectionEstablishing', 'Authentication' or 'Identification'.
--   Every other stage or connecting state yields 'Nothing'.
lookingUpConnection :: IORef Stage -> EventStore (Maybe Connection)
lookingUpConnection ref = go <$> readIORef ref
  where
    go :: Stage -> Maybe Connection
    go (Connected conn) = Just conn
    go (Connecting _ state) =
      case state of
        ConnectionEstablishing conn -> Just conn
        Authentication _ _ conn     -> Just conn
        Identification _ _ conn     -> Just conn
        _                           -> Nothing
    go _ = Nothing

--------------------------------------------------------------------------------
-- | Handles a 'SendPackage' message: enqueues the package on whatever
--   connection 'lookupConnection' returns. When there is no connection the
--   package is not enqueued ('traverse_' over 'Nothing' is a no-op).
onSendPackage :: Internal -> SendPackage -> EventStore ()
onSendPackage self (SendPackage pkg) =
  traverse_ sending =<< lookupConnection self
  where
    sending :: Connection -> EventStore ()
    sending conn = enqueuePackage conn pkg