{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Hercules.Agent.Producer where
import Control.Applicative
import Control.Concurrent hiding (throwTo)
import Control.Concurrent.Async hiding (cancel)
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.State
import Data.Foldable
import Data.Traversable
import UnliftIO.Exception
import Prelude
data Producer p r = Producer
{ forall p r. Producer p r -> STM (Msg p r)
producerQueueRead :: STM (Msg p r),
forall p r. Producer p r -> ThreadId
producerThread :: ThreadId
}
deriving (forall a b. a -> Producer p b -> Producer p a
forall a b. (a -> b) -> Producer p a -> Producer p b
forall p a b. a -> Producer p b -> Producer p a
forall p a b. (a -> b) -> Producer p a -> Producer p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Producer p b -> Producer p a
$c<$ :: forall p a b. a -> Producer p b -> Producer p a
fmap :: forall a b. (a -> b) -> Producer p a -> Producer p b
$cfmap :: forall p a b. (a -> b) -> Producer p a -> Producer p b
Functor)
data ProducerCancelled = ProducerCancelled
deriving (Int -> ProducerCancelled -> ShowS
[ProducerCancelled] -> ShowS
ProducerCancelled -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ProducerCancelled] -> ShowS
$cshowList :: [ProducerCancelled] -> ShowS
show :: ProducerCancelled -> String
$cshow :: ProducerCancelled -> String
showsPrec :: Int -> ProducerCancelled -> ShowS
$cshowsPrec :: Int -> ProducerCancelled -> ShowS
Show, Show ProducerCancelled
Typeable ProducerCancelled
SomeException -> Maybe ProducerCancelled
ProducerCancelled -> String
ProducerCancelled -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
displayException :: ProducerCancelled -> String
$cdisplayException :: ProducerCancelled -> String
fromException :: SomeException -> Maybe ProducerCancelled
$cfromException :: SomeException -> Maybe ProducerCancelled
toException :: ProducerCancelled -> SomeException
$ctoException :: ProducerCancelled -> SomeException
Exception, Typeable)
data Msg p r
=
Payload p
|
Exception SomeException
|
Close r
deriving (forall a b. a -> Msg p b -> Msg p a
forall a b. (a -> b) -> Msg p a -> Msg p b
forall p a b. a -> Msg p b -> Msg p a
forall p a b. (a -> b) -> Msg p a -> Msg p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> Msg p b -> Msg p a
$c<$ :: forall p a b. a -> Msg p b -> Msg p a
fmap :: forall a b. (a -> b) -> Msg p a -> Msg p b
$cfmap :: forall p a b. (a -> b) -> Msg p a -> Msg p b
Functor)
forkProducer :: forall m p r. (MonadIO m, MonadUnliftIO m) => ((p -> m ()) -> m r) -> m (Producer p r)
forkProducer :: forall (m :: * -> *) p r.
(MonadIO m, MonadUnliftIO m) =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f = do
TQueue (Msg p r)
q <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a. IO (TQueue a)
newTQueueIO
let write :: MonadIO m' => Msg p r -> m' ()
write :: forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Msg p r)
q
IO r
f' <- forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (IO a)
toIO ((p -> m ()) -> m r
f (forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall p r. p -> Msg p r
Payload))
ThreadId
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally IO r
f' (forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall {r} {p}. Either SomeException r -> Msg p r
toResult)
forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ Producer {producerQueueRead :: STM (Msg p r)
producerQueueRead = forall a. TQueue a -> STM a
readTQueue TQueue (Msg p r)
q, producerThread :: ThreadId
producerThread = ThreadId
t}
where
toResult :: Either SomeException r -> Msg p r
toResult (Left SomeException
e) = forall p r. SomeException -> Msg p r
Exception SomeException
e
toResult (Right r
r) = forall p r. r -> Msg p r
Close r
r
cancel :: MonadIO m => Producer p r -> m ()
cancel :: forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel Producer p r
p = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall e (m :: * -> *).
(Exception e, MonadIO m) =>
ThreadId -> e -> m ()
throwTo (forall p r. Producer p r -> ThreadId
producerThread Producer p r
p) ProducerCancelled
ProducerCancelled
withProducer ::
(MonadUnliftIO m) =>
((p -> m ()) -> m r) ->
(Producer p r -> m a) ->
m a
withProducer :: forall (m :: * -> *) p r a.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer (p -> m ()) -> m r
f = forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (forall (m :: * -> *) p r.
(MonadIO m, MonadUnliftIO m) =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f) forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel
listen ::
MonadIO m =>
Producer p r ->
(p -> m a) ->
(r -> m a) ->
STM (m a)
listen :: forall (m :: * -> *) p r a.
MonadIO m =>
Producer p r -> (p -> m a) -> (r -> m a) -> STM (m a)
listen Producer p r
p p -> m a
fPayload r -> m a
fResult =
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Msg p r -> m a
f (forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
p)
where
f :: Msg p r -> m a
f (Payload p
payload) = p -> m a
fPayload p
payload
f (Exception SomeException
e) = forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
f (Close r
r) = r -> m a
fResult r
r
joinSTM :: MonadIO m => STM (m a) -> m a
joinSTM :: forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM = forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically
data Syncing a = Syncable a | Syncer (Maybe SomeException -> STM ())
withSync ::
(MonadIO m, MonadUnliftIO m, Traversable t) =>
t (Syncing a) ->
(t (Maybe a) -> m b) ->
m b
withSync :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadIO m, MonadUnliftIO m, Traversable t) =>
t (Syncing a) -> (t (Maybe a) -> m b) -> m b
withSync t (Syncing a)
t t (Maybe a) -> m b
f = do
let (t (Maybe a)
t', Maybe SomeException -> STM ()
syncs) =
forall s a. State s a -> s -> (a, s)
runState
( forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for t (Syncing a)
t forall a b. (a -> b) -> a -> b
$ \case
Syncable a
a -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just a
a)
Syncer Maybe SomeException -> STM ()
s -> forall a. Maybe a
Nothing forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify (forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Maybe SomeException -> STM ()
s)
)
(\Maybe SomeException
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
b
b <- t (Maybe a) -> m b
f t (Maybe a)
t' forall (m :: * -> *) e a b.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m b) -> m a
`withException` (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. STM a -> IO a
atomically forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe SomeException -> STM ()
syncs forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a
Just)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ Maybe SomeException -> STM ()
syncs forall a. Maybe a
Nothing
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
b
withBoundedDelayBatchProducer ::
(MonadIO m, MonadUnliftIO m) =>
Int ->
Int ->
Producer p r ->
(Producer [p] r -> m a) ->
m a
withBoundedDelayBatchProducer :: forall (m :: * -> *) p r a.
(MonadIO m, MonadUnliftIO m) =>
Int -> Int -> Producer p r -> (Producer [p] r -> m a) -> m a
withBoundedDelayBatchProducer Int
maxDelay Int
maxItems Producer p r
sourceP Producer [p] r -> m a
f = do
UnliftIO {unliftIO :: forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO = forall a. m a -> IO a
unlift} <- forall (m :: * -> *). MonadUnliftIO m => m (UnliftIO m)
askUnliftIO
TQueue ()
flushes <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a. IO (TQueue a)
newTQueueIO
let producer :: ([p] -> f ()) -> f r
producer [p] -> f ()
writeBatch =
let beginReading :: f r
beginReading = Int -> [p] -> f r
readItems (forall a. Ord a => a -> a -> a
max Int
1 Int
maxItems) []
doPerformBatch :: [p] -> f ()
doPerformBatch [] = forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
doPerformBatch [p]
buf = [p] -> f ()
writeBatch (forall a. [a] -> [a]
reverse [p]
buf)
readItems :: Int -> [p] -> f r
readItems Int
0 [p]
buf = do
[p] -> f ()
doPerformBatch [p]
buf
f r
beginReading
readItems Int
bufferRemaining [p]
buf =
forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM
( Msg p r -> f r
onQueueRead forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
sourceP
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> f r
onFlush
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ forall a. TQueue a -> STM a
readTQueue TQueue ()
flushes
)
where
onQueueRead :: Msg p r -> f r
onQueueRead (Payload p
a) =
Int -> [p] -> f r
readItems (Int
bufferRemaining forall a. Num a => a -> a -> a
- Int
1) (p
a forall a. a -> [a] -> [a]
: [p]
buf)
onQueueRead (Close r
r) = do
[p] -> f ()
doPerformBatch [p]
buf
forall (f :: * -> *) a. Applicative f => a -> f a
pure r
r
onQueueRead (Exception SomeException
e) = do
[p] -> f ()
doPerformBatch [p]
buf
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
onFlush :: f r
onFlush = do
[p] -> f ()
doPerformBatch [p]
buf
f r
beginReading
in f r
beginReading
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
forall a b. (a -> b) -> a -> b
$ forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync
( forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
Int -> IO ()
threadDelay Int
maxDelay
forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ()
flushes ()
)
forall a b. (a -> b) -> a -> b
$ \Async Any
_flusher -> forall a. m a -> IO a
unlift forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) p r a.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer forall {f :: * -> *}. MonadIO f => ([p] -> f ()) -> f r
producer Producer [p] r -> m a
f
syncer :: MonadIO m => (Syncing a -> m ()) -> m ()
syncer :: forall (m :: * -> *) a. MonadIO m => (Syncing a -> m ()) -> m ()
syncer Syncing a -> m ()
writer = do
TMVar (Maybe SomeException)
v <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a. IO (TMVar a)
newEmptyTMVarIO
Syncing a -> m ()
writer (forall a. (Maybe SomeException -> STM ()) -> Syncing a
Syncer forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Maybe SomeException)
v)
Maybe SomeException
mexc <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> STM a
readTMVar TMVar (Maybe SomeException)
v
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe SomeException
mexc (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO)