Module      : Data.Conduit.Algorithms.Async
Copyright   : 2013-2017 Luis Pedro Coelho
License     : MIT
Maintainer  : luis@luispedro.org

Higher level async processing interfaces.
{-# LANGUAGE ScopedTypeVariables, FlexibleContexts, CPP #-}

module Data.Conduit.Algorithms.Async
    ( conduitPossiblyCompressedFile
    , asyncMapC
    , asyncMapEitherC
    , asyncGzipTo
    , asyncGzipToFile
    , asyncGzipFrom
    , asyncGzipFromFile
    ) where

import qualified Data.ByteString as B
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM.TBQueue as TQ
import           Control.Concurrent.STM (atomically)

import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.Async as CA
import qualified Data.Conduit.TQueue as CA
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Zlib as CZ
#ifndef WINDOWS
-- bzlib cannot compile on Windows (as of 2016/07/05)
import qualified Data.Conduit.BZlib as CZ
import qualified Data.Conduit as C
import           Data.Conduit ((.|))

import qualified Data.Sequence as Seq
import           Data.Sequence ((|>), ViewL(..))
import           Control.Monad (forM_)
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Error.Class (MonadError(..))
import           Control.Monad.Trans.Resource (MonadResource, MonadBaseControl)
import           Control.Exception (evaluate)
import           Control.DeepSeq
import           System.IO
import           Data.List (isSuffixOf)
import           Data.Conduit.Algorithms.Utils (awaitJust)

-- | This is like Data.Conduit.List.map, except that each element is processed
-- in a separate thread (up to maxSize can be queued up at any one time).
-- Results are evaluated to normal form (not WHNF!) to ensure that the
-- computation is fully evaluated before being yielded to the next conduit.
asyncMapC :: forall a m b . (MonadIO m, NFData b) => Int -> (a -> b) -> C.Conduit a m b
asyncMapC maxSize f = initLoop (0 :: Int) (Seq.empty :: Seq.Seq (A.Async b))
        initLoop :: Int -> Seq.Seq (A.Async b) -> C.Conduit a m b
        initLoop size q
            | size == maxSize = loop q
            | otherwise = C.await >>= \case
                Nothing -> yAll q
                Just v -> do
                        v' <- sched v
                        initLoop (size + 1) (q |> v')
        sched :: a -> C.ConduitM a b m (A.Async b)
        sched v = liftIO . A.async . evaluate . force $ f v

        -- | yield all
        yAll :: Seq.Seq (A.Async b) -> C.Conduit a m b
        yAll q = case Seq.viewl q of
            EmptyL -> return ()
            v :< rest -> (liftIO (A.wait v) >>= yieldOrCleanup rest) >> yAll rest

        loop :: Seq.Seq (A.Async b) -> C.Conduit a m b
        loop q = C.await >>= \case
                Nothing -> yAll q
                Just v -> do
                    v' <- sched v
                    case Seq.viewl q of
                        (r :< rest) -> do
                            yieldOrCleanup rest =<< liftIO (A.wait r)
                            loop (rest |> v')
                        _ -> error "should never happen"
        cleanup :: Seq.Seq (A.Async b) -> m ()
        cleanup q = liftIO $ forM_ q A.cancel
        yieldOrCleanup q = flip C.yieldOr (cleanup q)

-- | asyncMapC with error handling. The inner function can now return an error
-- (as a 'Left'). When the first error is seen, it 'throwError's in the main
-- monad. Note that 'f' may be evaluated for arguments beyond the first error
-- (as some threads may be running in the background and already processing
-- elements after the first error).
asyncMapEitherC :: forall a m b e . (MonadIO m, NFData b, NFData e, MonadError e m) => Int -> (a -> Either e b) -> C.Conduit a m b
asyncMapEitherC maxSize f = asyncMapC maxSize f .| (C.awaitForever $ \case
                                Right v -> C.yield v
                                Left err -> throwError err)

-- | concatenates input into larger chunks and yields it. Its indended use is
-- to build up larger blocks from smaller ones so that they can be sent across
-- thread barriers with little overhead.
-- the chunkSize parameter is a hint, not an exact element. In particular,
-- larger chunks are not split up and smaller chunks can be yielded too.
bsConcatTo :: MonadIO m => Int -- ^ chunk hint
                            -> C.Conduit B.ByteString m [B.ByteString]
bsConcatTo chunkSize = awaitJust start
        start v
            | B.length v >= chunkSize = C.yield [v] >> bsConcatTo chunkSize
            | otherwise = continue [v] (B.length v)
        continue chunks s = C.await >>= \case
            Nothing -> C.yield chunks
            Just v
                | B.length v + s > chunkSize -> C.yield chunks >> start v
                | otherwise -> continue (v:chunks) (s + B.length v)

untilNothing :: forall m i. (Monad m) => C.Conduit (Maybe i) m i
untilNothing = C.await >>= \case
    Just (Just val) -> do
        C.yield val
    _ -> return ()

-- | A simple sink which performs gzip in a separate thread and writes the results to `h`.
-- See also 'asyncGzipToFile'
asyncGzipTo :: forall m. (MonadIO m, MonadBaseControl IO m) => Handle -> C.Sink B.ByteString m ()
asyncGzipTo h = do
    let drain q = liftIO . C.runConduit $
                CA.sourceTBQueue q
                    .| untilNothing
                    .| CL.map (B.concat . reverse)
                    .| CZ.gzip
                    .| C.sinkHandle h
    bsConcatTo ((2 :: Int) ^ (15 :: Int))
        .| CA.drainTo 8 drain

-- | Compresses the output and writes to the given file with compression being
-- performed in a separate thread.
-- See also 'asyncGzipTo'
asyncGzipToFile :: forall m. (MonadResource m, MonadBaseControl IO m) => FilePath -> C.Sink B.ByteString m ()
asyncGzipToFile fname = C.bracketP
    (openFile fname WriteMode)

-- | A source which produces the ungzipped content from the the given handle.
-- Note that this "reads ahead" so if you do not use all the input, the Handle
-- will probably be left at an undefined position in the file.
-- See also 'asyncGzipFromFile'
asyncGzipFrom :: forall m. (MonadIO m, MonadResource m, MonadBaseControl IO m) => Handle -> C.Source m B.ByteString
asyncGzipFrom h = do
    let prod q = liftIO $ do
                    C.runConduit $
                        C.sourceHandle h
                            .| CZ.multiple CZ.ungzip
                            .| CL.map Just
                            .| CA.sinkTBQueue q
                    atomically (TQ.writeTBQueue q Nothing)
    CA.gatherFrom 8 prod
        .| untilNothing

-- | Open and read a gzip file with the uncompression being performed in a
-- separate thread.
-- See also 'asyncGzipFrom'
asyncGzipFromFile :: forall m. (MonadResource m, MonadBaseControl IO m) => FilePath -> C.Source m B.ByteString
asyncGzipFromFile fname = C.bracketP
    (openFile fname ReadMode)

-- | If the filename indicates a gzipped file (or, on Unix, also a bz2 file),
-- then it reads it and uncompresses it.
-- For the case of gzip, 'asyncGzipFromFile' is used.
conduitPossiblyCompressedFile :: (MonadBaseControl IO m, MonadResource m) => FilePath -> C.Source m B.ByteString
conduitPossiblyCompressedFile fname
    | ".gz" `isSuffixOf` fname = asyncGzipFromFile fname
#ifndef WINDOWS
    | ".bz2" `isSuffixOf` fname = C.sourceFile fname .| CZ.bunzip2
    | ".bz2" `isSuffixOf` fname = error "bzip2 decompression is not available on Windows"
    | otherwise = C.sourceFile fname