{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Data.Conduit.ConcurrentMap
(
concurrentMapM_
, concurrentMapM_numCaps
) where
import Control.Monad (when)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.IO.Unlift (MonadUnliftIO, UnliftIO, unliftIO, askUnliftIO)
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Resource (MonadResource)
import Data.Conduit (ConduitT, await, bracketP)
import qualified Data.Conduit as C
import Data.Foldable (for_)
import Data.Maybe (fromMaybe)
import Data.Sequence (Seq, ViewL((:<)), (|>))
import qualified Data.Sequence as Seq
import Data.Vector ((!))
import qualified Data.Vector as V
import GHC.Conc (getNumCapabilities)
import UnliftIO.MVar (MVar, newEmptyMVar, takeMVar, tryTakeMVar, putMVar)
import UnliftIO.Async (Async, async, forConcurrently_, wait, link, uninterruptibleCancel)
import UnliftIO.IORef (IORef, newIORef, readIORef, atomicModifyIORef')
-- | Strictly and atomically apply a pure update function to the contents of
-- an 'IORef', discarding any result.
--
-- Thin wrapper around 'atomicModifyIORef'' for the common case where the
-- caller only wants to modify the stored value.
atomicModifyIORef_' :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_' ref f = atomicModifyIORef' ref $ \a -> (f a, ())
-- | Split a 'Seq' into its remainder and its first element.
--
-- Returns @(rest, Just x)@ when the sequence starts with @x@, and
-- @(s, Nothing)@ (the unchanged, empty sequence) when there is no element.
-- The result order @(newState, result)@ matches what 'atomicModifyIORef''
-- expects, so this can be passed to it directly to pop the queue head.
seqUncons :: Seq a -> (Seq a, Maybe a)
seqUncons s = case Seq.viewl s of
  Seq.EmptyL -> (s, Nothing)
  a :< s' -> (s', Just a)
-- | Return the first element of a 'Seq' without removing it, or 'Nothing'
-- if the sequence is empty.
seqHeadMaybe :: Seq a -> Maybe a
seqHeadMaybe s = case Seq.viewl s of
  Seq.EmptyL -> Nothing
  a :< _ -> Just a
-- | @concurrentMapM_ numThreads workerOutputBufferSize f@ applies @f@ to
-- every element flowing through the conduit using @numThreads@ concurrently
-- running workers, and yields the results downstream in the same order in
-- which the inputs arrived.
--
-- * @numThreads@: number of worker threads that are spawned. No worker is
--   spawned when it is less than 1, in which case nothing ever takes an
--   element that is fed in.
-- * @workerOutputBufferSize@: number of output slots each worker owns. A
--   worker blocks once it tries to write into a slot that still holds a
--   result which has not been yielded yet. Must be at least 1; otherwise
--   'error' is called.
-- * @f@: the action to run on each element. Its result is forced to WHNF
--   inside the worker thread.
--
-- The workers are started in the acquire step of 'bracketP' and are
-- cancelled with 'uninterruptibleCancel' in its release step. The worker
-- 'Async' is 'link'ed, so an exception raised in a worker is re-thrown in
-- the thread that spawned the workers.
concurrentMapM_ :: (MonadUnliftIO m, MonadResource m) => Int -> Int -> (a -> m b) -> ConduitT a b m ()
concurrentMapM_ numThreads workerOutputBufferSize f = do
  when (workerOutputBufferSize < 1) $ do
    error $ "Data.Conduit.Concurrent.concurrentMapM_ requires workerOutputBufferSize >= 1, got " ++ show workerOutputBufferSize

  -- `inVar` hands one input (or `Nothing` as stop marker) to whichever worker
  -- takes it first. `inVarEnqueued` is signalled by that worker after it has
  -- appended its output slot to `outQueueRef`. `outQueueRef` holds the output
  -- slots in the order in which the inputs were taken.
  --
  -- The pattern signatures also bring `a`, `b` (and `m` below) into scope
  -- for the local type signatures further down.
  inVar :: MVar (Maybe a) <- newEmptyMVar
  inVarEnqueued :: MVar () <- newEmptyMVar
  outQueueRef :: IORef (Seq (MVar b)) <- newIORef Seq.empty

  let putInVar x = putMVar inVar x

  let signal mv = putMVar mv ()
  let waitForSignal = takeMVar

  -- Capture how to run `m` actions in `IO` so that workers can run `f`.
  u :: UnliftIO m <- lift askUnliftIO

  -- Starts one `Async` that itself runs `numThreads` workers concurrently.
  -- It is used as the acquire action of `bracketP` below.
  let spawnWorkers :: IO (Async ())
      spawnWorkers = do
        workersAsync <- async $ do
          unliftIO u $ liftIO $ do
            forConcurrently_ [1..numThreads] $ \_i_worker -> do
              -- Each worker owns `workerOutputBufferSize` output slots that
              -- it cycles through (see the `rem` in `loop`).
              workerOutVars <- V.replicateM workerOutputBufferSize newEmptyMVar
              let loop :: Int -> IO ()
                  loop !i_outVarSlot = do
                    m'a <- takeMVar inVar
                    case m'a of
                      Nothing -> return () -- stop marker: this worker is done
                      Just a -> do
                        let workerOutVar = workerOutVars ! i_outVarSlot
                        -- Enqueue the output slot BEFORE signalling, so that
                        -- the queue order equals the input order.
                        atomicModifyIORef_' outQueueRef (|> workerOutVar)
                        signal inVarEnqueued
                        -- The bang forces the result to WHNF in this worker
                        -- thread instead of in the consumer.
                        !b <- unliftIO u (f a)
                        -- Blocks while this slot still holds an older result.
                        putMVar workerOutVar b
                        loop ((i_outVarSlot + 1) `rem` workerOutputBufferSize)

              loop 0

        -- Re-throw exceptions from the workers in the current thread.
        link workersAsync

        return workersAsync

  bracketP
    spawnWorkers
    (\workersAsync -> uninterruptibleCancel workersAsync)
    $ \workersAsync -> do

      let mustBeNonempty = fromMaybe (error "Data.Conduit.Concurrent.concurrentMapM_: outQueue cannot be empty")

      -- Pops the head slot of the queue, blocks until its result is
      -- available, and yields it.
      let yieldQueueHead = do
            workerVar <- mustBeNonempty <$>
              atomicModifyIORef' outQueueRef seqUncons

            b <- takeMVar workerVar
            C.yield b

      -- Non-blocking variant: yields the head result and returns `True` only
      -- if the queue is nonempty and the head slot is already filled.
      let tryYieldQueueHead = do
            m'workerVar <- seqHeadMaybe <$> readIORef outQueueRef
            case m'workerVar of
              Nothing -> return False
              Just workerVar -> do

                m'b <- tryTakeMVar workerVar

                case m'b of
                  Nothing -> return False
                  Just b -> do
                    -- The result was taken, so now remove the slot from the
                    -- queue as well.
                    _ <- mustBeNonempty <$> atomicModifyIORef' outQueueRef seqUncons
                    C.yield b
                    return True

      -- `numWorkersRampedUp`: how many inputs were fed during ramp-up
      --                       (at most `numThreads`).
      -- `numInQueue`:         how many output slots are enqueued and not yet
      --                       yielded.
      let loop :: Int -> Int -> ConduitT a b m ()
          loop numWorkersRampedUp numInQueue = do

            await >>= \case
              Nothing -> do
                -- Drain phase: upstream is done. Yield everything that is
                -- still enqueued (blocking), then stop all workers.
                for_ [1..numInQueue] $ \_ -> do
                  yieldQueueHead

                for_ [1..numThreads] $ \_ -> do
                  putInVar Nothing -- one stop marker per worker
                wait workersAsync

              Just a
                | numWorkersRampedUp < numThreads -> do
                    -- Ramp-up phase: feed the input without yielding
                    -- anything yet.
                    putInVar (Just a) >> waitForSignal inVarEnqueued
                    loop (numWorkersRampedUp + 1) (numInQueue + 1)

                | otherwise -> do
                    -- Cruise phase: feed the input, then yield as many
                    -- already-finished head results as possible without
                    -- blocking, stopping once only `numWorkersRampedUp`
                    -- entries remain in the queue.
                    putInVar (Just a) >> waitForSignal inVarEnqueued
                    let numInQueueAfterEnqueued = numInQueue + 1

                    let popAsManyAsPossible !remainingInQueue
                          | remainingInQueue < numWorkersRampedUp = error "Data.Conduit.Concurrent.concurrentMapM_: remainingInQueue < numWorkersRampedUp"
                          | remainingInQueue == numWorkersRampedUp = return remainingInQueue
                          | otherwise = do
                              popped <- tryYieldQueueHead
                              if not popped
                                then return remainingInQueue
                                else popAsManyAsPossible (remainingInQueue - 1)

                    remainingInQueue <- popAsManyAsPossible numInQueueAfterEnqueued
                    loop numWorkersRampedUp remainingInQueue
      loop 0 0
-- | Like 'concurrentMapM_', but uses the value returned by
-- 'getNumCapabilities' (queried when the conduit is run) as the number of
-- worker threads.
--
-- @workerOutputBufferSize@ and @f@ are passed on to 'concurrentMapM_'
-- unchanged.
concurrentMapM_numCaps :: (MonadUnliftIO m, MonadResource m) => Int -> (a -> m b) -> ConduitT a b m ()
concurrentMapM_numCaps workerOutputBufferSize f = do
  numCaps <- liftIO getNumCapabilities
  concurrentMapM_ numCaps workerOutputBufferSize f