{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Hercules.Agent.Producer where
import Control.Applicative
import Control.Concurrent hiding (throwTo)
import Control.Concurrent.Async hiding (cancel)
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.State
import Data.Foldable
import Data.Traversable
import Katip
import UnliftIO.Exception
import Prelude
-- | Handle to a forked producer thread that emits payloads of type @p@ and
-- finishes with a result of type @r@.
--
-- The 'Functor' instance maps over the final result type @r@.
data Producer p r = Producer
  { -- | STM action that takes the next message emitted by the producer.
    producerQueueRead :: STM (Msg p r),
    -- | The thread that runs the producing computation.
    producerThread :: ThreadId
  }
  deriving (Functor)
-- | Exception that 'cancel' throws to a producer thread in order to stop it.
data ProducerCancelled = ProducerCancelled
  deriving (Show, Typeable)

-- All 'Exception' methods use their defaults.
instance Exception ProducerCancelled
-- | A single message sent from a producer thread to its consumer.
--
-- The 'Functor' instance maps over the final result carried by 'Close'.
data Msg p r
  = -- | One of possibly many payloads emitted while the producer runs.
    Payload p
  | -- | The producer thread ended with this exception.
    Exception SomeException
  | -- | The producer finished and returned its final value.
    Close r
  deriving (Functor)
-- | Fork a thread that runs @f@, handing it a callback for emitting payloads.
--
-- Every value passed to the callback is enqueued as a 'Payload'. When the
-- thread ends, one terminal message is enqueued: 'Close' with the result of
-- @f@, or 'Exception' with the exception that ended the thread.
--
-- The returned 'Producer' exposes the read side of that queue together with
-- the 'ThreadId' of the forked thread.
forkProducer :: forall m p r. (MonadIO m, MonadUnliftIO m) => ((p -> m ()) -> m r) -> m (Producer p r)
forkProducer f = do
  q <- liftIO newTQueueIO
  let write :: MonadIO m' => Msg p r -> m' ()
      write = liftIO . atomically . writeTQueue q
  -- Turn the producing computation into a plain 'IO' action so that it can be
  -- handed to 'forkFinally'.
  f' <- toIO (f (write . Payload))
  t <- liftIO $ forkFinally f' (write . toResult)
  pure $ Producer {producerQueueRead = readTQueue q, producerThread = t}
  where
    -- Translate the outcome reported by 'forkFinally' into the terminal message.
    toResult :: Either SomeException r -> Msg p r
    toResult = either Exception Close
-- | Stop a producer by throwing 'ProducerCancelled' to its thread.
cancel :: MonadIO m => Producer p r -> m ()
cancel p = throwTo (producerThread p) ProducerCancelled
-- | Run a consumer against a freshly forked producer and 'cancel' the producer
-- when the consumer is done, whether it returned normally or threw.
--
-- The first argument is the producing computation (see 'forkProducer'); the
-- second, partially applied here, is the consumer that receives the
-- 'Producer' handle.
--
-- NOTE(review): the producer is forked inside the acquire step of 'bracket',
-- which presumably runs with asynchronous exceptions masked, and a thread
-- started with 'forkFinally' inherits that state. Confirm that producers
-- reach interruptible points so that 'cancel' cannot block indefinitely.
withProducer ::
  (MonadIO m, MonadUnliftIO m) =>
  ((p -> m ()) -> m r) ->
  (Producer p r -> m a) ->
  m a
withProducer f = bracket (forkProducer f) cancel
-- | Build an STM transaction that takes the next message from the producer
-- and yields the action to run in response.
--
-- * a 'Payload' is passed to the first handler;
-- * a 'Close' is passed to the second handler;
-- * an 'Exception' yields an action that rethrows the producer's exception.
--
-- The transaction only selects the action; run it with 'joinSTM'.
listen ::
  MonadIO m =>
  Producer p r ->
  (p -> m a) ->
  (r -> m a) ->
  STM (m a)
listen p fPayload fResult = dispatch <$> producerQueueRead p
  where
    dispatch msg = case msg of
      Payload payload -> fPayload payload
      Exception e -> throwIO e
      Close r -> fResult r
-- | Commit an STM transaction that yields an action, then run that action.
--
-- The resulting action runs after the transaction has committed, so it is
-- not part of the atomic block.
joinSTM :: MonadIO m => STM (m a) -> m a
joinSTM = join . liftIO . atomically
-- | An element of a stream that may contain synchronisation points.
--
-- 'Syncable' carries an ordinary value. 'Syncer' carries an STM callback;
-- 'withSync' invokes it with 'Nothing' once the surrounding computation has
-- succeeded, or with @'Just' e@ when that computation threw @e@.
data Syncing a = Syncable a | Syncer (Maybe SomeException -> STM ())
-- | Run a computation over the plain values of a structure and notify every
-- 'Syncer' found in that structure once the computation has finished.
--
-- Each 'Syncable' element is passed on as @'Just' a@; each 'Syncer' element is
-- replaced by 'Nothing' and its callback is remembered. After @f@ returns,
-- all remembered callbacks are invoked with 'Nothing' in one STM transaction.
-- If @f@ throws, they are invoked with @'Just' e@ instead and the exception
-- propagates.
withSync ::
  (MonadIO m, MonadUnliftIO m, Traversable t) =>
  t (Syncing a) ->
  (t (Maybe a) -> m b) ->
  m b
withSync t f = do
  let collect :: Syncing x -> State (Maybe SomeException -> STM ()) (Maybe x)
      collect (Syncable a) = pure (Just a)
      -- Sequence the STM actions pointwise. Writing @(*> s)@ here would pick
      -- the function 'Applicative', where @earlier *> s == s@, so every
      -- 'Syncer' except the last one would be dropped and never notified.
      collect (Syncer s) =
        Nothing <$ modify (\notifyEarlier outcome -> notifyEarlier outcome *> s outcome)
      (t', syncs) = runState (for t collect) (\_ -> pure ())
  b <- f t' `withException` (liftIO . atomically . syncs . Just)
  liftIO $ atomically $ syncs Nothing
  pure b
-- | Wrap a producer so that its payloads are delivered in batches.
--
-- A batch is emitted when it reaches the item limit, when the periodic flush
-- timer fires, or when the source producer closes or fails. Empty batches are
-- never emitted. The final result of the source is passed through, and an
-- exception from the source is rethrown after the pending batch was emitted.
--
-- The timer and the batching producer only live for the duration of the
-- consumer @f@.
withBoundedDelayBatchProducer ::
  (MonadIO m, MonadUnliftIO m, KatipContext m) =>
  -- | Interval between flushes, as passed to 'threadDelay'
  Int ->
  -- | Maximum number of items per batch; values below 1 are treated as 1
  Int ->
  -- | Source of individual payloads
  Producer p r ->
  -- | Consumer of the batched producer
  (Producer [p] r -> m a) ->
  m a
withBoundedDelayBatchProducer maxDelay maxItems sourceP f = do
  UnliftIO {unliftIO = unlift} <- askUnliftIO
  -- Each element in this queue is a request to flush the current buffer.
  flushes <- liftIO newTQueueIO
  let producer writeBatch =
        let -- Start a new, empty batch.
            beginReading = readItems (max 1 maxItems) []
            -- The buffer is accumulated newest-first, so reverse it to emit
            -- the items in arrival order.
            doPerformBatch [] = pure ()
            doPerformBatch buf = writeBatch (reverse buf)
            -- No room left: emit the batch and start over.
            readItems 0 buf = do
              doPerformBatch buf
              beginReading
            -- Otherwise wait for the next source message or a flush request;
            -- a source message is preferred when both are available.
            readItems bufferRemaining buf =
              joinSTM
                ( onQueueRead <$> producerQueueRead sourceP
                    <|> onFlush <$ readTQueue flushes
                )
              where
                onQueueRead (Payload a) =
                  readItems (bufferRemaining - 1) (a : buf)
                onQueueRead (Close r) = do
                  doPerformBatch buf
                  pure r
                onQueueRead (Exception e) = do
                  doPerformBatch buf
                  liftIO $ throwIO e
                onFlush = do
                  doPerformBatch buf
                  beginReading
         in beginReading
  liftIO $
    withAsync
      ( forever $ do
          threadDelay maxDelay
          atomically $ writeTQueue flushes ()
      )
      $ \_flusher -> unlift $ withProducer producer f
-- | Emit a synchronisation point through @writer@ and wait for it to be
-- acknowledged.
--
-- A 'Syncer' whose callback fills a fresh 'TMVar' is handed to @writer@; the
-- call then blocks until that 'TMVar' has been filled. If it was filled with
-- @'Just' e@, the exception @e@ is rethrown here; with 'Nothing' the call
-- returns normally.
syncer :: MonadIO m => (Syncing a -> m ()) -> m ()
syncer writer = do
  v <- liftIO newEmptyTMVarIO
  writer (Syncer (putTMVar v))
  mexc <- liftIO (atomically (readTMVar v))
  for_ mexc (liftIO . throwIO)