module Database.EventStore.Internal.Manager.Operation.Registry where
import qualified Data.HashMap.Strict as HashMap
import Data.ProtocolBuffers
import Data.Serialize
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Operation
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types
-- | A request tracked by the registry.
data Request = Request
  { requestOriginal :: !Package
    -- ^ Package the request was registered with; its correlation id is the
    -- key under which the request is stored.
  , requestConnId :: !UUID
    -- ^ Identifier of the connection the request was registered under.
  , requestRetries :: !Int
    -- ^ Attempt counter: set to 1 on registration, incremented on each retry.
  , requestStarted :: !NominalDiffTime
    -- ^ Stopwatch reading taken when the request was registered or last
    -- retried.
  , requestMailbox :: !Mailbox
    -- ^ Mailbox that receives the response package or the failure.
  , requestLifetime :: !Lifetime
    -- ^ Whether the request is one-shot or kept alive after a response.
  }
-- | 'True' when the request was registered with a 'KeepAlive' lifetime,
-- 'False' for a 'OneTime' request.
requestIsKeepAlive :: Request -> Bool
requestIsKeepAlive = keepsAlive . requestLifetime
  where
    keepsAlive (KeepAlive _) = True
    keepsAlive OneTime       = False
-- | Turns a registered request back into a postponed one, keeping only its
-- package, mailbox and lifetime.
requestToWaiting :: Request -> Waiting
requestToWaiting
  Request
    { requestOriginal = pkg
    , requestMailbox  = mailbox
    , requestLifetime = lifetime
    } =
  Waiting
    { waitingLifetime = lifetime
    , waitingPkg      = pkg
    , waitingMailbox  = mailbox
    }
-- | Promotes a postponed operation to a registered request.
--
-- The request is bound to the given connection id, stamped with the given
-- start time, and starts at its first attempt.
waitingToRequest
  :: UUID
  -> NominalDiffTime
  -> Waiting
  -> Request
waitingToRequest
  connId
  started
  Waiting
    { waitingLifetime = lifetime
    , waitingPkg      = pkg
    , waitingMailbox  = mailbox
    } =
  Request
    { requestOriginal = pkg
    , requestConnId   = connId
    , requestRetries  = firstAttempt
    , requestStarted  = started
    , requestMailbox  = mailbox
    , requestLifetime = lifetime
    }
  where
    firstAttempt :: Int
    firstAttempt = 1
-- | An operation that has been postponed: it is not registered as a request
-- yet and sits in 'registryWaitings' until the next check-and-retry pass.
data Waiting = Waiting
  { waitingLifetime :: !Lifetime
    -- ^ Lifetime the operation will be registered with.
  , waitingPkg :: !Package
    -- ^ Package of the postponed operation.
  , waitingMailbox :: !Mailbox
    -- ^ Mailbox notified of the outcome.
  }
-- | Registered requests, keyed by the correlation id of their package.
type Requests = HashMap UUID Request
-- | Pure state of the registry.
data Registry' = Registry'
  { registryRequests :: !Requests
    -- ^ Requests currently registered, by package correlation id.
  , registryWaitings :: ![Waiting]
    -- ^ Postponed operations, most recently postponed first.
  , registryTimeout :: !NominalDiffTime
    -- ^ Elapsed time after which a non keep-alive request is considered to
    -- have timed out.
  , registryMaxRetry :: !Retry
    -- ^ Retry policy applied to timed-out requests.
  }
-- | Drops every registered request and every postponed operation while
-- keeping the timeout and retry settings.
registryClear :: Registry' -> Registry'
registryClear
  Registry'
    { registryTimeout  = timeout
    , registryMaxRetry = maxRetry
    } =
  Registry'
    { registryRequests = mempty
    , registryWaitings = []
    , registryTimeout  = timeout
    , registryMaxRetry = maxRetry
    }
-- | Mutable handle on the registry.
data Registry = Registry
  { registryState :: IORef Registry'
    -- ^ Current registry state.
  , registryStopwatch :: Stopwatch
    -- ^ Stopwatch used to timestamp requests and detect timeouts.
  }
-- | A plain pair whose 'Functor' instance maps over the second component
-- only. Used with 'HashMap.alterF' to carry a value out of the alteration.
data Blob a b = Blob a b

instance Functor (Blob a) where
  fmap transform blob =
    case blob of
      Blob carried target -> Blob carried (transform target)
-- | Looks up and removes, in a single map traversal, the request stored
-- under the given key.
--
-- Returns the removed request (if there was one) together with the registry
-- state without it.
registryRemoveRequest
  :: UUID
  -> Registry'
  -> (Maybe Request, Registry')
registryRemoveRequest key reg =
  (removed, reg { registryRequests = remaining })
  where
    Blob removed remaining =
      HashMap.alterF takeOut key (registryRequests reg)

    -- Report whatever was stored under the key and always leave the slot
    -- empty.
    takeOut :: Maybe a -> Blob (Maybe a) (Maybe a)
    takeOut found = Blob found Nothing
-- | Creates an empty registry with the given request timeout and retry
-- policy, along with a fresh stopwatch.
registryNew :: NominalDiffTime -> Retry -> IO Registry
registryNew timeout maxRetry = do
  ref   <- newIORef emptyState
  watch <- newStopwatch
  pure Registry
    { registryState     = ref
    , registryStopwatch = watch
    }
  where
    emptyState :: Registry'
    emptyState =
      Registry'
        { registryRequests = mempty
        , registryWaitings = []
        , registryTimeout  = timeout
        , registryMaxRetry = maxRetry
        }
-- | Registers a new request under the correlation id of its package.
--
-- The request is stamped with the current stopwatch reading and starts at
-- its first attempt. An existing entry with the same correlation id is
-- replaced.
registryRegister
  :: Registry
  -> UUID
  -> Lifetime
  -> Package
  -> Mailbox
  -> EventStore ()
registryRegister reg connId lifetime pkg mailbox = do
  now <- stopwatchElapsed (registryStopwatch reg)
  modifyIORef' (registryState reg) $ \current ->
    current
      { registryRequests =
          insertMap (packageCorrelation pkg) (freshRequest now) (registryRequests current)
      }
  where
    freshRequest :: NominalDiffTime -> Request
    freshRequest started =
      Request
        { requestOriginal = pkg
        , requestConnId   = connId
        , requestRetries  = 1
        , requestStarted  = started
        , requestMailbox  = mailbox
        , requestLifetime = lifetime
        }
-- | Queues an operation as postponed instead of registering it right away.
--
-- The operation is pushed at the front of 'registryWaitings'.
registryPostpone
  :: Registry
  -> Mailbox
  -> Lifetime
  -> Package
  -> EventStore ()
registryPostpone reg mailbox lifetime pkg =
  modifyIORef' (registryState reg) $ \current ->
    current { registryWaitings = postponed : registryWaitings current }
  where
    postponed :: Waiting
    postponed =
      Waiting
        { waitingLifetime = lifetime
        , waitingPkg      = pkg
        , waitingMailbox  = mailbox
        }
-- | Dispatches a package received from the server to the request registered
-- under the same correlation id.
--
-- * No matching request: a warning is logged and nothing else happens.
-- * Bad-request / not-authenticated responses: the request's mailbox is
--   failed and the request is removed from the registry.
-- * Not-handled responses: the request is moved back to the postponed queue
--   so it is re-sent on the next check-and-retry pass. When the server says
--   it is not the master and provides master information, the endpoints of
--   that node are returned.
-- * Any other response: the package is forwarded to the mailbox. The request
--   is removed if it is 'OneTime', or if it is 'KeepAlive' and the response
--   command is its terminating command; otherwise it stays registered.
--
-- Returns @Just endpoints@ only in the not-master case described above.
registryHandle
  :: Registry
  -> Package
  -> EventStore (Maybe NodeEndPoints)
registryHandle reg pkg = do
  state <- readIORef (registryState reg)
  case registryRemoveRequest (packageCorrelation pkg) state of
    (Nothing, _) -> do
      $logWarn [i|No operation associated to package: #{pkg}|]
      pure Nothing
    (Just req, stateWithoutReq)
      | respCmd == badRequestCmd ->
          failRequest req stateWithoutReq (ServerError (packageDataAsText pkg))
      | respCmd == notAuthenticatedCmd ->
          failRequest req stateWithoutReq NotAuthenticatedOp
      | respCmd == notHandledCmd ->
          requeueNotHandled req stateWithoutReq
      | otherwise ->
          deliver req stateWithoutReq
  where
    respCmd :: Command
    respCmd = packageCmd pkg

    commit :: Registry' -> EventStore ()
    commit = writeIORef (registryState reg)

    -- Terminal failure. The request is dropped from the registry as well:
    -- leaving it registered would make the timeout pass retry (and later
    -- abort) an operation whose mailbox has already been failed.
    failRequest :: Request -> Registry' -> OperationError -> EventStore (Maybe NodeEndPoints)
    failRequest req stateWithoutReq failure = do
      commit stateWithoutReq
      mailboxFail (requestMailbox req) failure
      pure Nothing

    -- Regular response: forward it, then decide whether the request is done.
    deliver :: Request -> Registry' -> EventStore (Maybe NodeEndPoints)
    deliver req stateWithoutReq = do
      mailboxSendPkg (requestMailbox req) pkg
      case requestLifetime req of
        OneTime -> commit stateWithoutReq
        KeepAlive endCmd
          | endCmd == respCmd -> commit stateWithoutReq
          | otherwise         -> pure ()
      pure Nothing

    -- The server refused to handle the request: put it back in the postponed
    -- queue, then inspect the reason to find out whether we were pointed at
    -- another node.
    requeueNotHandled :: Request -> Registry' -> EventStore (Maybe NodeEndPoints)
    requeueNotHandled req stateWithoutReq = do
      $logWarn [i|Not handled response received: #{pkg}.|]
      let nextWs  = requestToWaiting req : registryWaitings stateWithoutReq
          origCmd = packageCmd (requestOriginal req)
          pkgId   = packageCorrelation pkg
      commit stateWithoutReq { registryWaitings = nextWs }
      -- Decoding is total here: a malformed payload or missing master
      -- information is logged and treated as a plain retry instead of
      -- raising a pattern-match failure.
      case maybeDecodeMessage (packageData pkg) of
        Nothing -> do
          $logWarn [i|Unable to decode not handled response. Retrying command #{origCmd} #{pkgId}|]
          pure Nothing
        Just msg ->
          case getField (notHandledReason msg) of
            N_NotMaster ->
              case getField (notHandledAdditionalInfo msg) of
                Just details -> do
                  let node = masterInfoNodeEndPoints (masterInfo details)
                  $logWarn [i|Received a non master error on command #{origCmd} [#{pkgId}] on #{node}|]
                  pure (Just node)
                Nothing -> do
                  $logWarn [i|Received a non master error without master information. Retrying command #{origCmd} #{pkgId}|]
                  pure Nothing
            _ -> do
              $logWarn [i|The server has either not started or is too busy. Retrying command #{origCmd} #{pkgId}|]
              pure Nothing
-- | Accumulator threaded through a check-and-retry pass.
data CRState = CRState
  { crsState :: !Registry'
    -- ^ Registry state as updated so far by the pass.
  , crsPkgs :: ![Package]
    -- ^ Packages collected during the pass; handed back to the caller of
    -- 'registryCheckAndRetry'.
  }
-- | Starts a check-and-retry accumulator from the given registry state, with
-- no package collected yet.
crsStateNew :: Registry' -> CRState
crsStateNew reg =
  CRState
    { crsState = reg
    , crsPkgs  = []
    }
-- | Removes a request from the accumulated registry state. The request is
-- identified by the correlation id of its original package.
crsStateDeleteReq :: Request -> CRState -> CRState
crsStateDeleteReq req reg = reg { crsState = nextState }
  where
    state     = crsState reg
    key       = packageCorrelation (requestOriginal req)
    nextState = state { registryRequests = deleteMap key (registryRequests state) }
-- | Inserts (or replaces) a request in the accumulated registry state, under
-- the correlation id of its original package.
crsStateRegisterReq :: Request -> CRState -> CRState
crsStateRegisterReq req reg = reg { crsState = nextState }
  where
    state     = crsState reg
    key       = packageCorrelation (requestOriginal req)
    nextState = state { registryRequests = insertMap key req (registryRequests state) }
-- | Pushes a package at the front of the packages collected so far.
crsStateAddPkg :: Package -> CRState -> CRState
crsStateAddPkg pkg reg = reg { crsPkgs = pkg : crsPkgs reg }
-- | Runs one maintenance pass over the registry for the connection
-- identified by the given id, and returns the packages collected along the
-- way.
--
-- For every registered request:
--
-- * if it was registered under a different connection id, its mailbox is
--   failed with 'ConnectionHasDropped' and it is removed;
-- * if it is not keep-alive and has timed out, it is either retried (its
--   package is collected, its attempt counter incremented and its start time
--   reset) or, once the retry limit is exceeded, failed with 'Aborted' and
--   removed;
-- * otherwise it is left untouched.
--
-- Every postponed operation is then registered as a new request on this
-- connection and its package is collected too. The resulting state is
-- written back to the registry.
registryCheckAndRetry
  :: Registry
  -> UUID
  -> EventStore [Package]
registryCheckAndRetry reg connId = do
  state   <- readIORef (registryState reg)
  elapsed <- stopwatchElapsed (registryStopwatch reg)

  checked <- foldM (checking elapsed) (crsStateNew state) (mapToList (registryRequests state))

  let checkedReg = crsState checked
      awaitings  = registryWaitings checkedReg
      -- The postponed queue is emptied before its entries are turned into
      -- registered requests.
      drained    = checked { crsState = checkedReg { registryWaitings = [] } }
      finalState = foldl' (sending elapsed) drained awaitings

  writeIORef (registryState reg) (crsState finalState)
  pure (crsPkgs finalState)
  where
    checking :: NominalDiffTime -> CRState -> (UUID, Request) -> EventStore CRState
    checking elapsed state (_, req)
      | requestConnId req /= connId = do
          mailboxFail (requestMailbox req) ConnectionHasDropped
          pure (crsStateDeleteReq req state)
      | not (requestIsKeepAlive req) && hasTimeout =
          case maxRetry of
            AtMost maxAtt
              | requestRetries req + 1 > maxAtt -> do
                  $logError [i|Command #{cmd} [#{pkgId}] maximum retries threshold reached (#{maxAtt}), aborted!|]
                  mailboxFail (requestMailbox req) Aborted
                  pure (crsStateDeleteReq req state)
              | otherwise -> retryReq
            KeepRetrying -> retryReq
      | otherwise = pure state
      where
        settings :: Registry'
        settings = crsState state

        maxRetry :: Retry
        maxRetry = registryMaxRetry settings

        hasTimeout :: Bool
        hasTimeout = elapsed - requestStarted req >= registryTimeout settings

        pkg :: Package
        pkg = requestOriginal req

        cmd :: Command
        cmd = packageCmd pkg

        pkgId :: UUID
        pkgId = packageCorrelation pkg

        -- Schedule the original package again and restart the request's
        -- timeout clock.
        retryReq :: EventStore CRState
        retryReq = do
          let nextRetries = requestRetries req + 1
              nextReq     = req { requestRetries = nextRetries
                                , requestStarted = elapsed
                                }
              -- Only used for the log line below.
              maxAtt :: Int
              maxAtt = case maxRetry of
                         AtMost n     -> n
                         KeepRetrying -> maxBound
          $logWarn [i|Command #{cmd} [#{pkgId}] has timeout. Retrying (attempt #{nextRetries}/#{maxAtt})|]
          pure . crsStateRegisterReq nextReq . crsStateAddPkg pkg $ state

    -- Registers a postponed operation as a fresh request on the current
    -- connection and collects its package.
    sending :: NominalDiffTime -> CRState -> Waiting -> CRState
    sending elapsed state w =
      crsStateRegisterReq req (crsStateAddPkg (requestOriginal req) state)
      where
        req = waitingToRequest connId elapsed w
-- | Empties the registry and fails, with 'Aborted', the mailbox of every
-- request and every postponed operation it contained.
registryAbort :: Registry -> EventStore ()
registryAbort reg = do
  snapshot <- readIORef stateRef
  writeIORef stateRef (registryClear snapshot)
  for_ (registryRequests snapshot) (abort . requestMailbox)
  for_ (registryWaitings snapshot) (abort . waitingMailbox)
  where
    stateRef :: IORef Registry'
    stateRef = registryState reg

    abort :: Mailbox -> EventStore ()
    abort mailbox = mailboxFail mailbox Aborted
-- | Decodes a protocol buffers message from raw bytes, yielding 'Nothing'
-- when decoding fails.
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage bytes = discardError (runGet decodeMessage bytes)
  where
    discardError (Right decoded) = Just decoded
    discardError _               = Nothing