{-|
Module      : Data.Conduit.Algorithms.Async.ByteString
Copyright   : 2018 Luis Pedro Coelho
License     : MIT
Maintainer  : luis@luispedro.org

Higher level async processing interfaces for handling 'ByteString' objects.
-}
{-# LANGUAGE ScopedTypeVariables, FlexibleContexts, TupleSections #-}

module Data.Conduit.Algorithms.Async.ByteString
    ( asyncMapLineGroupsC
    , asyncFilterLinesC
    ) where


import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import qualified Data.Conduit.Algorithms.Async as CAlg
import qualified Data.Conduit.List as CL
import qualified Data.Conduit as C
import           Data.Conduit ((.|))

import           Control.Monad (unless)
import           Control.Monad.IO.Class (MonadIO)
import           Control.DeepSeq


-- | Apply a function to groups of lines
--
-- Note that this is much more efficient than the (more or less equivalent,
-- except that the intermediate lists can be of varying sizes):
--
-- @
--      CB.lines .| CC.conduitVector N .| CAlg.asyncMapC nthreads (f . V.toList)
-- @
--
-- The reason being that splitting into lines then becomes the bottleneck and
-- processing a single line is typically a tiny chunk of work so that the
-- threading overhead overwhelms the advantage of using multiple cores.
-- Instead, 'asyncMapLineGroupsC' will pass big chunks to the worker thread and
-- perform most of the line splitting /in the worker thread/.
--
-- Only Unix-style ASCII lines are supported (splitting at Bytes with value
-- 10, i.e., \\n). When Windows lines (\\r\\n) are passed to this function, this
-- results in each element having an extra \\r at the end.
--
-- The first argument is forwarded unchanged to 'CAlg.asyncMapC'; the second
-- is applied to the list of lines (without their terminating \\n) recovered
-- from each block.
asyncMapLineGroupsC :: (MonadIO m, NFData a) => Int -> ([B.ByteString] -> a) -> C.ConduitT B.ByteString a m ()
asyncMapLineGroupsC nthreads f = breakAtLineBoundary .| CAlg.asyncMapC nthreads (f . asLines)
    where
        -- Split one block at every \n byte. Blocks are produced by
        -- 'breakAtLineBoundary' with their final newline already removed, so
        -- an empty block stands for exactly one empty line. 'BL.split' on an
        -- empty input returns [], which would silently drop that line, hence
        -- the explicit case.
        asLines :: BL.ByteString -> [B.ByteString]
        asLines block
            | BL.null block = [B.empty]
            | otherwise = fmap BL.toStrict (BL.split 10 block)

        -- The purpose is to break input blocks at a line boundary
        breakAtLineBoundary :: Monad m => C.ConduitT B.ByteString BL.ByteString m ()
        breakAtLineBoundary = continue BL.empty

        -- 'prev' holds the bytes seen since the last newline that was
        -- emitted, i.e. the (still incomplete) beginning of the next line.
        continue :: Monad m => BL.ByteString -> C.ConduitT B.ByteString BL.ByteString m ()
        continue prev = do
            next <- C.await
            case next of
                -- End of input: flush a final line that had no trailing \n.
                Nothing -> unless (BL.null prev) $
                                C.yield prev
                Just n -> case B.elemIndexEnd 10 n of
                    -- No newline in this chunk: keep accumulating.
                    Nothing -> continue (BL.append prev (BL.fromStrict n))
                    -- Cut at the LAST newline so the emitted block contains
                    -- only complete lines; the remainder is carried over.
                    Just p -> do
                        let (first, rest) = B.splitAt p n
                        C.yield (BL.append prev (BL.fromStrict first))
                        continue (BL.fromStrict $ B.drop 1 rest) -- skip \n char

-- | Filter lines using multiple threads
--
-- It is not clear from the types, but the input is taken to be raw blocks of
-- bytes that have not been split into lines, while the output will be yielded
-- line by line. This conduit is equivalent to
--
-- @
--      CB.lines .| CL.filter f
-- @
--
-- The first argument is passed on to 'asyncMapLineGroupsC'; the second is the
-- predicate a line must satisfy to be kept.
asyncFilterLinesC :: MonadIO m => Int -> (B.ByteString -> Bool) -> C.ConduitT B.ByteString B.ByteString m ()
asyncFilterLinesC n f =
    -- Each block is filtered as a whole list of lines; the surviving lines
    -- are then flattened back into a stream of individual lines.
    asyncMapLineGroupsC n (filter f) .| CL.concat
{-# INLINE asyncFilterLinesC #-}