-- | Helpers for running an 'IO' action concurrently over a collection while
-- limiting how much runs in parallel.
--
-- Three strategies are exported, each with a result-discarding @_@ variant:
--
-- * /Bounded/: every element gets its own worker, but a semaphore limits how
--   many workers execute the action at the same time.
-- * /Batched/: the elements are cut into batches of a given size, with one
--   worker per batch.
-- * /Chunks/: the elements are cut into a given number of chunks, with one
--   worker per chunk.
--
-- 'mergeConcatAll' flattens the list-of-lists result shape that the batched
-- and chunked variants pass to their merge step.
module Control.Concurrent.Async.Extra
(
mapConcurrentlyBounded
, mapConcurrentlyBounded_
, mapConcurrentlyBatched
, mapConcurrentlyBatched_
, mapConcurrentlyChunks
, mapConcurrentlyChunks_
, mergeConcatAll
)
where
import Control.Concurrent.Async
import Control.DeepSeq
import Control.Exception
import Control.Monad
import Data.List.Split (chunksOf)
import qualified Control.Concurrent.QSem as S
import qualified Data.Foldable as F
-- | Variant of 'mapConcurrentlyBounded' for actions that are run only for
-- their effects: the collected results are thrown away.
mapConcurrentlyBounded_ :: Traversable t => Int -> (a -> IO ()) -> t a -> IO ()
mapConcurrentlyBounded_ bound action items =
  void (mapConcurrentlyBounded bound action items)
-- | Apply @action@ to every element of @items@ via 'mapConcurrently', letting
-- at most @bound@ invocations of @action@ execute at the same time.
--
-- The limit is enforced with a quantity semaphore created with @bound@ units:
-- each invocation takes one unit before running @action@ and gives it back
-- afterwards, including when @action@ throws.
--
-- NOTE(review): with a @bound@ of 0 no invocation could ever take a unit —
-- confirm that callers always pass a positive bound.
mapConcurrentlyBounded :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded bound action items =
  S.newQSem bound >>= \sem -> mapConcurrently (throttled sem) items
  where
    -- 'bracket_' guarantees the unit is returned even if the action fails.
    throttled sem item =
      bracket_ (S.waitQSem sem) (S.signalQSem sem) (action item)
-- | Variant of 'mapConcurrentlyBatched' for actions that are run only for
-- their effects: the merge step ignores the per-batch results and returns
-- @()@.
mapConcurrentlyBatched_ ::
     (Foldable t) => Int -> (a -> IO ()) -> t a -> IO ()
mapConcurrentlyBatched_ batchSize action items =
  mapConcurrentlyBatched batchSize discard action items
  where
    -- Merge function that drops whatever the batches produced.
    discard _ = pure ()
-- | Cut @items@ into consecutive batches of at most @batchSize@ elements, run
-- one concurrent worker per batch, and pass the list of per-batch results to
-- @merge@.
--
-- Inside a batch, @action@ is applied to the elements one after another;
-- different batches run concurrently via 'mapConcurrently'. The outer list
-- given to @merge@ has one entry per batch, in the order of the input.
--
-- Each worker fully evaluates the results of its batch before it finishes
-- (this is what the 'NFData' constraint is for), so pure work hidden in lazy
-- results is carried out on the worker thread instead of being deferred to
-- whoever consumes the merged value.
--
-- A @batchSize@ below 1 is treated as 1: 'chunksOf' with a non-positive size
-- never consumes its input and would produce batches forever.
mapConcurrentlyBatched ::
     (NFData b, Foldable t)
  => Int -> ([[b]] -> IO r) -> (a -> IO b) -> t a -> IO r
mapConcurrentlyBatched batchSize merge action items = do
  let batches = chunksOf (max 1 batchSize) (F.toList items)
  results <- mapConcurrently runBatch batches
  merge results
  where
    -- 'evaluate' makes the deep 'force' happen here, on the worker thread;
    -- @force \<$\> ...@ alone would only return an unevaluated thunk.
    runBatch batch = mapM action batch >>= evaluate . force
-- | Variant of 'mapConcurrentlyChunks' for actions that are run only for
-- their effects: the merge step ignores the per-chunk results and returns
-- @()@.
mapConcurrentlyChunks_ :: (Foldable t) => Int -> (a -> IO ()) -> t a -> IO ()
mapConcurrentlyChunks_ chunkCount action items =
  mapConcurrentlyChunks chunkCount (\_ -> pure ()) action items
-- | Cut @items@ into at most @chunkCount@ consecutive chunks of near-equal
-- size and process them with 'mapConcurrentlyBatched', using the same
-- @merge@ and @action@.
--
-- The chunk size is the number of elements divided by @chunkCount@, rounded
-- up, so fewer than @chunkCount@ chunks may be produced when the elements do
-- not divide evenly or when there are fewer elements than chunks.
--
-- A @chunkCount@ below 1 is treated as 1 (everything ends up in one chunk).
mapConcurrentlyChunks ::
     (NFData b, Foldable t)
  => Int -> ([[b]] -> IO r) -> (a -> IO b) -> t a -> IO r
mapConcurrentlyChunks chunkCount merge action items =
  mapConcurrentlyBatched batchSize merge action items
  where
    listSize = F.length items
    -- Clamp so the division below can never be by zero or a negative number.
    chunks = max 1 chunkCount
    -- Integer ceiling division, written with 'quotRem' so it cannot overflow.
    -- The outer clamp keeps the size positive for an empty input.
    batchSize =
      let (whole, leftover) = listSize `quotRem` chunks
       in max 1 (if leftover > 0 then whole + 1 else whole)
-- | Flatten a list of lists into a single list, keeping the order of the
-- outer list and of each inner list.
--
-- This is a pure function; to use it where an @[[b]] -> IO r@ merge step is
-- expected, compose it with 'pure'.
mergeConcatAll :: [[a]] -> [a]
mergeConcatAll groups = concat groups