{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Hercules.Agent.Producer where

import Control.Applicative
import Control.Concurrent hiding (throwTo)
import Control.Concurrent.Async hiding (cancel)
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.State
import Data.Foldable
import Data.Traversable
import UnliftIO.Exception
import Prelude

-- | A thread producing zero or more payloads and a final value.
-- Handles exception propagation.
data Producer p r = Producer
  { forall p r. Producer p r -> STM (Msg p r)
producerQueueRead :: STM (Msg p r),
    forall p r. Producer p r -> ThreadId
producerThread :: ThreadId
  }
  deriving ((forall a b. (a -> b) -> Producer p a -> Producer p b)
-> (forall a b. a -> Producer p b -> Producer p a)
-> Functor (Producer p)
forall a b. a -> Producer p b -> Producer p a
forall a b. (a -> b) -> Producer p a -> Producer p b
forall p a b. a -> Producer p b -> Producer p a
forall p a b. (a -> b) -> Producer p a -> Producer p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall p a b. (a -> b) -> Producer p a -> Producer p b
fmap :: forall a b. (a -> b) -> Producer p a -> Producer p b
$c<$ :: forall p a b. a -> Producer p b -> Producer p a
<$ :: forall a b. a -> Producer p b -> Producer p a
Functor)

data ProducerCancelled = ProducerCancelled
  deriving (Int -> ProducerCancelled -> ShowS
[ProducerCancelled] -> ShowS
ProducerCancelled -> String
(Int -> ProducerCancelled -> ShowS)
-> (ProducerCancelled -> String)
-> ([ProducerCancelled] -> ShowS)
-> Show ProducerCancelled
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> ProducerCancelled -> ShowS
showsPrec :: Int -> ProducerCancelled -> ShowS
$cshow :: ProducerCancelled -> String
show :: ProducerCancelled -> String
$cshowList :: [ProducerCancelled] -> ShowS
showList :: [ProducerCancelled] -> ShowS
Show, Show ProducerCancelled
Typeable ProducerCancelled
Typeable ProducerCancelled
-> Show ProducerCancelled
-> (ProducerCancelled -> SomeException)
-> (SomeException -> Maybe ProducerCancelled)
-> (ProducerCancelled -> String)
-> Exception ProducerCancelled
SomeException -> Maybe ProducerCancelled
ProducerCancelled -> String
ProducerCancelled -> SomeException
forall e.
Typeable e
-> Show e
-> (e -> SomeException)
-> (SomeException -> Maybe e)
-> (e -> String)
-> Exception e
$ctoException :: ProducerCancelled -> SomeException
toException :: ProducerCancelled -> SomeException
$cfromException :: SomeException -> Maybe ProducerCancelled
fromException :: SomeException -> Maybe ProducerCancelled
$cdisplayException :: ProducerCancelled -> String
displayException :: ProducerCancelled -> String
Exception, Typeable)

data Msg p r
  = -- | One of possibly many payloads from the producer
    Payload p
  | -- | The producer stopped due to an exception
    Exception SomeException
  | -- | The producer was done and produced a final value
    Close r
  deriving ((forall a b. (a -> b) -> Msg p a -> Msg p b)
-> (forall a b. a -> Msg p b -> Msg p a) -> Functor (Msg p)
forall a b. a -> Msg p b -> Msg p a
forall a b. (a -> b) -> Msg p a -> Msg p b
forall p a b. a -> Msg p b -> Msg p a
forall p a b. (a -> b) -> Msg p a -> Msg p b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall p a b. (a -> b) -> Msg p a -> Msg p b
fmap :: forall a b. (a -> b) -> Msg p a -> Msg p b
$c<$ :: forall p a b. a -> Msg p b -> Msg p a
<$ :: forall a b. a -> Msg p b -> Msg p a
Functor)

-- | @forkProducer f@ produces a computation that forks a thread for @f@, which
-- receives a function for returning payloads @p@.
--
-- @f@ may produce a final result value @r@ when it is done.
forkProducer :: forall m p r. (MonadUnliftIO m) => ((p -> m ()) -> m r) -> m (Producer p r)
forkProducer :: forall (m :: * -> *) p r.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f = do
  TQueue (Msg p r)
q <- IO (TQueue (Msg p r)) -> m (TQueue (Msg p r))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue (Msg p r))
forall a. IO (TQueue a)
newTQueueIO
  let write :: (MonadIO m') => Msg p r -> m' ()
      write :: forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write = IO () -> m' ()
forall a. IO a -> m' a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m' ()) -> (Msg p r -> IO ()) -> Msg p r -> m' ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (Msg p r -> STM ()) -> Msg p r -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TQueue (Msg p r) -> Msg p r -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Msg p r)
q
  IO r
f' <- m r -> m (IO r)
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (IO a)
toIO ((p -> m ()) -> m r
f (Msg p r -> m ()
forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write (Msg p r -> m ()) -> (p -> Msg p r) -> p -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. p -> Msg p r
forall p r. p -> Msg p r
Payload))
  ThreadId
t <- IO ThreadId -> m ThreadId
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> m ThreadId) -> IO ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ IO r -> (Either SomeException r -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally IO r
f' (Msg p r -> IO ()
forall (m' :: * -> *). MonadIO m' => Msg p r -> m' ()
write (Msg p r -> IO ())
-> (Either SomeException r -> Msg p r)
-> Either SomeException r
-> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Either SomeException r -> Msg p r
forall {r} {p}. Either SomeException r -> Msg p r
toResult)
  Producer p r -> m (Producer p r)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Producer p r -> m (Producer p r))
-> Producer p r -> m (Producer p r)
forall a b. (a -> b) -> a -> b
$ Producer {producerQueueRead :: STM (Msg p r)
producerQueueRead = TQueue (Msg p r) -> STM (Msg p r)
forall a. TQueue a -> STM a
readTQueue TQueue (Msg p r)
q, producerThread :: ThreadId
producerThread = ThreadId
t}
  where
    toResult :: Either SomeException r -> Msg p r
toResult (Left SomeException
e) = SomeException -> Msg p r
forall p r. SomeException -> Msg p r
Exception SomeException
e
    toResult (Right r
r) = r -> Msg p r
forall p r. r -> Msg p r
Close r
r

-- | Throws 'ProducerCancelled' as an async exception to the producer thread.
-- Blocks until exception is raised. See 'throwTo'.
cancel :: (MonadIO m) => Producer p r -> m ()
cancel :: forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel Producer p r
p = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ ThreadId -> ProducerCancelled -> IO ()
forall e (m :: * -> *).
(Exception e, MonadIO m) =>
ThreadId -> e -> m ()
throwTo (Producer p r -> ThreadId
forall p r. Producer p r -> ThreadId
producerThread Producer p r
p) ProducerCancelled
ProducerCancelled

-- | Perform an computation while @withProducer@ takes care of forking and cleaning up.
--
-- @withProducer (\write -> write "a" >> write "b") $ \producer -> consume producer@
withProducer ::
  (MonadUnliftIO m) =>
  ((p -> m ()) -> m r) ->
  (Producer p r -> m a) ->
  m a
withProducer :: forall (m :: * -> *) p r a.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer (p -> m ()) -> m r
f = m (Producer p r)
-> (Producer p r -> m ()) -> (Producer p r -> m a) -> m a
forall (m :: * -> *) a b c.
MonadUnliftIO m =>
m a -> (a -> m b) -> (a -> m c) -> m c
bracket (((p -> m ()) -> m r) -> m (Producer p r)
forall (m :: * -> *) p r.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> m (Producer p r)
forkProducer (p -> m ()) -> m r
f) Producer p r -> m ()
forall (m :: * -> *) p r. MonadIO m => Producer p r -> m ()
cancel

listen ::
  (MonadIO m) =>
  Producer p r ->
  (p -> m a) ->
  (r -> m a) ->
  STM (m a)
listen :: forall (m :: * -> *) p r a.
MonadIO m =>
Producer p r -> (p -> m a) -> (r -> m a) -> STM (m a)
listen Producer p r
p p -> m a
fPayload r -> m a
fResult =
  (Msg p r -> m a) -> STM (Msg p r) -> STM (m a)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Msg p r -> m a
f (Producer p r -> STM (Msg p r)
forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
p)
  where
    f :: Msg p r -> m a
f (Payload p
payload) = p -> m a
fPayload p
payload
    f (Exception SomeException
e) = SomeException -> m a
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
    f (Close r
r) = r -> m a
fResult r
r

joinSTM :: (MonadIO m) => STM (m a) -> m a
joinSTM :: forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM = m (m a) -> m a
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m a) -> m a) -> (STM (m a) -> m (m a)) -> STM (m a) -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO (m a) -> m (m a)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (m a) -> m (m a))
-> (STM (m a) -> IO (m a)) -> STM (m a) -> m (m a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM (m a) -> IO (m a)
forall a. STM a -> IO a
atomically

data Syncing a = Syncable a | Syncer (Maybe SomeException -> STM ())

-- | Sends sync notifications after the whole computation succeeds (or fails)
-- Note: not exception safe in the presence of pure exceptions.
withSync ::
  (MonadUnliftIO m, Traversable t) =>
  t (Syncing a) ->
  (t (Maybe a) -> m b) ->
  m b
withSync :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t (Syncing a) -> (t (Maybe a) -> m b) -> m b
withSync t (Syncing a)
t t (Maybe a) -> m b
f = do
  let (t (Maybe a)
t', Maybe SomeException -> STM ()
syncs) =
        State (Maybe SomeException -> STM ()) (t (Maybe a))
-> (Maybe SomeException -> STM ())
-> (t (Maybe a), Maybe SomeException -> STM ())
forall s a. State s a -> s -> (a, s)
runState
          ( t (Syncing a)
-> (Syncing a
    -> StateT (Maybe SomeException -> STM ()) Identity (Maybe a))
-> State (Maybe SomeException -> STM ()) (t (Maybe a))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for t (Syncing a)
t ((Syncing a
  -> StateT (Maybe SomeException -> STM ()) Identity (Maybe a))
 -> State (Maybe SomeException -> STM ()) (t (Maybe a)))
-> (Syncing a
    -> StateT (Maybe SomeException -> STM ()) Identity (Maybe a))
-> State (Maybe SomeException -> STM ()) (t (Maybe a))
forall a b. (a -> b) -> a -> b
$ \case
              Syncable a
a -> Maybe a
-> StateT (Maybe SomeException -> STM ()) Identity (Maybe a)
forall a. a -> StateT (Maybe SomeException -> STM ()) Identity a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
              Syncer Maybe SomeException -> STM ()
s -> Maybe a
forall a. Maybe a
Nothing Maybe a
-> StateT (Maybe SomeException -> STM ()) Identity ()
-> StateT (Maybe SomeException -> STM ()) Identity (Maybe a)
forall a b.
a
-> StateT (Maybe SomeException -> STM ()) Identity b
-> StateT (Maybe SomeException -> STM ()) Identity a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ ((Maybe SomeException -> STM ()) -> Maybe SomeException -> STM ())
-> StateT (Maybe SomeException -> STM ()) Identity ()
forall s (m :: * -> *). MonadState s m => (s -> s) -> m ()
modify ((Maybe SomeException -> STM ())
-> (Maybe SomeException -> STM ()) -> Maybe SomeException -> STM ()
forall a b.
(Maybe SomeException -> a)
-> (Maybe SomeException -> b) -> Maybe SomeException -> b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Maybe SomeException -> STM ()
s)
          )
          (\Maybe SomeException
_ -> () -> STM ()
forall a. a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ())
  b
b <- t (Maybe a) -> m b
f t (Maybe a)
t' m b -> (SomeException -> m ()) -> m b
forall (m :: * -> *) e a b.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m b) -> m a
`withException` (IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (SomeException -> IO ()) -> SomeException -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ())
-> (SomeException -> STM ()) -> SomeException -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe SomeException -> STM ()
syncs (Maybe SomeException -> STM ())
-> (SomeException -> Maybe SomeException)
-> SomeException
-> STM ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just)
  IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe SomeException -> STM ()
syncs Maybe SomeException
forall a. Maybe a
Nothing
  b -> m b
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
b

--  where trav =
--  deriving (Functor)
-- instance Applicative Syncing where
--  pure = Synced Nothing
--  Synced sf f <*> Synced af a = Synced (sf <> af) (f a)

-- Potential improvements:
--  - Performance: Get rid of the producer thread by doing the batching in STM.
--                 (pinging thread still required)
--  - Performance: Multiple elements as input
--  - Idle footprint: The pinger can be made to wait for the queue to be non-empty before starting the delay.
--     - Add a tryPeek function to Producer
--     - Make sure it is not woken up after the queue has become non-empty
--     - Alternatively, maybe use stm-delay (which uses GHC.Event for efficiency)
--       https://hackage.haskell.org/package/stm-delay-0.1.1.1/docs/Control-Concurrent-STM-Delay.html
withBoundedDelayBatchProducer ::
  (MonadUnliftIO m) =>
  -- | Max time before flushing in microseconds
  Int ->
  -- | Max number of items in batch
  Int ->
  Producer p r ->
  (Producer [p] r -> m a) ->
  m a
withBoundedDelayBatchProducer :: forall (m :: * -> *) p r a.
MonadUnliftIO m =>
Int -> Int -> Producer p r -> (Producer [p] r -> m a) -> m a
withBoundedDelayBatchProducer Int
maxDelay Int
maxItems Producer p r
sourceP Producer [p] r -> m a
f = do
  UnliftIO {unliftIO :: forall (m :: * -> *). UnliftIO m -> forall a. m a -> IO a
unliftIO = forall a. m a -> IO a
unlift} <- m (UnliftIO m)
forall (m :: * -> *). MonadUnliftIO m => m (UnliftIO m)
askUnliftIO
  TQueue ()
flushes <- IO (TQueue ()) -> m (TQueue ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue ())
forall a. IO (TQueue a)
newTQueueIO
  let producer :: ([p] -> f ()) -> f r
producer [p] -> f ()
writeBatch =
        let beginReading :: f r
beginReading = Int -> [p] -> f r
readItems (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 Int
maxItems) []
            doPerformBatch :: [p] -> f ()
doPerformBatch [] = () -> f ()
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
            doPerformBatch [p]
buf = [p] -> f ()
writeBatch ([p] -> [p]
forall a. [a] -> [a]
reverse [p]
buf)
            readItems :: Int -> [p] -> f r
readItems Int
0 [p]
buf = do
              -- logLocM DebugS "batch on full"
              [p] -> f ()
doPerformBatch [p]
buf
              f r
beginReading
            readItems Int
bufferRemaining [p]
buf =
              STM (f r) -> f r
forall (m :: * -> *) a. MonadIO m => STM (m a) -> m a
joinSTM
                ( Msg p r -> f r
onQueueRead (Msg p r -> f r) -> STM (Msg p r) -> STM (f r)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Producer p r -> STM (Msg p r)
forall p r. Producer p r -> STM (Msg p r)
producerQueueRead Producer p r
sourceP
                    STM (f r) -> STM (f r) -> STM (f r)
forall a. STM a -> STM a -> STM a
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> f r
onFlush
                      f r -> STM () -> STM (f r)
forall a b. a -> STM b -> STM a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ TQueue () -> STM ()
forall a. TQueue a -> STM a
readTQueue TQueue ()
flushes
                )
              where
                onQueueRead :: Msg p r -> f r
onQueueRead (Payload p
a) =
                  Int -> [p] -> f r
readItems (Int
bufferRemaining Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) (p
a p -> [p] -> [p]
forall a. a -> [a] -> [a]
: [p]
buf)
                onQueueRead (Close r
r) = do
                  -- logLocM DebugS $ "batch on close: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  r -> f r
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure r
r
                onQueueRead (Exception SomeException
e) = do
                  -- logLocM DebugS $ "batch on exception: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  IO r -> f r
forall a. IO a -> f a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO r -> f r) -> IO r -> f r
forall a b. (a -> b) -> a -> b
$ SomeException -> IO r
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO SomeException
e
                onFlush :: f r
onFlush = do
                  -- logLocM DebugS $ "batch on flush: " <> logStr (show (length buf))
                  [p] -> f ()
doPerformBatch [p]
buf
                  f r
beginReading
         in f r
beginReading
  IO a -> m a
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
    (IO a -> m a) -> IO a -> m a
forall a b. (a -> b) -> a -> b
$ IO Any -> (Async Any -> IO a) -> IO a
forall a b. IO a -> (Async a -> IO b) -> IO b
withAsync
      ( IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
          Int -> IO ()
threadDelay Int
maxDelay
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue () -> () -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ()
flushes ()
      )
    ((Async Any -> IO a) -> IO a) -> (Async Any -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \Async Any
_flusher -> m a -> IO a
forall a. m a -> IO a
unlift (m a -> IO a) -> m a -> IO a
forall a b. (a -> b) -> a -> b
$ (([p] -> m ()) -> m r) -> (Producer [p] r -> m a) -> m a
forall (m :: * -> *) p r a.
MonadUnliftIO m =>
((p -> m ()) -> m r) -> (Producer p r -> m a) -> m a
withProducer ([p] -> m ()) -> m r
forall {f :: * -> *}. MonadIO f => ([p] -> f ()) -> f r
producer Producer [p] r -> m a
f

syncer :: (MonadIO m) => (Syncing a -> m ()) -> m ()
syncer :: forall (m :: * -> *) a. MonadIO m => (Syncing a -> m ()) -> m ()
syncer Syncing a -> m ()
writer = do
  TMVar (Maybe SomeException)
v <- IO (TMVar (Maybe SomeException)) -> m (TMVar (Maybe SomeException))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TMVar (Maybe SomeException))
forall a. IO (TMVar a)
newEmptyTMVarIO
  Syncing a -> m ()
writer ((Maybe SomeException -> STM ()) -> Syncing a
forall a. (Maybe SomeException -> STM ()) -> Syncing a
Syncer ((Maybe SomeException -> STM ()) -> Syncing a)
-> (Maybe SomeException -> STM ()) -> Syncing a
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe SomeException) -> Maybe SomeException -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar (Maybe SomeException)
v)
  Maybe SomeException
mexc <- IO (Maybe SomeException) -> m (Maybe SomeException)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe SomeException) -> m (Maybe SomeException))
-> IO (Maybe SomeException) -> m (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a. STM a -> IO a
atomically (STM (Maybe SomeException) -> IO (Maybe SomeException))
-> STM (Maybe SomeException) -> IO (Maybe SomeException)
forall a b. (a -> b) -> a -> b
$ TMVar (Maybe SomeException) -> STM (Maybe SomeException)
forall a. TMVar a -> STM a
readTMVar TMVar (Maybe SomeException)
v
  Maybe SomeException -> (SomeException -> m Any) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe SomeException
mexc (IO Any -> m Any
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Any -> m Any)
-> (SomeException -> IO Any) -> SomeException -> m Any
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeException -> IO Any
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO)