--------------------------------------------------------------------------------
-- |
-- Module    :  Database.EventStore.Internal.Manager.Operation.Registry
-- Copyright :  (C) 2020 Yorick Laupa
-- License   :  (see the file LICENSE)
-- Maintainer:  Yorick Laupa <yo.eight@gmail.com>
-- Stability :  experimental
-- Portability: non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Manager.Operation.Registry where

--------------------------------------------------------------------------------
import qualified Data.HashMap.Strict as HashMap

--------------------------------------------------------------------------------
import Data.ProtocolBuffers
import Data.Serialize

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | A request that has been sent to the server and is awaiting its response.
data Request =
  Request
  { requestOriginal :: !Package
    -- ^ Package that was sent; its correlation id is the key under which the
    --   request is registered.
  , requestConnId :: !UUID
    -- ^ Id of the connection the request was registered with.
  , requestRetries :: !Int
    -- ^ Attempt counter; a freshly created request starts at 1.
  , requestStarted :: !NominalDiffTime
    -- ^ Registry stopwatch reading taken when the request was created.
  , requestMailbox :: !Mailbox
    -- ^ Where the response package or the failure is delivered.
  , requestLifetime :: !Lifetime
    -- ^ Whether the request is one-shot or kept alive until an end command.
  }

--------------------------------------------------------------------------------
-- | 'True' when the request's lifetime is 'KeepAlive', 'False' when it is
--   'OneTime'.
requestIsKeepAlive :: Request -> Bool
requestIsKeepAlive req
  = case requestLifetime req of
      OneTime     -> False
      KeepAlive _ -> True

--------------------------------------------------------------------------------
-- | Keeps only what is needed to send a request again later: its lifetime,
--   its original package and its mailbox. Connection id, retry counter and
--   start time are dropped.
requestToWaiting :: Request -> Waiting
requestToWaiting req =
  Waiting
  { waitingLifetime = requestLifetime req
  , waitingPkg = requestOriginal req
  , waitingMailbox = requestMailbox req
  }

--------------------------------------------------------------------------------
-- | Promotes a postponed 'Waiting' entry to a 'Request' bound to the given
--   connection and start time. The retry counter starts at 1.
waitingToRequest
  :: UUID -- Connection id.
  -> NominalDiffTime
  -> Waiting
  -> Request
waitingToRequest connId started w
  = Request
    { requestOriginal = waitingPkg w
    , requestConnId = connId
    , requestRetries = 1
    , requestStarted = started
    , requestMailbox = waitingMailbox w
    , requestLifetime = waitingLifetime w
    }

--------------------------------------------------------------------------------
-- | A request that has been postponed and is not bound to any connection.
data Waiting =
  Waiting
  { waitingLifetime :: !Lifetime
    -- ^ Lifetime the request will have once it is sent.
  , waitingPkg :: !Package
    -- ^ Package to send.
  , waitingMailbox :: !Mailbox
    -- ^ Mailbox carried over to the request built from this entry.
  }

--------------------------------------------------------------------------------
-- | Pending requests indexed by 'UUID'. Insertions in this module use the
--   correlation id of the request's original package as the key.
type Requests = HashMap UUID Request

--------------------------------------------------------------------------------
-- | Pure state of the registry; it is stored in the 'IORef' held by
--   'Registry'.
data Registry' =
  Registry'
  { registryRequests :: !Requests
    -- ^ Requests awaiting a response.
  , registryWaitings :: ![Waiting]
    -- ^ Postponed requests, most recently postponed first.
  , registryTimeout :: !NominalDiffTime
    -- ^ Elapsed time after which a request is considered timed out.
  , registryMaxRetry :: !Retry
    -- ^ Retry policy applied to timed out requests.
  }

--------------------------------------------------------------------------------
-- | Forgets every pending request and every postponed one. Timeout and retry
--   settings are left untouched.
registryClear :: Registry' -> Registry'
registryClear reg =
  reg
  { registryRequests = mempty
  , registryWaitings = []
  }

--------------------------------------------------------------------------------
-- | Mutable handle over the registry.
data Registry =
  Registry
  { registryState :: IORef Registry'
    -- ^ Current registry state.
  , registryStopwatch :: Stopwatch
    -- ^ Stopwatch used to timestamp requests and to detect timeouts.
  }

--------------------------------------------------------------------------------
-- | A plain pair. It exists so it can serve as the 'Functor' given to
--   'HashMap.alterF': the first component carries the value that was looked
--   up, the second one is the structure being rebuilt. That way a lookup and
--   a delete are done in one single pass.
data Blob a b = Blob a b

--------------------------------------------------------------------------------
-- | Maps over the second component only; the first one is passed through
--   unchanged.
instance Functor (Blob a) where
  fmap f (Blob a b) = Blob a (f b)

--------------------------------------------------------------------------------
-- | Looks up and removes the request stored under the given key, in a single
--   traversal of the map.
--
--   Returns the request that was found (if any) along with the registry state
--   without that entry. When the key is absent, the returned state holds the
--   same requests as the input.
registryRemoveRequest
  :: UUID
  -> Registry'
  -> (Maybe Request, Registry')
registryRemoveRequest key reg
  = let Blob result newMap
          = HashMap.alterF go key (registryRequests reg)
    in (result, reg { registryRequests = newMap })
  where
    -- Report whatever was stored under the key and always leave the slot
    -- empty afterwards.
    go found = Blob found Nothing

--------------------------------------------------------------------------------
-- | Creates an empty registry (no pending request, nothing postponed) with
--   the given request timeout and retry policy, along with a fresh stopwatch.
registryNew :: NominalDiffTime -> Retry -> IO Registry
registryNew timeout maxRetry
  = Registry
    <$> newIORef state
    <*> newStopwatch
  where
    state =
      Registry'
      { registryRequests = mempty
      , registryWaitings = []
      , registryTimeout = timeout
      , registryMaxRetry = maxRetry
      }

--------------------------------------------------------------------------------
-- | Registers a package as a pending request, stored under the package's
--   correlation id. The request is stamped with the current stopwatch reading
--   and its retry counter starts at 1. An entry already stored under the same
--   correlation id is replaced.
registryRegister
  :: Registry
  -> UUID -- Connection Id.
  -> Lifetime
  -> Package
  -> Mailbox
  -> EventStore ()
registryRegister reg connId lifetime pkg mailbox
  = do started <- stopwatchElapsed (registryStopwatch reg)
       modifyIORef' (registryState reg) (go started)
  where
    go started state =
      let req = Request
                { requestOriginal = pkg
                , requestConnId = connId
                , requestRetries = 1
                , requestStarted = started
                , requestMailbox = mailbox
                , requestLifetime = lifetime
                }

          correlation = packageCorrelation pkg
          nextReqs = insertMap correlation req (registryRequests state)
      in state { registryRequests = nextReqs }

--------------------------------------------------------------------------------
-- | Puts a package on the waiting list instead of registering it as a pending
--   request. The new entry is added at the front of the list.
registryPostpone
  :: Registry
  -> Mailbox
  -> Lifetime
  -> Package
  -> EventStore ()
registryPostpone reg mailbox lifetime pkg
  = modifyIORef' (registryState reg) go
  where
    go state
      = let waiting
              = Waiting
                { waitingLifetime = lifetime
                , waitingPkg = pkg
                , waitingMailbox = mailbox
                }

            nextWs = waiting : registryWaitings state
        in state { registryWaitings = nextWs }

--------------------------------------------------------------------------------
-- | Dispatches a package received from the server to the pending request
--   that shares its correlation id.
--
--   * No such request: a warning is logged.
--
--   * Bad request / not authenticated: the request's mailbox is failed and
--     the request is removed from the registry.
--
--   * Not handled: the request is moved to the waiting list so it gets sent
--     again later. If the payload says the node is not the master and
--     provides the master's information, the master endpoints are returned.
--
--   * Anything else: the package is forwarded to the request's mailbox. The
--     request is removed if it is 'OneTime' or if the response command is
--     the end command of a 'KeepAlive' request; otherwise it stays
--     registered.
--
--   The result is 'Nothing' in every case but the not-master one.
registryHandle
  :: Registry
  -> Package
  -> EventStore (Maybe NodeEndPoints)
registryHandle reg pkg
  = do state <- readIORef (registryState reg)
       case registryRemoveRequest (packageCorrelation pkg) state of
         (Nothing, _)
           -> do $logWarn [i|No operation associated to package: #{pkg}|]
                 pure Nothing
         (Just req, stateWithoutReq)
           -> case packageCmd pkg of
                cmd | cmd == badRequestCmd
                  -> do let reason = packageDataAsText pkg
                        -- The request got its (negative) answer: unregister
                        -- it so the timeout check doesn't send it again.
                        writeIORef (registryState reg) stateWithoutReq
                        mailboxFail (requestMailbox req) (ServerError reason)
                        pure Nothing

                    | cmd == notAuthenticatedCmd
                  -> do -- Same as above, the request is over.
                        writeIORef (registryState reg) stateWithoutReq
                        mailboxFail (requestMailbox req) NotAuthenticatedOp
                        pure Nothing

                    | cmd == notHandledCmd -- In all cases, we decide to postpone that command.
                  -> do $(logWarn) [i|Not handled response received: #{pkg}.|]
                        let waiting = requestToWaiting req
                            nextWs = waiting : registryWaitings stateWithoutReq
                            finalState = stateWithoutReq { registryWaitings = nextWs }
                            origCmd = packageCmd (requestOriginal req)
                            pkgId = packageCorrelation pkg

                        writeIORef (registryState reg) finalState

                        -- The payload is decoded with total patterns: if it
                        -- can't be decoded, or if a not-master response comes
                        -- without master information, we fall back to the
                        -- plain retry path instead of crashing.
                        case maybeDecodeMessage (packageData pkg) of
                          Just msg
                            | N_NotMaster <- getField (notHandledReason msg)
                            , Just details <- getField (notHandledAdditionalInfo msg)
                            -> do let info = masterInfo details
                                      node = masterInfoNodeEndPoints info

                                  $(logWarn) [i|Received a non master error on command #{origCmd} [#{pkgId}] on #{node}|]
                                  pure (Just node)

                          _ -> do $(logWarn) [i|The server has either not started or is too busy. Retrying command #{origCmd} #{pkgId}|]
                                  pure Nothing

                    | otherwise
                  -> do let respCmd = packageCmd pkg

                        mailboxSendPkg (requestMailbox req) pkg

                        case requestLifetime req of
                          OneTime
                            -> do writeIORef (registryState reg) stateWithoutReq
                                  pure Nothing

                          KeepAlive endCmd
                            | endCmd == respCmd
                            -> do writeIORef (registryState reg) stateWithoutReq
                                  pure Nothing
                            | otherwise -- Means we keep the previous state (Subscription).
                            -> pure Nothing

--------------------------------------------------------------------------------
-- | Accumulator threaded through 'registryCheckAndRetry'.
data CRState =
  CRState
  { crsState :: !Registry'
    -- ^ Registry state as updated so far.
  , crsPkgs :: ![Package]
    -- ^ Packages collected so far, most recently added first.
  }

--------------------------------------------------------------------------------
-- | Initial accumulator: the given registry state and no collected package.
crsStateNew :: Registry' -> CRState
crsStateNew reg =
  CRState
  { crsState = reg
  , crsPkgs = []
  }

--------------------------------------------------------------------------------
-- | Removes the entry stored under the correlation id of the request's
--   original package. Collected packages are left untouched.
crsStateDeleteReq :: Request -> CRState -> CRState
crsStateDeleteReq req reg
  = let state = crsState reg
        key = packageCorrelation (requestOriginal req)
        nextReqs = deleteMap key (registryRequests state)
    in reg { crsState = state { registryRequests = nextReqs } }

--------------------------------------------------------------------------------
-- | Stores the request under the correlation id of its original package,
--   replacing any entry already stored under that id. Collected packages are
--   left untouched.
crsStateRegisterReq :: Request -> CRState -> CRState
crsStateRegisterReq req reg
  = let state = crsState reg
        key = packageCorrelation (requestOriginal req)
        nextReqs = insertMap key req (registryRequests state)
    in reg { crsState = state { registryRequests = nextReqs } }

--------------------------------------------------------------------------------
-- | Adds a package at the front of the collected packages. The registry state
--   is left untouched.
crsStateAddPkg :: Package -> CRState -> CRState
crsStateAddPkg pkg reg
  = reg { crsPkgs = pkg : crsPkgs reg }

--------------------------------------------------------------------------------
registryCheckAndRetry
  :: Registry
  -> UUID -- Connection id.
  -> EventStore [Package]
registryCheckAndRetry :: Registry -> UUID -> EventStore [Package]
registryCheckAndRetry Registry
reg UUID
connId
  = do Registry'
state <- IORef Registry' -> EventStore Registry'
forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef (Registry -> IORef Registry'
registryState Registry
reg)
       NominalDiffTime
elapsed <- Stopwatch -> EventStore NominalDiffTime
forall (m :: * -> *).
MonadBaseControl IO m =>
Stopwatch -> m NominalDiffTime
stopwatchElapsed (Registry -> Stopwatch
registryStopwatch Registry
reg)
       let reqs :: [(ContainerKey Requests, MapValue Requests)]
reqs = Requests -> [(ContainerKey Requests, MapValue Requests)]
forall map. IsMap map => map -> [(ContainerKey map, MapValue map)]
mapToList (Requests -> [(ContainerKey Requests, MapValue Requests)])
-> Requests -> [(ContainerKey Requests, MapValue Requests)]
forall a b. (a -> b) -> a -> b
$ Registry' -> Requests
registryRequests Registry'
state

       CRState
newState <- (CRState -> (UUID, Request) -> EventStore CRState)
-> CRState -> [(UUID, Request)] -> EventStore CRState
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (NominalDiffTime -> CRState -> (UUID, Request) -> EventStore CRState
checking NominalDiffTime
elapsed) (Registry' -> CRState
crsStateNew Registry'
state) [(ContainerKey Requests, MapValue Requests)]
[(UUID, Request)]
reqs
       let newStateTemp :: Registry'
newStateTemp = CRState -> Registry'
crsState CRState
newState
           awaitings :: [Waiting]
awaitings = Registry' -> [Waiting]
registryWaitings Registry'
newStateTemp
           tempState :: Registry'
tempState = Registry'
newStateTemp { registryWaitings :: [Waiting]
registryWaitings = [] }
           newCRState :: CRState
newCRState = CRState
newState { crsState :: Registry'
crsState = Registry'
tempState }
           finalState :: CRState
finalState = (CRState -> Waiting -> CRState) -> CRState -> [Waiting] -> CRState
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (NominalDiffTime -> CRState -> Waiting -> CRState
sending NominalDiffTime
elapsed) CRState
newCRState [Waiting]
awaitings

       IORef Registry' -> Registry' -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
writeIORef (Registry -> IORef Registry'
registryState Registry
reg) (CRState -> Registry'
crsState CRState
finalState)
       [Package] -> EventStore [Package]
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CRState -> [Package]
crsPkgs CRState
finalState)
  where
    checking :: NominalDiffTime -> CRState -> (UUID, Request) -> EventStore CRState
checking NominalDiffTime
elapsed CRState
state (UUID
_, Request
req)
      = do let maxTimeout :: NominalDiffTime
maxTimeout = Registry' -> NominalDiffTime
registryTimeout (Registry' -> NominalDiffTime)
-> (CRState -> Registry') -> CRState -> NominalDiffTime
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CRState -> Registry'
crsState (CRState -> NominalDiffTime) -> CRState -> NominalDiffTime
forall a b. (a -> b) -> a -> b
$ CRState
state
               hasTimeout :: Bool
hasTimeout = NominalDiffTime
elapsed NominalDiffTime -> NominalDiffTime -> NominalDiffTime
forall a. Num a => a -> a -> a
- (Request -> NominalDiffTime
requestStarted Request
req) NominalDiffTime -> NominalDiffTime -> Bool
forall a. Ord a => a -> a -> Bool
>= NominalDiffTime
maxTimeout
               maxRetry :: Retry
maxRetry = Registry' -> Retry
registryMaxRetry (Registry' -> Retry) -> (CRState -> Registry') -> CRState -> Retry
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CRState -> Registry'
crsState (CRState -> Retry) -> CRState -> Retry
forall a b. (a -> b) -> a -> b
$ CRState
state
           if Request -> UUID
requestConnId Request
req UUID -> UUID -> Bool
forall a. Eq a => a -> a -> Bool
/= UUID
connId
             then
               do Mailbox -> OperationError -> EventStore ()
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> OperationError -> m ()
mailboxFail (Request -> Mailbox
requestMailbox Request
req) OperationError
ConnectionHasDropped
                  CRState -> EventStore CRState
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request -> CRState -> CRState
crsStateDeleteReq Request
req CRState
state)
           else if Bool -> Bool
not (Request -> Bool
requestIsKeepAlive Request
req) Bool -> Bool -> Bool
&& Bool
hasTimeout
             then case Retry
maxRetry of
                    AtMost Int
maxAtt
                      | Request -> Int
requestRetries Request
req Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxAtt
                      -> do let pkg :: Package
pkg = Request -> Package
requestOriginal Request
req
                                pkgId :: UUID
pkgId = Package -> UUID
packageCorrelation Package
pkg
                                cmd :: Command
cmd = Package -> Command
packageCmd Package
pkg

                            $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
logError) [i|Command #{cmd} [#{pkgId}] maximum retries threshold reached (#{maxAtt}), aborted!|]
                            Mailbox -> OperationError -> EventStore ()
forall (m :: * -> *).
MonadBase IO m =>
Mailbox -> OperationError -> m ()
mailboxFail (Request -> Mailbox
requestMailbox Request
req) OperationError
Aborted
                            CRState -> EventStore CRState
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Request -> CRState -> CRState
crsStateDeleteReq Request
req CRState
state)
                      | Bool
otherwise
                      -> EventStore CRState
retryReq
                    Retry
KeepRetrying
                      -> EventStore CRState
retryReq
           else
             CRState -> EventStore CRState
forall (f :: * -> *) a. Applicative f => a -> f a
pure CRState
state
      where
        retryReq :: EventStore CRState
retryReq
          = do let nextRetries :: Int
nextRetries
                     = Request -> Int
requestRetries Request
req Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
                   nextReq :: Request
nextReq
                     = Request
req
                       { requestRetries :: Int
requestRetries = Int
nextRetries
                       , requestStarted :: NominalDiffTime
requestStarted = NominalDiffTime
elapsed
                       }

                   maxAtt :: Int
maxAtt
                     = case Registry' -> Retry
registryMaxRetry (Registry' -> Retry) -> (CRState -> Registry') -> CRState -> Retry
forall b c a. (b -> c) -> (a -> b) -> a -> c
. CRState -> Registry'
crsState (CRState -> Retry) -> CRState -> Retry
forall a b. (a -> b) -> a -> b
$ CRState
state of
                         AtMost Int
n -> Int
n
                         Retry
KeepRetrying -> Int
forall a. Bounded a => a
maxBound

                   pkg :: Package
pkg = Request -> Package
requestOriginal Request
req
                   cmd :: Command
cmd = Package -> Command
packageCmd Package
pkg
                   pkgId :: UUID
pkgId = Package -> UUID
packageCorrelation Package
pkg

               $(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
logWarn) [i|Command #{cmd} [#{pkgId} has timeout. Retrying (attempt #{nextRetries}/#{maxAtt})|]

               CRState -> EventStore CRState
forall (f :: * -> *) a. Applicative f => a -> f a
pure (CRState -> EventStore CRState)
-> (CRState -> CRState) -> CRState -> EventStore CRState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Request -> CRState -> CRState
crsStateRegisterReq Request
nextReq (CRState -> CRState) -> (CRState -> CRState) -> CRState -> CRState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Package -> CRState -> CRState
crsStateAddPkg Package
pkg (CRState -> EventStore CRState) -> CRState -> EventStore CRState
forall a b. (a -> b) -> a -> b
$ CRState
state

    sending :: NominalDiffTime -> CRState -> Waiting -> CRState
sending NominalDiffTime
elapsed CRState
state Waiting
w
      = let req :: Request
req = UUID -> NominalDiffTime -> Waiting -> Request
waitingToRequest UUID
connId NominalDiffTime
elapsed Waiting
w
            pkg :: Package
pkg = Request -> Package
requestOriginal Request
req in
        Request -> CRState -> CRState
crsStateRegisterReq Request
req (CRState -> CRState) -> (CRState -> CRState) -> CRState -> CRState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Package -> CRState -> CRState
crsStateAddPkg Package
pkg (CRState -> CRState) -> CRState -> CRState
forall a b. (a -> b) -> a -> b
$ CRState
state

--------------------------------------------------------------------------------
-- | Aborts everything the registry currently tracks.
--
-- The registry state is read once, the stored state is replaced by the result
-- of 'registryClear', and then every request and every waiting entry found in
-- the snapshot taken /before/ clearing has its mailbox failed with 'Aborted'.
--
-- NOTE(review): the read and the write of the underlying 'IORef' are two
-- separate steps; this presumably relies on the registry being driven from a
-- single thread -- confirm against the callers.
registryAbort :: Registry -> EventStore ()
registryAbort reg = do
  state <- readIORef (registryState reg)

  -- Reset the stored state first; the notifications below use the snapshot.
  writeIORef (registryState reg) (registryClear state)

  -- Fail the requests recorded in the snapshot.
  for_ (registryRequests state) $ \req ->
    mailboxFail (requestMailbox req) Aborted

  -- Fail the entries that were still waiting in the snapshot.
  for_ (registryWaitings state) $ \w ->
    mailboxFail (waitingMailbox w) Aborted

--------------------------------------------------------------------------------
-- | Runs the 'decodeMessage' parser over the given bytes with 'runGet'.
--
-- Returns 'Just' the decoded value when the parser succeeds and 'Nothing'
-- when it fails; the error text reported by 'runGet' is discarded.
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage bytes =
  case runGet decodeMessage bytes of
    Right msg -> Just msg
    Left _    -> Nothing