{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Functions for concurrent mapping over Conduits.
module Data.Conduit.ConcurrentMap
  ( -- * Explicit number of threads
    concurrentMapM_
    -- * CPU-bound use case
  , concurrentMapM_numCaps
  ) where

import           Control.Monad (when)
import           Control.Monad.IO.Class (liftIO)
import           Control.Monad.IO.Unlift (MonadUnliftIO, UnliftIO, unliftIO, askUnliftIO)
import           Control.Monad.Trans (lift)
import           Control.Monad.Trans.Resource (MonadResource)
import           Data.Conduit (ConduitT, await, bracketP)
import qualified Data.Conduit as C
import           Data.Foldable (for_)
import           Data.Maybe (fromMaybe)
import           Data.Sequence (Seq, ViewL((:<)), (|>))
import qualified Data.Sequence as Seq
import           Data.Vector ((!))
import qualified Data.Vector as V
import           GHC.Conc (getNumCapabilities)
import           UnliftIO.MVar (MVar, newEmptyMVar, takeMVar, tryTakeMVar, putMVar)
import           UnliftIO.Async (Async, async, forConcurrently_, wait, link, uninterruptibleCancel)
import           UnliftIO.IORef (IORef, newIORef, readIORef, atomicModifyIORef')


-- | Atomically and strictly update the contents of an 'IORef' with a pure
-- function, returning nothing.
--
-- This is @atomicModifyIORef'@ specialised to the case where the caller is
-- only interested in the update and not in extracting a result from it.
atomicModifyIORef_' :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_' ref f = atomicModifyIORef' ref $ \a -> (f a, ())


-- | Split off the first element of a 'Seq'.
--
-- For a non-empty sequence this returns @(rest, Just x)@, where @x@ is the
-- leftmost element and @rest@ is everything after it. For an empty sequence
-- it returns the (unchanged) sequence together with 'Nothing'.
--
-- The remaining sequence comes first in the tuple so that this function can
-- be handed directly to @atomicModifyIORef'@ to pop the head of a queue.
seqUncons :: Seq a -> (Seq a, Maybe a)
seqUncons s = case Seq.viewl s of
  Seq.EmptyL -> (s, Nothing)
  a :< s'    -> (s', Just a)


-- | Look at the first (leftmost) element of a 'Seq' without removing it.
--
-- Returns 'Nothing' for an empty sequence.
seqHeadMaybe :: Seq a -> Maybe a
seqHeadMaybe s = case Seq.viewl s of
  Seq.EmptyL -> Nothing
  a :< _     -> Just a


-- | @concurrentMapM_ numThreads workerOutputBufferSize f@
--
-- Concurrent, order-preserving conduit mapping function.
--
-- Like `Data.Conduit.mapM`, but runs in parallel with the given number of threads,
-- returns outputs in the order of inputs (like @mapM@, no reordering),
-- and allows defining a bounded size output buffer for elements of type @b@ to
-- maintain high parallelism despite head-of-line blocking.
--
-- Because of the no-reordering guarantee, there is head-of-line blocking:
-- When the conduit has to process a long-running computation and a short-running
-- computation in parallel, the result of the short one cannot be yielded before
-- the long one is done.
-- Unless we buffer the queued result somewhere, the thread that finished the
-- short-running computation is now blocked and sits idle (low utilisation).
--
-- To cope with this, this function gives each
-- thread @workerOutputBufferSize@ output slots to store @b@s while they are blocked.
--
-- Use the convenience `concurrentMapM_numCaps` when @f@ is CPU-bound.
--
-- @workerOutputBufferSize@ must be given >= 1; otherwise the conduit calls
-- `error` when it starts running.
--
-- @numThreads@ should be >= 1: with fewer, no worker threads are started, so
-- an awaited input can never be picked up by a worker.
--
-- The @workerOutputBufferSize@ keeps the memory usage of the conduit bounded,
-- namely to @numThreads * (workerOutputBufferSize + 1)@ many @b@s at any
-- given time (the @+ 1@ is for the currently processing ones).
--
-- To achieve maximum parallelism/utilisation, you should choose
-- @workerOutputBufferSize@ ideally as the time factor between the fastest
-- and slowest @f@ that will likely pass through the conduit; for example,
-- if most @f@s take 3 seconds, but some take 15 seconds, choose
-- @workerOutputBufferSize = 5@ to avoid an earlier 15-second @f@ blocking
-- a later 3-second @f@.
--
-- The threads inside the conduit will evaluate the results of the @f@ to
-- WHNF, as in @!b <- f a@, so don't forget to make @f@ itself `deepseq` the
-- result if there is any lazy data structure involved and you want to make
-- sure that they are evaluated *inside* the conduit (fully in parallel)
-- as opposed to the lazy parts of them being evaluated after being yielded.
--
-- As @f@s happen concurrently, they cannot depend on each other's monadic
-- state. This is enforced by the `MonadUnliftIO` constraint.
-- This means the function cannot be used with e.g. `StateT`.
--
-- Properties:
--
-- * Ordering / head of line blocking for outputs: The `b`s will come out in
--   the same order as their corresponding `a`s came in (the parallelism
--   doesn't change the order).
-- * Bounded memory: The conduit will only hold up to
--   @numThreads * (workerOutputBufferSize + 1)@ many @b@s.
-- * High utilisation: The conduit will try to keep all cores busy as much as
--   it can. This means that after `await`ing an input, it will only block
--   to wait for an output from a worker thread if it has to because
--   we're at the `workerOutputBufferSize` output buffer bound of `b` elements.
--   (It may, however, `yield` even if the queue is not full.
--   Since `yield` will block the conduit's thread until downstream
--   conduits in the pipeline `await`, utilisation will be poor if other
--   conduits in the pipeline have low throughput.
--   This makes sense because a conduit pipeline's total throughput
--   is bottlenecked by the slowest segment in the pipeline.)
--   It also ensures that any worker running for longer than others does not
--   prevent other free workers from starting new work, except from when
--   we're at the `workerOutputBufferSize` output buffer bound of `b` elements.
-- * Prompt starting: The conduit will start each `await`ed value immediately,
--   it will not batch up multiple `await`s before starting.
-- * Async exception safety: When the conduit is killed, the worker threads
--   will be killed too.
--
-- Example (4 threads, 2 output buffer slots per thread):
--
-- > puts :: (MonadIO m) => String -> m () -- for non-interleaved output
-- > puts s = liftIO $ BS8.putStrLn (BS8.pack s)
-- > runConduitRes (CL.sourceList [1..6] .| concurrentMapM_ 4 2 (\i -> liftIO $ puts (show i ++ " before") >> threadDelay (i * 1000000) >> puts (show i ++ " after") >> return (i*2)) .| CL.consume )
concurrentMapM_ :: (MonadUnliftIO m, MonadResource m) => Int -> Int -> (a -> m b) -> ConduitT a b m ()
concurrentMapM_ numThreads workerOutputBufferSize f = do
  -- Reject buffer sizes below 1: each worker needs at least one output slot
  -- (the ring buffer index below is computed `rem` workerOutputBufferSize).
  when (workerOutputBufferSize < 1) $ do
    error $ "Data.Conduit.Concurrent.concurrentMapM_ requires workerOutputBufferSize >= 1, got " ++ show workerOutputBufferSize

  -- Diagram:
  --
  --    cyclic buffers with `workerOutputBufferSize` many slots {a,b,c,...} for each of N threads
  --                                               |
  --                            [ workerOutVar( 1 )a  workerOutVar( 1 )b  ... ] <- f  \
  -- -------------------------  [ workerOutVar( 2 )a  workerOutVar( 2 )b  ... ] <- f   \
  -- outQueue of workerOutVars                          ...                             - inVar
  -- -------------------------  [ workerOutVar(N-1)a  workerOutVar(N-1)b  ... ] <- f   /
  --                            [ workerOutVar(N  )a  workerOutVar(N  )b  ... ] <- f  /
  --                                                                                      o <- button to signal
  --                                                                                           inVarEnqueued
  --
  -- Any worker that's not busy is hanging onto `inVar`, grabbing
  -- its contents as soon as `inVar` is filled.
  -- The conduit ("foreman") `awaits` upstream work, and when it gets
  -- some, puts it into the `inVar`.
  -- When a worker manages to grab it, the worker immediately puts
  -- its `workerOutVar` onto the `outQueue`, and then presses the
  -- `inVarEnqueued` button to tell the foreman that it has completed
  -- taking the work and placing its `workerOutVar` onto the queue.
  -- The foreman will wait for the signal button to be pressed before
  -- continuing their job; this guarantees that the take-inVar-queue-workerOutVar
  -- action is atomic, which guarantees input order = output order.
  --
  -- As visible in the diagram, maximally N invocations of `f` can happen at
  -- the same time, and since the `workerOutVar`s are storage places for
  -- f's outputs (`b`), maximally N*workerOutputBufferSize many `b`s are
  -- buffered in there while the workers are working.
  -- When all storage places are full, `f`s that finish processing
  -- block on putting their `b`s in, so there are maximally
  -- `N * (workerOutputBufferSize + 1)` many `b`s held alive
  -- by this function.
  --
  -- Note that as per this "+ 1" logic, for each worker there may be up to 1
  -- `workerOutVar` that is in the `outQueue` twice.
  -- For example, for `numThreads = 2` and `workerOutputBufferSize = 2`,
  -- we may have:
  --
  -- -------------------------  [ worker1OutVarSlotA  worker1OutVarSlotB ] <- f   \
  -- outQueue of workerOutVars                                                     - inVar
  -- -------------------------  [ worker2OutVarSlotA  worker2OutVarSlotB ] <- f   /
  --
  -- with an input conduit streaming elements
  --     [A, B, C, D]
  -- with processing times
  --     [9, 0, 0, 0]
  -- this may lead to an `outQueue` as follows:
  --
  --  +-----------------------------------+
  --  |                                   |
  --  V                                   |
  -- -------------------------  [ worker1OutVarSlot_a  worker1OutVarSlot_b ] <- f   \
  --  A  B  C                                                                        - inVar (containing element D)
  -- -------------------------  [ worker2OutVarSlot_a  worker2OutVarSlot_b ] <- f   /
  --     ^  ^                             |                    |
  --     +--|-----------------------------+                    |
  --        |                                                  |
  --        +--------------------------------------------------+
  --
  -- where worker 1 is still processing work item A, and worker 2 has just finished
  -- processing work items B and C.
  -- Now worker 2 is idle, pops element D as the next work item from the `inVar`,
  -- and enqueues MVar `worker2OutVarSlot_a` (its ring buffer has wrapped around
  -- to the first slot) into `outQueue` a second time,
  -- processes element D, and runs `putMVar worker2OutVarSlot_a (f D)`;
  -- it is at this time that worker 2 blocks until `worker2OutVarSlot_a`
  -- is emptied when the conduit `yield`s the result.
  -- Thus we have this situation:
  --
  --  +-----------------------------------+
  --  |                                   |
  --  V                                   |
  -- -------------------------  [ worker1OutVarSlot_a  worker1OutVarSlot_b ] <- f   \
  --  A  B  C  D                                                                     - inVar
  -- -------------------------  [ worker2OutVarSlot_a  worker2OutVarSlot_b ] <- f   /
  --     ^  ^  ^                          |  |                 |
  --     +--|--|--------------------------+  |                 |
  --        |  |                             |                 |
  --        +--|-----------------------------|-----------------+
  --           |                             |
  --           +-----------------------------+
  --
  -- It is thus NOT an invariant that every `outVar` is in the `outQueue` only once.
  --
  -- TODO: This whole design producing the "+ 1" logic has a bit of an ugliness
  --       in that it's not possible to make each worker use at max 1 `b`; only
  --       2 or more `b`s are possible.
  --       The whole design might be simplified by changing it so that instead
  --       of each worker having a fixed number of `workerOutVar`s,
  --       workers make up new `workerOutVar`s on demand (enqueuing them
  --       into `outQueue` as before), and the conduit keeping track of
  --       how many work items are between `inVar` and being yielded
  --       (this is currently `numInQueue`), and ensuring that this number
  --       is less than some maximum number M (blocking on `takeMVar` of the
  --       front MVar in `outQueue` when the M limit is reached).
  --
  -- The pattern signatures below also bring the type variables `a` and `b`
  -- into scope for the local type signatures further down.
  inVar         :: MVar (Maybe a)       <- newEmptyMVar
  inVarEnqueued :: MVar ()              <- newEmptyMVar
  outQueueRef   :: IORef (Seq (MVar b)) <- newIORef Seq.empty

  let putInVar x = putMVar inVar x

  let signal mv     = putMVar mv ()
  let waitForSignal = takeMVar

  -- We use `MonadUnliftIO` to make `f` run in `IO` instead of `m`, so that
  -- we can use it in conduit `bracketP`'s IO-based resource acquisition
  -- function (where we have to spawn our workers to guarantee they shut down
  -- when somebody async-kills the conduit).
  u :: UnliftIO m <- lift askUnliftIO -- `lift` here brings us into `m`

  -- `spawnWorkers` uses `async` and thus MUST be run with interrupts disabled
  -- (e.g. as initialisation function of `bracket`) to be async exception safe.
  --
  -- Note `async` does not unmask, but `unliftIO u` will restore the original
  -- masking state (thus typically unmask).
  let spawnWorkers :: IO (Async ())
      spawnWorkers = do
        workersAsync <- async $ do -- see comment above for exception safety
          unliftIO u $ liftIO $ do -- use `runInIO` to restore masking state
            forConcurrently_ [1..numThreads] $ \_i_worker -> do
              -- Each worker has `workerOutputBufferSize` many `workerOutVar`s
              -- in a ring buffer; until the shutdown signal is received, a worker
              -- loops to: grab an `a` from the `inVar`, pick its next `workerOutVar`,
              -- put it into the `outQueue`, signal that it has atomically done these
              -- 2 actions, process `b <- f x`, and write the `b` to the `workerOutVar`.
              workerOutVars <- V.replicateM workerOutputBufferSize newEmptyMVar
              let loop :: Int -> IO ()
                  loop !i_outVarSlot = do

                    m'a <- takeMVar inVar
                    case m'a of
                      Nothing -> return () -- shutdown signal, worker quits
                      Just a -> do
                        let workerOutVar = workerOutVars ! i_outVarSlot
                        atomicModifyIORef_' outQueueRef (|> workerOutVar)
                        signal inVarEnqueued
                        -- Important: Force WHNF here so that f gets evaluated inside the
                        -- worker; it's `f`'s job to decide whether to deepseq or not.
                        !b <- unliftIO u (f a)
                        putMVar workerOutVar b
                        loop ((i_outVarSlot + 1) `rem` workerOutputBufferSize)

              loop 0

        -- Re-throw exceptions of the workers in the thread running `spawnWorkers`.
        link workersAsync

        return workersAsync

  bracketP
    spawnWorkers
    (\workersAsync -> uninterruptibleCancel workersAsync)
    $ \workersAsync -> do

      let mustBeNonempty = fromMaybe (error "Data.Conduit.Concurrent.concurrentMapM_: outQueue cannot be empty")

      -- Pops the head `workerOutVar` off the `outQueue`, blocks until its
      -- worker has written the result into it, and yields that result.
      let yieldQueueHead = do
            workerVar <- mustBeNonempty <$>
              atomicModifyIORef' outQueueRef seqUncons

            b <- takeMVar workerVar
            C.yield b

      -- Non-blocking variant of `yieldQueueHead`: yields the head result and
      -- returns `True` only if the queue is nonempty and the head
      -- `workerOutVar` is already filled; otherwise returns `False` and
      -- leaves the queue untouched.
      let tryYieldQueueHead = do
            m'workerVar <- seqHeadMaybe <$> readIORef outQueueRef
            case m'workerVar of
              Nothing -> return False
              Just workerVar -> do

                m'b <- tryTakeMVar workerVar

                case m'b of
                  Nothing -> return False
                  Just b -> do
                    _ <- mustBeNonempty <$> atomicModifyIORef' outQueueRef seqUncons
                    C.yield b
                    return True


      -- There are 3 phases in the life of this conduit, which happen one after another:
      -- 1) Ramp-up phase,
      --      while we've received less inputs than we have `numThreads`.
      --      We remember how many elements were received (`numWorkersRampedUp`).
      -- 2) Cruise phase,
      --      during which we always have at least `numWorkersRampedUp` many
      --      `workerOutVar`s in the output queue (this is an invariant).
      --      At all times `numInQueue` keeps track of how many work units
      --      are under processing (that is, are after being read off the `inVar`
      --      and before being read off an `outVar`;
      --      so <= `N * (workerOutputBufferSize + 1)` many).
      --      Cruise phase doesn't happen if the conduit terminates before
      --      `numThreads` elements are awaited.
      -- 3) Drain phase,
      --      in which we drain off the `numInQueue` elements in the queue,
      --      send all workers the stop signal and wait for their orderly termination.

      let loop :: Int -> Int -> ConduitT a b m ()
          loop numWorkersRampedUp numInQueue = do

            await >>= \case
              Nothing -> do -- Drain phase: Upstream conduit is done.
                for_ [1..numInQueue] $ \_ -> do
                  yieldQueueHead -- Drain the queue.
                for_ [1..numThreads] $ \_ -> do
                  putInVar Nothing -- tell all workers to finish.
                wait workersAsync -- wait for workers to shut down

              Just a
                | numWorkersRampedUp < numThreads -> do
                    -- Ramp-up phase: This branch is taken until all `numThreads`
                    -- are doing something or the upstream conduit is done;
                    -- after that it is never taken again.
                    putInVar (Just a) >> waitForSignal inVarEnqueued
                    loop (numWorkersRampedUp + 1) (numInQueue + 1)

                | otherwise -> do
                    -- Cruise phase:

                    putInVar (Just a) >> waitForSignal inVarEnqueued
                    -- `waitForSignal` will not block forever because at least the worker
                    -- in the head of `outQueue` will always be able to take the value:
                    -- Either:
                    -- 1. it is currently running `f`, in which case its `workerOutVar`
                    --    is empty, it will eventually write the `b` into it, and then
                    --    be ready to take the `inVar`.
                    -- 2. or it has already done that and is currently doing `takeMVar inVar`
                    --
                    -- At the time `waitForSignal inVarEnqueued` completes, we know
                    -- that there is a `workerOutVar` in the `outQueue` we can wait for.
                    --
                    -- If it was indeed the `workerOutVar` of the head worker,
                    -- then we will take that `workerOutVar` below, restoring
                    -- the above invariant for the next head worker.

                    let numInQueueAfterEnqueued = numInQueue + 1

                    -- Yield already-finished results from the head of the queue
                    -- without blocking, but never go below `numWorkersRampedUp`
                    -- queued elements; returns the number of elements still queued.
                    let popAsManyAsPossible !remainingInQueue
                          | remainingInQueue < numWorkersRampedUp = error "Data.Conduit.Concurrent.concurrentMapM_: remainingInQueue < numWorkersRampedUp"
                          | remainingInQueue == numWorkersRampedUp = return remainingInQueue
                          | otherwise = do
                              popped <- tryYieldQueueHead
                              if not popped
                                then return remainingInQueue
                                else popAsManyAsPossible (remainingInQueue - 1)

                    remainingInQueue <- popAsManyAsPossible numInQueueAfterEnqueued
                    loop numWorkersRampedUp remainingInQueue
      loop 0 0


-- | `concurrentMapM_` with the number of threads set to `getNumCapabilities`.
--
-- Useful when `f` is CPU-bound.
--
-- If `f` is IO-bound, you probably want to use `concurrentMapM_` with
-- an explicitly given number of threads instead.
--
-- The number of capabilities is queried when the conduit starts running;
-- @workerOutputBufferSize@ and @f@ are passed on to `concurrentMapM_` unchanged.
concurrentMapM_numCaps :: (MonadUnliftIO m, MonadResource m) => Int -> (a -> m b) -> ConduitT a b m ()
concurrentMapM_numCaps workerOutputBufferSize f = do
  numCaps <- liftIO getNumCapabilities
  concurrentMapM_ numCaps workerOutputBufferSize f