{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.Async.Extra
    ( -- * concurrent mapping
      mapConcurrentlyBounded
    , mapConcurrentlyBounded_
    , mapConcurrentlyBatched
    , mapConcurrentlyBatched_
    , mapConcurrentlyChunks
    , mapConcurrentlyChunks_
      -- * merge strategies
    , mergeConcatAll
    )
where

import Control.Concurrent.Async
import Control.DeepSeq
import Control.Exception
import Control.Monad
import Data.List.Split (chunksOf)
import qualified Control.Concurrent.QSem as S
import qualified Data.Foldable as F

-- | Spawn a green thread for each task, but only execute N tasks
-- concurrently. The results of the individual tasks are discarded.
mapConcurrentlyBounded_ :: Traversable t => Int -> (a -> IO ()) -> t a -> IO ()
mapConcurrentlyBounded_ bound action items =
    () <$ mapConcurrentlyBounded bound action items

-- | Spawn a green thread for each task, but only execute N tasks
-- concurrently. The results are returned in a container of the same
-- shape as the input.
--
-- A bound below 1 is treated as 1: a semaphore holding zero units would
-- leave every worker blocked in 'S.waitQSem', and 'S.newQSem' rejects a
-- negative initial quantity.
mapConcurrentlyBounded :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapConcurrentlyBounded bound action items =
    do qs <- S.newQSem (max 1 bound)
       -- bracket_ hands the unit back even when the action throws, so a
       -- failing task cannot permanently shrink the pool.
       let throttled =
               bracket_ (S.waitQSem qs) (S.signalQSem qs) . action
       mapConcurrently throttled items

-- | Spawn green threads to perform N (batch size) tasks in one thread.
-- The results of the individual tasks are discarded.
mapConcurrentlyBatched_ ::
    (Foldable t) => Int -> (a -> IO ()) -> t a -> IO ()
mapConcurrentlyBatched_ batchSize action items =
    mapConcurrentlyBatched batchSize discard action items
  where
    -- Merge step that throws the collected unit results away.
    discard _ = pure ()

-- | Spawn green threads to perform N (batch size) tasks in one thread
-- and merge results using provided merge function.
--
-- Tasks inside one batch run sequentially; the batches run concurrently.
-- The merge function receives one result list per batch.
--
-- A batch size below 1 is treated as 1, because 'chunksOf' never
-- terminates on a non-empty list when given a non-positive size.
mapConcurrentlyBatched ::
    (NFData b, Foldable t)
    => Int -> ([[b]] -> IO r) -> (a -> IO b) -> t a -> IO r
mapConcurrentlyBatched batchSize merge action items =
    do let chunks = chunksOf (max 1 batchSize) $ F.toList items
           -- 'evaluate' makes the deep 'force' happen here, inside the
           -- worker thread; a bare @force <$> ...@ would only return an
           -- unevaluated thunk and defer the work to the consumer.
           runBatch batch = mapM action batch >>= evaluate . force
       r <- mapConcurrently runBatch chunks
       merge r

-- | Split the input into N chunks and work on each chunk in a
-- dedicated green thread. The results of the individual tasks are
-- discarded.
mapConcurrentlyChunks_ :: (Foldable t) => Int -> (a -> IO ()) -> t a -> IO ()
mapConcurrentlyChunks_ chunkCount action items =
    mapConcurrentlyChunks chunkCount discard action items
  where
    -- Merge step that throws the collected unit results away.
    discard _ = pure ()

-- | Split the input into at most N chunks of equal length (the last
-- chunk may be shorter) and work on each chunk in a dedicated green
-- thread. Then merge results using provided merge function.
--
-- The chunk length is the input length divided by N, rounded up, so
-- fewer than N chunks can result. A chunk count below 1 is treated as 1.
mapConcurrentlyChunks ::
    (NFData b, Foldable t)
    => Int -> ([[b]] -> IO r) -> (a -> IO b) -> t a -> IO r
mapConcurrentlyChunks chunkCount merge action items =
    mapConcurrentlyBatched batchSize merge action items
  where
    -- A zero chunk count would divide by zero and a negative one would
    -- produce a negative batch size, so clamp before dividing.
    chunks = max 1 chunkCount
    (wholePart, leftover) = F.length items `quotRem` chunks
    -- Ceiling division done in Int: exact for every length and free of
    -- the Infinity/NaN results a floating-point division can produce.
    -- The outer clamp keeps the batch size positive for empty input.
    batchSize = max 1 (wholePart + (if leftover > 0 then 1 else 0))

-- | Merge strategy that flattens the per-chunk result lists into a
-- single list, keeping the chunk order. (Equivalent to 'join' on lists.)
mergeConcatAll :: [[a]] -> [a]
mergeConcatAll = concat