{-# OPTIONS_GHC -Wno-missing-signatures #-}
{-# LANGUAGE GeneralizedNewtypeDeriving, TemplateHaskell #-}

module Pulsar.AppState where

import           Control.Concurrent.Async       ( Async )
import           Control.Concurrent.Chan
import           Control.Concurrent.MVar
import           Control.Monad.IO.Class
import qualified Data.Binary                   as B
import           Data.Foldable                  ( traverse_ )
import           Data.IORef
import           Lens.Family
import           Lens.Family.TH
import           Pulsar.Protocol.Frame          ( Response(..) )

{- | Identifier of a request, wrapping a 64-bit word.

'Num' is derived so the identifier can be written as a literal and incremented
with @+ 1@; 'Show' renders the constructor, e.g. @ReqId 7@.
-}
newtype ReqId = ReqId B.Word64 deriving (Eq, Num, Show)
{- | Sequence identifier, wrapping a 64-bit word.

'Num' is derived so the identifier can be written as a literal and used in
arithmetic; 'Show' renders the constructor, e.g. @SeqId 7@.
-}
newtype SeqId = SeqId B.Word64 deriving (Eq, Num, Show)
{- | Identifier of a producer, wrapping a 64-bit word.

Note that the data constructor is named 'PId'. 'Num' is derived so the
identifier can be written as a literal and incremented with @+ 1@.
-}
newtype ProducerId = PId B.Word64 deriving (Eq, Num, Show)
{- | Identifier of a consumer, wrapping a 64-bit word.

Note that the data constructor is named 'CId'. 'Num' is derived so the
identifier can be written as a literal and incremented with @+ 1@.
-}
newtype ConsumerId = CId B.Word64 deriving (Eq, Num, Show)

{- | A permit count, wrapping a 32-bit word.

NOTE(review): presumably the number of flow-control permits granted to a
consumer; this module never uses the type, so confirm against its callers.
-}
newtype Permits = Permits B.Word32 deriving (Eq, Num, Show)

{- | A worker running in the background: the 'Async' handle of the task paired
with an 'MVar' that acts as its synchronizer.
-}
type Worker = (Async (), MVar ())

{- | The sequence ids registered for a single producer. Each id is paired with
the 'MVar' that 'registerSendReceipt' fills with the matching 'Response'.
-}
type ProducerSeqs = [(SeqId, MVar Response)]

{- | The application state: registered consumers, the id counters, the
background workers and the responses that are still awaited.

'makeLenses' generates one lens per field, named after the field without the
leading underscore (e.g. 'appConsumers' for '_appConsumers').
-}
data AppState = AppState
  { _appConsumers :: [(ConsumerId, Chan Response)]    -- ^ a list of consumer identifiers associated with a communication channel
  , _appConsumerId :: ConsumerId                      -- ^ an incremental counter to assign unique consumer ids
  , _appProducerId :: ProducerId                      -- ^ an incremental counter to assign unique producer ids
  , _appRequestId :: ReqId                            -- ^ an incremental counter to assign unique request ids for all commands
  , _appWorkers :: [Worker]                           -- ^ a list of workers for consumers and producers that run in the background
  , _appResponse :: [(ReqId, MVar Response)]          -- ^ a list of registered requests that need a Request Id
  , _appSendReceipts :: [(ProducerId, ProducerSeqs)]  -- ^ a list of registered messages sent by a specific producer
  }
$(makeLenses ''AppState)

{- | Allocate a consumer id and register the given channel under it.

The id returned is the current value of the consumer counter; the counter is
then incremented by one. Both the registration and the increment happen in a
single atomic update of the state.
-}
mkConsumerId :: MonadIO m => Chan Response -> IORef AppState -> m ConsumerId
-- The strict variant applies the update immediately instead of leaving a
-- suspended computation inside the IORef.
mkConsumerId chan ref = liftIO $ atomicModifyIORef' ref $ \app ->
  let cid        = app ^. appConsumerId
      registered = over appConsumers ((cid, chan) :) app
  in  (over appConsumerId (+ 1) registered, cid)

{- | Allocate a producer id and register an empty list of sequence ids for it.

The id returned is the current value of the producer counter; the counter is
then incremented by one. Both the registration and the increment happen in a
single atomic update of the state.
-}
mkProducerId :: MonadIO m => IORef AppState -> m ProducerId
-- The strict variant applies the update immediately instead of leaving a
-- suspended computation inside the IORef.
mkProducerId ref = liftIO $ atomicModifyIORef' ref $ \app ->
  let pid        = app ^. appProducerId
      registered = over appSendReceipts ((pid, []) :) app
  in  (over appProducerId (+ 1) registered, pid)

{- | Allocate a request id together with a fresh, empty 'MVar' for its response.

The pair is registered in the list of pending responses and the request counter
is incremented by one, in a single atomic update of the state. The same pair is
returned to the caller.
-}
mkRequestId :: MonadIO m => IORef AppState -> m (ReqId, MVar Response)
mkRequestId ref = liftIO $ do
  var <- newEmptyMVar
  -- The strict variant applies the update immediately instead of leaving a
  -- suspended computation inside the IORef.
  atomicModifyIORef' ref $ \app ->
    let req        = app ^. appRequestId
        registered = over appResponse ((req, var) :) app
    in  (over appRequestId (+ 1) registered, (req, var))

{- | Prepend a background worker to the list of workers kept in the state. -}
addWorker :: MonadIO m => IORef AppState -> Worker -> m ()
-- The strict variant applies the update immediately instead of leaving a
-- suspended computation inside the IORef.
addWorker ref nw =
  liftIO $ atomicModifyIORef' ref $ \app -> (over appWorkers (nw :) app, ())

{- | Register a response for a request and unregister the request.

The 'MVar' registered for the request id is looked up and the request is
removed from the pending list in one atomic update. If an 'MVar' was found,
the response is put into it; if the id is unknown, nothing happens.
-}
registerReqResponse :: MonadIO m => IORef AppState -> ReqId -> Response -> m ()
registerReqResponse ref rid resp = liftIO $ do
  -- The lookup runs against the state as it was before the removal.
  maybeVar <- atomicModifyIORef' ref
    $ \app -> (over appResponse unregister app, lookup rid $ app ^. appResponse)
  traverse_ (`putMVar` resp) maybeVar
  where unregister = filter ((rid /=) . fst) -- unregister request

{- | Collect the values stored under a given key in an association list.

Every entry whose key equals @pid@ contributes its list, in order; the result
is empty when no entry matches.
-}
getProducerSeqs :: Eq a => a -> [(a, [b])] -> [b]
getProducerSeqs pid xs = [ s | (p, ss) <- xs, pid == p, s <- ss ]

{- | Replace the value of every entry whose key equals @pid@ with @g@.

Entries with a different key are left untouched, and no entry is added when
the key is absent. Works over any 'Functor' of key/value pairs.
-}
updateProducerSeqs :: (Functor f, Eq a) => a -> b -> f (a, b) -> f (a, b)
updateProducerSeqs pid g xs = replace <$> xs
 where
  replace entry@(p, _) | p == pid  = (p, g)
                       | otherwise = entry

{- | Register a sequence id for a producer and return the 'MVar' that will
receive the corresponding response.

A fresh, empty 'MVar' is created and prepended, together with the sequence id,
to the producer's list of registered sequence ids in one atomic update.

NOTE(review): 'updateProducerSeqs' only rewrites existing entries, so if the
producer has no entry in the send receipts the returned 'MVar' is not
registered anywhere.
-}
registerSeqId
  :: MonadIO m => IORef AppState -> ProducerId -> SeqId -> m (MVar Response)
registerSeqId ref pid sid = liftIO $ do
  var <- newEmptyMVar
  -- The strict variant applies the update immediately instead of leaving a
  -- suspended computation inside the IORef.
  atomicModifyIORef' ref $ \app ->
    let receipts = app ^. appSendReceipts
        seqs     = (sid, var) : getProducerSeqs pid receipts
        updated  = updateProducerSeqs pid seqs receipts
    in  (set appSendReceipts updated app, var)

{- | Register a response for a message sent and unregister the sequence id.

The 'MVar' registered for the producer's sequence id is looked up and the
sequence id is removed from the producer's list in one atomic update. If an
'MVar' was found, the response is put into it; otherwise nothing happens.
-}
registerSendReceipt
  :: MonadIO m => IORef AppState -> ProducerId -> SeqId -> Response -> m ()
registerSendReceipt ref pid sid resp = liftIO $ do
  -- The lookup runs against the state as it was before the removal.
  maybeVar <- atomicModifyIORef' ref $ \app ->
    (over appSendReceipts unregister app, findVar $ app ^. appSendReceipts)
  traverse_ (`putMVar` resp) maybeVar
 where
  -- Drop the sequence id from this producer's registered sequence ids.
  unregister xs =
    let remaining = filter (\(s, _) -> s /= sid) $ getProducerSeqs pid xs
    in  updateProducerSeqs pid remaining xs
  -- Find the MVar registered for this producer's sequence id, if any.
  findVar = lookup sid . getProducerSeqs pid