{-# LANGUAGE ScopedTypeVariables, FlexibleContexts, TupleSections #-}
module Data.Conduit.Algorithms.Async.ByteString
( asyncMapLineGroupsC
, asyncFilterLinesC
) where
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Conduit.Algorithms.Async as CAlg
import qualified Data.Conduit.List as CL
import qualified Data.Conduit as C
import Data.Conduit ((.|))
import Control.Monad (unless)
import Control.Monad.IO.Class (MonadIO)
import Control.DeepSeq
-- | Regroup a stream of 'B.ByteString' chunks into groups of complete lines
-- and map @f@ over each group with 'CAlg.asyncMapC', using @nthreads@ as
-- its thread count.
--
-- Each chunk is cut after its last newline (byte 10). The bytes following
-- that newline are carried over and prepended to the next chunk, so every
-- group handed to @f@ consists only of whole lines. Lines are passed to @f@
-- without their newline byte. A final line that is not newline-terminated
-- is still delivered when the input ends. Empty lines are preserved no
-- matter where the upstream chunk boundaries fall.
--
-- The size of each group depends on how upstream chunks the data, so @f@
-- should not rely on a particular grouping.
asyncMapLineGroupsC :: (MonadIO m, NFData a) => Int -> ([B.ByteString] -> a) -> C.ConduitT B.ByteString a m ()
asyncMapLineGroupsC nthreads f = breakAtLineBoundary .| CAlg.asyncMapC nthreads (f . asLines)
  where
    -- Split a group of complete lines (final newline already removed) into
    -- its individual lines. A group is only emitted once at least one line
    -- has ended, so an empty group stands for exactly one empty line.
    -- 'BL.split' returns [] for empty input, which would silently drop that
    -- line and make the result depend on chunk boundaries.
    asLines :: BL.ByteString -> [B.ByteString]
    asLines grp
        | BL.null grp = [B.empty]
        | otherwise = fmap BL.toStrict (BL.split 10 grp)

    -- Re-chunk the input so that every emitted piece ends on a line boundary.
    breakAtLineBoundary :: Monad m => C.ConduitT B.ByteString BL.ByteString m ()
    breakAtLineBoundary = continue BL.empty

    -- @pending@ holds the bytes seen since the most recent newline, i.e. a
    -- partial line waiting for its terminator.
    continue :: Monad m => BL.ByteString -> C.ConduitT B.ByteString BL.ByteString m ()
    continue pending = do
        next <- C.await
        case next of
            Nothing ->
                -- End of input: flush a trailing line that had no newline.
                unless (BL.null pending) $
                    C.yield pending
            Just chunk -> case B.elemIndexEnd 10 chunk of
                Nothing ->
                    -- No newline in this chunk: keep growing the partial line.
                    continue (BL.append pending (BL.fromStrict chunk))
                Just p -> do
                    -- Everything before the last newline completes the
                    -- pending line plus any whole lines in this chunk. The
                    -- part after that newline starts a new partial line.
                    let (complete, rest) = B.splitAt p chunk
                    C.yield (BL.append pending (BL.fromStrict complete))
                    continue (BL.fromStrict (B.drop 1 rest))
-- | Keep only the lines of a byte stream for which @keep@ returns 'True'.
--
-- The stream is cut into groups of lines by 'asyncMapLineGroupsC', which
-- also receives @nthreads@. Each group is filtered with @keep@, and the
-- surviving lines are then flattened back out, one line per output element.
asyncFilterLinesC :: MonadIO m => Int -> (B.ByteString -> Bool) -> C.ConduitT B.ByteString B.ByteString m ()
asyncFilterLinesC nthreads keep = keptGroups .| CL.concat
  where
    -- Per-group filtering, done by the worker threads.
    keptGroups = asyncMapLineGroupsC nthreads (filter keep)
{-# INLINE asyncFilterLinesC #-}