{-# LANGUAGE ScopedTypeVariables, FlexibleContexts, CPP, TupleSections #-}
module Data.Conduit.Algorithms.Async
( conduitPossiblyCompressedFile
, conduitPossiblyCompressedToFile
, withPossiblyCompressedFile
, asyncMapC
, asyncMapEitherC
, asyncGzipTo
, asyncGzipToFile
, asyncGzipFrom
, asyncGzipFromFile
, asyncBzip2To
, asyncBzip2ToFile
, asyncBzip2From
, asyncBzip2FromFile
, asyncXzTo
, asyncXzToFile
, asyncXzFrom
, asyncXzFromFile
, unorderedAsyncMapC
) where
import qualified Data.ByteString as B
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM.TBQueue as TQ
import Control.Concurrent.STM (atomically)
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.Async as CA
import qualified Data.Conduit.TQueue as CA
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Zlib as CZ
import qualified Data.Conduit.Lzma as CX
import qualified Data.Streaming.Zlib as SZ
import qualified Data.Conduit.BZlib as CZ
import qualified Data.Conduit as C
import Data.Conduit ((.|))
import qualified Data.Sequence as Seq
import Data.Sequence ((|>), ViewL(..))
import Data.Foldable (toList)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Error.Class (MonadError(..))
import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO)
import Control.Monad.Trans.Resource (MonadResource)
import Control.Monad.Catch (MonadThrow)
import Control.Exception (evaluate, displayException)
import Control.DeepSeq (NFData, force)
import System.IO.Error (mkIOError, userErrorType)
import System.IO
import qualified System.IO as IO
import Data.List (isSuffixOf)
import Data.Conduit.Algorithms.Utils (awaitJust)
-- | Map a pure function over the stream using concurrently running
-- computations, yielding the results in the same order as the inputs.
--
-- This is 'asyncMapCHelper' with ordered retrieval switched on: the oldest
-- scheduled computation is always the next one waited for.
asyncMapC :: forall a m b . (MonadIO m, NFData b) =>
                    Int -- ^ Number of computations to keep in flight
                    -> (a -> b) -- ^ Function applied to every element
                    -> C.ConduitT a b m ()
asyncMapC maxThreads f = asyncMapCHelper True maxThreads f
-- | Variant of 'asyncMapC' that does not preserve input order: whichever
-- scheduled computation finishes first is yielded first.
--
-- This is 'asyncMapCHelper' with ordered retrieval switched off.
unorderedAsyncMapC :: forall a m b . (MonadIO m, NFData b) =>
                    Int -- ^ Number of computations to keep in flight
                    -> (a -> b) -- ^ Function applied to every element
                    -> C.ConduitT a b m ()
unorderedAsyncMapC maxThreads f = asyncMapCHelper False maxThreads f
-- | Shared implementation of 'asyncMapC' and 'unorderedAsyncMapC'.
--
-- Every input element is turned into an 'A.Async' that evaluates @f v@ to
-- normal form (@evaluate . force@). The pending computations are kept in a
-- 'Seq.Seq' in scheduling order.
--
-- * When @isSynchronous@ is 'True', results are yielded in input order (the
--   head of the queue is always the one waited for).
-- * When it is 'False', 'A.waitAny' picks whichever computation finishes
--   first and that entry is removed from the queue.
--
-- A @maxThreads@ below 1 is treated as 1. Without this, @0@ would enter the
-- steady-state loop with an empty queue (hitting the \"impossible\" branch or
-- waiting on an empty list) and a negative value would never leave the fill
-- phase, scheduling the whole input without any bound.
asyncMapCHelper :: forall a m b . (MonadIO m, NFData b) =>
                    Bool -- ^ 'True': preserve input order; 'False': yield as completed
                    -> Int -- ^ Number of computations to keep in flight (clamped to >= 1)
                    -> (a -> b) -- ^ Function applied to every element
                    -> C.ConduitT a b m ()
asyncMapCHelper isSynchronous maxThreads f = initLoop (0 :: Int) (Seq.empty :: Seq.Seq (A.Async b))
    where
        -- Effective queue size; guards against zero/negative requests.
        nThreads :: Int
        nThreads = max 1 maxThreads

        -- Fill phase: schedule inputs without yielding until the queue
        -- holds 'nThreads' computations (or the input ends).
        initLoop :: Int -> Seq.Seq (A.Async b) -> C.ConduitT a b m ()
        initLoop size q
            | size >= nThreads = loop q
            | otherwise = C.await >>= \case
                Nothing -> yAll q
                Just v -> do
                    v' <- sched v
                    initLoop (size + 1) (q |> v')

        -- Start the computation of @f v@, forcing the result to normal form
        -- inside the async so the work is not deferred to the consumer.
        sched :: a -> C.ConduitM a b m (A.Async b)
        sched v = liftIO . A.async . evaluate . force $ f v

        -- Input is exhausted: drain every remaining computation.
        yAll :: Seq.Seq (A.Async b) -> C.ConduitT a b m ()
        yAll q
            | Seq.null q = return ()
            | otherwise = do
                (r, q') <- liftIO $ retrieveResult q
                C.yield r
                yAll q'

        -- Steady state: the new element is scheduled /before/ waiting for a
        -- result, so one extra computation is briefly in flight.
        loop :: Seq.Seq (A.Async b) -> C.ConduitT a b m ()
        loop q = C.await >>= \case
            Nothing -> yAll q
            Just v -> do
                v' <- sched v
                (r, q') <- liftIO $ retrieveResult q
                C.yield r
                loop (q' |> v')

        -- Wait for one result and return it with the queue minus that entry.
        -- Callers only invoke this with a non-empty queue.
        retrieveResult :: Seq.Seq (A.Async b) -> IO (b, Seq.Seq (A.Async b))
        retrieveResult q
            | isSynchronous = case Seq.viewl q of
                (r :< rest) -> (, rest) <$> A.wait r
                _ -> error "Impossible situation"
            | otherwise = do
                (k, r) <- A.waitAny (toList q)
                return (r, Seq.filter (/= k) q)
-- | Like 'asyncMapC' for functions that may fail with 'Left'.
--
-- The function is mapped with 'asyncMapC'; downstream of that, every
-- 'Right' payload is yielded and a 'Left' payload is raised in the
-- underlying monad with 'throwError'.
asyncMapEitherC :: forall a m b e . (MonadIO m, NFData b, NFData e, MonadError e m) => Int -> (a -> Either e b) -> C.ConduitT a b m ()
asyncMapEitherC maxThreads f = asyncMapC maxThreads f .| C.awaitForever unwrap
    where
        -- Left -> throwError, Right -> yield
        unwrap :: Either e b -> C.ConduitT (Either e b) b m ()
        unwrap = either throwError C.yield
-- | Group incoming 'B.ByteString's into batches of at most about
-- @chunkSize@ bytes.
--
-- * A batch is flushed as soon as adding the next chunk would push its total
--   length above @chunkSize@.
-- * A chunk that is itself at least @chunkSize@ long and arrives at the start
--   of a batch is emitted as a singleton batch.
-- * Within a batch the chunks are stored newest-first, so consumers need to
--   @reverse@ the list to recover the stream order.
bsConcatTo :: MonadIO m => Int -- ^ Target batch size in bytes
                -> C.ConduitT B.ByteString [B.ByteString] m ()
bsConcatTo chunkSize = awaitJust begin
    where
        -- First chunk of a fresh batch.
        begin bs
            | len >= chunkSize = C.yield [bs] >> bsConcatTo chunkSize
            | otherwise = accumulate [bs] len
            where
                len = B.length bs

        -- Keep collecting into @acc@ (currently @total@ bytes long).
        accumulate acc total = C.await >>= maybe (C.yield acc) step
            where
                step bs
                    | total' > chunkSize = C.yield acc >> begin bs
                    | otherwise = accumulate (bs:acc) total'
                    where
                        total' = B.length bs + total
-- | Pass through the payload of every 'Just' and stop at the first
-- 'Nothing' (or when upstream is exhausted).
untilNothing :: forall m i. (Monad m) => C.ConduitT (Maybe i) i m ()
untilNothing = awaitJust (maybe (return ()) forward)
    where
        -- Emit one value, then keep going.
        forward :: i -> C.ConduitT (Maybe i) i m ()
        forward x = C.yield x >> untilNothing
-- | Sink that gzip-compresses its input and writes the result to handle @h@.
--
-- Input is first batched with 'bsConcatTo' (2^15-byte target) and the
-- batches are handed to 'CA.drainTo' with a queue bound of 8. The draining
-- action runs in 'IO': it reads batches from the queue until the 'Nothing'
-- terminator, restores chunk order, compresses with 'CZ.gzip' and writes
-- with 'C.sinkHandle'. The handle is not closed here.
asyncGzipTo :: forall m. (MonadIO m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m ()
asyncGzipTo h = bsConcatTo batchBytes .| CA.drainTo 8 compressAndWrite
    where
        batchBytes :: Int
        batchBytes = (2 :: Int) ^ (15 :: Int)

        compressAndWrite queue = liftIO . C.runConduit $
            CA.sourceTBQueue queue
                .| untilNothing
                -- batches arrive newest-first; restore stream order
                .| CL.map (\batch -> B.concat (reverse batch))
                .| CZ.gzip
                .| C.sinkHandle h
-- | Write gzip-compressed data to the file @fname@ via 'asyncGzipTo'.
--
-- The handle is acquired and released with 'C.bracketP' ('hClose' on exit).
-- It is opened in binary mode because the payload is compressed bytes; this
-- matches 'withPossiblyCompressedFile', which uses 'IO.withBinaryFile'.
asyncGzipToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncGzipToFile fname = C.bracketP
    (openBinaryFile fname WriteMode)
    hClose
    asyncGzipTo
-- | Source that reads gzip data from handle @h@ and yields the decompressed
-- bytes.
--
-- The producer (run in 'IO' through 'CA.gatherFrom' with a queue bound of 8)
-- reads the handle, decompresses with @CZ.multiple CZ.ungzip@, wraps each
-- chunk in 'Just' and finally enqueues 'Nothing' as an end marker, which
-- 'untilNothing' consumes on the receiving side.
--
-- A 'SZ.ZlibException' caught here is re-raised as a user-type 'IOError'
-- that carries the handle and the message
-- @\"Error reading gzip file: ...\"@.
asyncGzipFrom :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m ()
asyncGzipFrom h = (CA.gatherFrom 8 decompressInto .| untilNothing) `C.catchC` rethrowAsIOError
    where
        decompressInto queue = liftIO $ do
            C.runConduit $
                C.sourceHandle h
                    .| CZ.multiple CZ.ungzip
                    .| CL.map Just
                    .| CA.sinkTBQueue queue
            -- end-of-stream marker for 'untilNothing'
            atomically (TQ.writeTBQueue queue Nothing)

        rethrowAsIOError (e :: SZ.ZlibException) =
            liftIO . ioError $
                mkIOError userErrorType ("Error reading gzip file: "++displayException e) (Just h) Nothing
-- | Read the gzip file @fname@ and yield its decompressed contents via
-- 'asyncGzipFrom'.
--
-- The handle is acquired and released with 'C.bracketP' ('hClose' on exit).
-- It is opened in binary mode because the file holds compressed bytes; this
-- matches 'withPossiblyCompressedFile', which uses 'IO.withBinaryFile'.
asyncGzipFromFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncGzipFromFile fname = C.bracketP
    (openBinaryFile fname ReadMode)
    hClose
    asyncGzipFrom
-- | Sink that bzip2-compresses its input and writes the result to handle @h@.
--
-- Input is batched with 'bsConcatTo' (2^15-byte target) and handed to
-- 'CA.drainTo' with a queue bound of 8. The draining action reads batches
-- until the 'Nothing' terminator, restores chunk order, compresses with
-- 'CZ.bzip2' and writes with 'C.sinkHandle'. The handle is not closed here.
asyncBzip2To :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m ()
asyncBzip2To h = bsConcatTo batchBytes .| CA.drainTo 8 compressAndWrite
    where
        batchBytes :: Int
        batchBytes = (2 :: Int) ^ (15 :: Int)

        compressAndWrite queue = C.runConduit $
            CA.sourceTBQueue queue
                .| untilNothing
                -- batches arrive newest-first; restore stream order
                .| CL.map (\batch -> B.concat (reverse batch))
                .| CZ.bzip2
                .| C.sinkHandle h
-- | Write bzip2-compressed data to the file @fname@ via 'asyncBzip2To'.
--
-- The handle is acquired and released with 'C.bracketP' ('hClose' on exit).
-- It is opened in binary mode because the payload is compressed bytes; this
-- matches 'withPossiblyCompressedFile', which uses 'IO.withBinaryFile'.
asyncBzip2ToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncBzip2ToFile fname = C.bracketP
    (openBinaryFile fname WriteMode)
    hClose
    asyncBzip2To
-- | Source that reads bzip2 data from handle @h@ and yields the decompressed
-- bytes.
--
-- The producer passed to 'CA.gatherFrom' (queue bound 8) reads the handle,
-- decompresses with @CZ.multiple CZ.bunzip2@, wraps each chunk in 'Just'
-- and finally enqueues 'Nothing' as an end marker, which 'untilNothing'
-- consumes on the receiving side.
asyncBzip2From :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m ()
asyncBzip2From h = CA.gatherFrom 8 decompressInto .| untilNothing
    where
        decompressInto queue = do
            C.runConduit $
                C.sourceHandle h
                    .| CZ.multiple CZ.bunzip2
                    .| CL.map Just
                    .| CA.sinkTBQueue queue
            -- end-of-stream marker for 'untilNothing'
            liftIO $ atomically (TQ.writeTBQueue queue Nothing)
-- | Read the bzip2 file @fname@ and yield its decompressed contents via
-- 'asyncBzip2From'.
--
-- The handle is acquired and released with 'C.bracketP' ('hClose' on exit).
-- It is opened in binary mode because the file holds compressed bytes; this
-- matches 'withPossiblyCompressedFile', which uses 'IO.withBinaryFile'.
asyncBzip2FromFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncBzip2FromFile fname = C.bracketP
    (openBinaryFile fname ReadMode)
    hClose
    asyncBzip2From
-- | Sink that xz-compresses its input and writes the result to handle @h@.
--
-- Input is batched with 'bsConcatTo' (2^15-byte target) and handed to
-- 'CA.drainTo' with a queue bound of 8. The draining action reads batches
-- until the 'Nothing' terminator, restores chunk order, compresses with
-- @CX.compress Nothing@ and writes with 'C.sinkHandle'. The handle is not
-- closed here.
asyncXzTo :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m ()
asyncXzTo h = bsConcatTo batchBytes .| CA.drainTo 8 compressAndWrite
    where
        batchBytes :: Int
        batchBytes = (2 :: Int) ^ (15 :: Int)

        compressAndWrite queue = C.runConduit $
            CA.sourceTBQueue queue
                .| untilNothing
                -- batches arrive newest-first; restore stream order
                .| CL.map (\batch -> B.concat (reverse batch))
                .| CX.compress Nothing
                .| C.sinkHandle h
-- | Write xz-compressed data to the file @fname@ via 'asyncXzTo'.
--
-- The handle is acquired and released with 'C.bracketP' ('hClose' on exit).
-- It is opened in binary mode because the payload is compressed bytes; this
-- matches 'withPossiblyCompressedFile', which uses 'IO.withBinaryFile'.
asyncXzToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncXzToFile fname = C.bracketP
    (openBinaryFile fname WriteMode)
    hClose
    asyncXzTo
-- | Source that reads xz data from handle @h@ and yields the decompressed
-- bytes.
--
-- The producer passed to 'CA.gatherFrom' (queue bound 8) reads the handle,
-- decompresses with @CZ.multiple (CX.decompress ...)@, wraps each chunk in
-- 'Just' and finally enqueues 'Nothing' as an end marker, which
-- 'untilNothing' consumes on the receiving side.
asyncXzFrom :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m, MonadThrow m) => Handle -> C.ConduitT () B.ByteString m ()
asyncXzFrom h = CA.gatherFrom 8 decompressInto .| untilNothing
    where
        -- 1024^3, passed as the limit argument of 'CX.decompress'
        -- (presumably the decoder's memory limit in bytes -- confirm
        -- against the lzma-conduit documentation).
        memLimit = Just (1024 ^ (3 :: Integer))

        decompressInto queue = do
            C.runConduit $
                C.sourceHandle h
                    .| CZ.multiple (CX.decompress memLimit)
                    .| CL.map Just
                    .| CA.sinkTBQueue queue
            -- end-of-stream marker for 'untilNothing'
            liftIO $ atomically (TQ.writeTBQueue queue Nothing)
-- | Read the xz file @fname@ and yield its decompressed contents via
-- 'asyncXzFrom'.
--
-- The handle is acquired and released with 'C.bracketP' ('hClose' on exit).
-- It is opened in binary mode because the file holds compressed bytes; this
-- matches 'withPossiblyCompressedFile', which uses 'IO.withBinaryFile'.
asyncXzFromFile :: forall m. (MonadResource m, MonadUnliftIO m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncXzFromFile fname = C.bracketP
    (openBinaryFile fname ReadMode)
    hClose
    asyncXzFrom
-- | Open @fname@ in binary read mode and run @inner@ on a source of its
-- (possibly decompressed) contents.
--
-- The decoder is selected from the file name by
-- 'withPossiblyCompressedFile''. The file is opened with
-- 'IO.withBinaryFile', so the handle only lives for the duration of
-- @inner@.
withPossiblyCompressedFile :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> (C.ConduitT () B.ByteString m () -> m a) -> m a
withPossiblyCompressedFile fname inner = withRunInIO $ \runInIO ->
    IO.withBinaryFile fname IO.ReadMode $ \h ->
        runInIO (inner (withPossiblyCompressedFile' fname h))
-- | Pick the handle-based source matching the suffix of @fname@:
-- @.gz@ -> 'asyncGzipFrom', @.xz@ -> 'asyncXzFrom', @.bz2@ ->
-- 'asyncBzip2From', anything else -> plain 'C.sourceHandle'.
--
-- The comparison is an exact (case-sensitive) suffix test.
withPossiblyCompressedFile' :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> Handle -> C.ConduitT () B.ByteString m ()
withPossiblyCompressedFile' fname
    | hasSuffix ".gz" = asyncGzipFrom
    | hasSuffix ".xz" = asyncXzFrom
    | hasSuffix ".bz2" = asyncBzip2From
    | otherwise = C.sourceHandle
    where
        hasSuffix :: String -> Bool
        hasSuffix ext = ext `isSuffixOf` fname
-- | Source for the contents of @fname@, decompressing according to its
-- suffix: @.gz@ -> 'asyncGzipFromFile', @.xz@ -> 'asyncXzFromFile',
-- @.bz2@ -> 'asyncBzip2FromFile', anything else -> 'C.sourceFile'.
--
-- The comparison is an exact (case-sensitive) suffix test.
conduitPossiblyCompressedFile :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m ()
conduitPossiblyCompressedFile fname
    | hasSuffix ".gz" = asyncGzipFromFile fname
    | hasSuffix ".xz" = asyncXzFromFile fname
    | hasSuffix ".bz2" = asyncBzip2FromFile fname
    | otherwise = C.sourceFile fname
    where
        hasSuffix :: String -> Bool
        hasSuffix ext = ext `isSuffixOf` fname
-- | Sink that writes to @fname@, compressing according to its suffix:
-- @.gz@ -> 'asyncGzipToFile', @.xz@ -> 'asyncXzToFile',
-- @.bz2@ -> 'asyncBzip2ToFile', anything else -> 'C.sinkFile'.
--
-- The comparison is an exact (case-sensitive) suffix test.
conduitPossiblyCompressedToFile :: (MonadUnliftIO m, MonadResource m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
conduitPossiblyCompressedToFile fname
    | hasSuffix ".gz" = asyncGzipToFile fname
    | hasSuffix ".xz" = asyncXzToFile fname
    | hasSuffix ".bz2" = asyncBzip2ToFile fname
    | otherwise = C.sinkFile fname
    where
        hasSuffix :: String -> Bool
        hasSuffix ext = ext `isSuffixOf` fname