{-|
Module      : Data.Conduit.Algorithms.Async
Copyright   : 2013-2022 Luis Pedro Coelho
License     : MIT
Maintainer  : luis@luispedro.org

Higher level async processing interfaces.
-}
{-# LANGUAGE ScopedTypeVariables, FlexibleContexts, CPP, TupleSections, LambdaCase , BangPatterns #-}

module Data.Conduit.Algorithms.Async
    ( conduitPossiblyCompressedFile
    , conduitPossiblyCompressedToFile
    , withPossiblyCompressedFile
    , withPossiblyCompressedFileOutput
    , asyncMapC
    , asyncMapEitherC
    , asyncGzipTo
    , asyncGzipTo'
    , asyncGzipToFile
    , asyncGzipFrom
    , asyncGzipFromFile
    , asyncBzip2To
    , asyncBzip2ToFile
    , asyncBzip2From
    , asyncBzip2FromFile
    , asyncXzTo
    , asyncXzTo'
    , asyncXzToFile
    , asyncXzFrom
    , asyncXzFromFile
    , asyncZstdTo
    , asyncZstdToFile
    , asyncZstdFrom
    , asyncZstdFromFile
    , unorderedAsyncMapC
    ) where


import qualified Data.ByteString as B
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM.TBQueue as TQ
import           Control.Concurrent.STM (atomically)

import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.Async as CA
import qualified Data.Conduit.TQueue as CA
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Zlib as CZ
import qualified Data.Conduit.Lzma2 as CX
import qualified Data.Streaming.Zlib as SZ
import qualified Data.Conduit.BZlib as CBZ
import qualified Data.Conduit.Zstd as CZstd
import qualified Control.Monad.Trans.Resource as R
import qualified Data.Conduit as C
import           Data.Conduit ((.|))

import qualified Data.Sequence as Seq
import           Data.Sequence ((|>), ViewL(..))
import           Data.Foldable (toList)
import           Control.Monad.IO.Class (MonadIO, liftIO)
import           Control.Monad.Error.Class (MonadError(..))
import           Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO)
import           Control.Monad.Trans.Resource (MonadResource)
import           Control.Monad.Catch (MonadThrow)
import           Control.Exception (evaluate, displayException)
import           Control.DeepSeq (NFData, force)
import           System.IO.Error (mkIOError, userErrorType)
import           System.IO
import qualified System.IO as IO
import           Data.List (isSuffixOf)
import           Data.Conduit.Algorithms.Utils (awaitJust)



-- | Behaves like 'Data.Conduit.List.map', but every element is processed in
-- its own thread (up to 'maxThreads' can be queued up at any one time).
-- Results are yielded in the same order as the corresponding inputs.
--
-- Each result is evaluated to normal form (not merely weak-head normal form,
-- i.e., the structure is deeply evaluated) so that the computation really
-- happens in the worker thread rather than in the consumer.
--
-- Threading has some overhead. When @f@ is cheap, it is often better to group
-- the input into larger chunks before handing it to 'asyncMapC' so that the
-- cost is amortized. That is, instead of @asyncMapC f@, consider
--
-- @
--    CC.conduitVector 4096 .| asyncMapC (V.map f) .| CC.concat
-- @
--
-- where @CC@ refers to 'Data.Conduit.Combinators'
--
-- See 'unorderedAsyncMapC'
asyncMapC :: forall a m b . (MonadIO m, NFData b) =>
                    Int -- ^ Maximum number of worker threads
                    -> (a -> b) -- ^ Function to execute
                    -> C.ConduitT a b m ()
asyncMapC maxThreads f = asyncMapCHelper True maxThreads f

-- | A variant of 'asyncMapC' that is allowed to reorder the results in the
-- stream.
--
-- When the order of the results does not matter, this function can make
-- better use of resources if some of the chunks take longer to complete than
-- others.
--
-- See 'asyncMapC'
unorderedAsyncMapC :: forall a m b . (MonadIO m, NFData b) =>
                    Int -- ^ Maximum number of worker threads
                    -> (a -> b) -- ^ Function to execute
                    -> C.ConduitT a b m ()
unorderedAsyncMapC maxThreads f = asyncMapCHelper False maxThreads f

-- | Shared implementation of 'asyncMapC' and 'unorderedAsyncMapC'.
--
-- Every input element is handed to its own 'A.Async' worker, which evaluates
-- @f@ on it to normal form. Workers are started without waiting for results
-- until the queue holds the configured number of entries; from then on each
-- new input is scheduled and exactly one finished result is yielded in
-- exchange. When the input ends, the remaining workers are drained.
--
-- A @maxThreads@ smaller than 1 is treated as 1.
asyncMapCHelper  :: forall a m b . (MonadIO m, NFData b) =>
                    Bool -- ^ 'True': yield results in input order; 'False': yield results as workers finish
                    -> Int -- ^ Maximum number of worker threads (values below 1 are treated as 1)
                    -> (a -> b) -- ^ Function to execute
                    -> C.ConduitT a b m ()
asyncMapCHelper isSynchronous maxThreads f = initLoop (0 :: Int) (Seq.empty :: Seq.Seq (A.Async b))
    where
        -- Effective queue limit. It is clamped to at least 1 because 'loop'
        -- needs a non-empty queue (otherwise 'retrieveResult' would have
        -- nothing to wait on), and a non-positive limit must not disable the
        -- bound altogether.
        queueLimit :: Int
        queueLimit = max 1 maxThreads

        -- Start-up phase: keep scheduling workers, without collecting any
        -- result, until the queue is full or the input is exhausted.
        initLoop :: Int -> Seq.Seq (A.Async b) -> C.ConduitT a b m ()
        initLoop size q
            | size >= queueLimit = loop q
            | otherwise = C.await >>= \case
                Nothing -> yAll q
                Just v -> do
                    v' <- sched v
                    initLoop (size + 1) (q |> v')

        -- Start a worker that fully evaluates @f v@.
        sched :: a -> C.ConduitT a b m (A.Async b)
        sched v = liftIO . A.async . evaluate . force $ f v

        -- Yield the results of all outstanding workers (input is finished).
        yAll :: Seq.Seq (A.Async b) -> C.ConduitT a b m ()
        yAll Seq.Empty = return ()
        yAll q = do
            (r, q') <- liftIO $ retrieveResult q
            C.yield r
            yAll q'

        -- Steady state: the new worker is scheduled *before* a result is
        -- collected, so the queue briefly holds one entry more than the limit.
        loop :: Seq.Seq (A.Async b) -> C.ConduitT a b m ()
        loop q = C.await >>= \case
            Nothing -> yAll q
            Just v -> do
                v' <- sched v
                (r, q') <- liftIO $ retrieveResult q
                C.yield r
                loop (q' |> v')

        -- Wait for one result and return it together with the remaining
        -- queue. Must only be called with a non-empty queue.
        retrieveResult :: Seq.Seq (A.Async b) -> IO (b, Seq.Seq (A.Async b))
        retrieveResult q
            | isSynchronous = case Seq.viewl q of
                -- Ordered mode: always wait for the oldest worker.
                (r :< rest) -> (, rest) <$> A.wait r
                _ -> error "Impossible situation"
            | otherwise = do
                -- Unordered mode: take the result 'A.waitAny' hands back and
                -- drop that worker from the queue.
                (k, r) <- A.waitAny (toList q)
                return (r, Seq.filter (/= k) q)


-- | 'asyncMapC' with error handling. The mapped function may signal failure
-- by returning a 'Left'. The first 'Left' that reaches the output is turned
-- into a 'throwError' in the main monad; 'Right' values are yielded as-is.
--
-- Note that 'f' may still be evaluated for arguments beyond the first error,
-- as some threads may be running in the background and already processing
-- elements that come after it.
--
-- See 'asyncMapC'
asyncMapEitherC :: forall a m b e . (MonadIO m, NFData b, NFData e, MonadError e m) => Int -> (a -> Either e b) -> C.ConduitT a b m ()
asyncMapEitherC maxThreads f = asyncMapC maxThreads f .| C.awaitForever (either throwError C.yield)


-- | Groups incoming chunks into larger batches and yields each batch. Its
-- intended use is to build up larger blocks from smaller ones so that they
-- can be sent across thread barriers with little overhead.
--
-- The @chunkSize@ parameter is a hint, not an exact size. In particular,
-- larger chunks are not split up and smaller batches can be yielded too
-- (e.g., when the input ends).
--
-- Each batch is accumulated by prepending, so the yielded list holds the
-- chunks in reverse order of arrival.
bsConcatTo :: MonadIO m => Int -- ^ chunk hint
                            -> C.ConduitT B.ByteString [B.ByteString] m ()
bsConcatTo chunkSize = C.awaitForever $ \initial -> gather [initial] (B.length initial)
    where
        -- @acc@ is the batch so far (newest chunk first), @total@ its size in
        -- bytes.
        gather acc !total
            | total < chunkSize = C.await >>= \case
                Just next -> gather (next : acc) (total + B.length next)
                -- Input ended: flush whatever has been collected.
                Nothing -> C.yield acc
            | otherwise = C.yield acc

-- Unwraps a stream of 'Maybe' values: every @Just val@ is yielded as @val@,
-- and the first 'Nothing' terminates the conduit. Used to detect the
-- end-of-stream marker pushed through the queues in this module.
--
-- NOTE(review): behaviour at end of input is delegated to 'awaitJust'
-- (defined in "Data.Conduit.Algorithms.Utils", not shown here).
untilNothing :: forall m i. (Monad m) => C.ConduitT (Maybe i) i m ()
untilNothing = awaitJust step
    where
        step (Just val) = C.yield val >> untilNothing
        step Nothing = return ()


-- Source that reads from @h@ and applies @transform@ in a producer handed to
-- 'CA.gatherFrom', passing the transformed chunks downstream through a
-- bounded queue (size argument 8).
genericAsyncFrom :: forall m. (MonadIO m, MonadUnliftIO m) => C.ConduitT B.ByteString B.ByteString m () -> Handle -> C.ConduitT () B.ByteString m ()
genericAsyncFrom transform h = CA.gatherFrom 8 producer .| untilNothing
    where
        -- Push every transformed chunk (wrapped in 'Just') into the queue and
        -- finish with a single 'Nothing', the end marker 'untilNothing' stops at.
        producer :: TQ.TBQueue (Maybe B.ByteString) -> m ()
        producer queue = do
            C.runConduit (C.sourceHandle h .| transform .| CL.map Just .| CA.sinkTBQueue queue)
            liftIO . atomically $ TQ.writeTBQueue queue Nothing

-- Sink that batches its input with 'bsConcatTo' and hands the batches to a
-- consumer run by 'CA.drainTo' (queue size argument 8). The consumer applies
-- @transform@ and writes the result to @h@.
genericAsyncTo :: forall m. (MonadIO m, MonadUnliftIO m) => C.ConduitT B.ByteString B.ByteString (R.ResourceT IO) () -> Handle -> C.ConduitT B.ByteString C.Void m ()
genericAsyncTo transform h = bsConcatTo batchHint .| CA.drainTo 8 writer
    where
        -- Size hint (in bytes) for the batches sent across the queue.
        batchHint :: Int
        batchHint = 2 ^ (15 :: Int)

        writer :: TQ.TBQueue (Maybe [B.ByteString]) -> m ()
        writer queue = liftIO (C.runConduitRes (pipeline queue))

        -- Read batches until the 'Nothing' end marker. 'bsConcatTo' yields
        -- each batch newest-first, hence the 'reverse' before concatenating.
        pipeline queue =
            CA.sourceTBQueue queue
                .| untilNothing
                .| CL.map (B.concat . reverse)
                .| transform
                .| C.sinkHandle h


-- Opens @fname@ for reading and runs the handle-based source @from@ on it.
-- The handle is acquired and released (with 'hClose') through 'C.bracketP'.
genericFromFile :: forall m. (MonadResource m, MonadUnliftIO m) => (Handle -> C.ConduitT () B.ByteString m ()) -> FilePath -> C.ConduitT () B.ByteString m ()
genericFromFile from fname = C.bracketP acquire hClose from
    where
        acquire = openFile fname ReadMode

-- Opens @fname@ for writing and runs the handle-based sink @to@ on it.
-- The handle is acquired and released (with 'hClose') through 'C.bracketP'.
genericToFile :: forall m. (MonadResource m, MonadUnliftIO m) => (Handle -> C.ConduitT B.ByteString C.Void m ()) -> FilePath -> C.ConduitT B.ByteString C.Void m ()
genericToFile to fname = C.bracketP acquire hClose to
    where
        acquire = openFile fname WriteMode

-- | A simple sink which gzip-compresses its input in a separate thread and
-- writes the compressed stream to `h`.
--
-- This is 'asyncGzipTo'' with a compression level of @-1@.
--
-- See also 'asyncGzipToFile'
asyncGzipTo :: forall m. (MonadIO m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m ()
asyncGzipTo h = asyncGzipTo' (-1) h

-- | A simple sink which gzip-compresses its input in a separate thread and
-- writes the compressed stream to `h`, using the given compression level.
--
-- A 'SZ.ZlibException' raised by the compressor is rethrown as an 'IOError'
-- (user error type) that carries the handle `h`.
asyncGzipTo' :: forall m. (MonadIO m, MonadUnliftIO m) => Int -> Handle -> C.ConduitT B.ByteString C.Void m ()
asyncGzipTo' clevel h = genericAsyncTo (compressor `C.catchC` rethrowAsIOError) h
    where
        compressor :: C.ConduitT B.ByteString B.ByteString (R.ResourceT IO) ()
        compressor = CZ.compress clevel (CZ.WindowBits 31)

        rethrowAsIOError :: SZ.ZlibException -> C.ConduitT B.ByteString B.ByteString (R.ResourceT IO) r
        rethrowAsIOError e = liftIO (ioError wrapped)
            where
                wrapped = mkIOError userErrorType ("Error compressing gzip stream: "++displayException e) (Just h) Nothing

-- | Gzip-compresses the stream and writes it to the given file, with the
-- compression being performed in a separate thread.
--
-- See also 'asyncGzipTo'
asyncGzipToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncGzipToFile fname = genericToFile asyncGzipTo fname

-- | A source which produces the ungzipped content from the given handle.
-- Note that this "reads ahead", so if you do not consume all the input, the
-- Handle will probably be left at an undefined position in the file.
--
-- Note: unlike the ungzip conduit from 'Data.Conduit.Zlib', this function will
-- read *all* the compressed files in the stream (not just the first).
--
-- See also 'asyncGzipFromFile'
asyncGzipFrom :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m ()
asyncGzipFrom h = decompressed `C.catchC` rethrowAsIOError
    where
        decompressed :: C.ConduitT () B.ByteString m ()
        decompressed = CA.gatherFrom 8 producer .| untilNothing

        -- Decompress in plain 'IO', pushing each chunk (wrapped in 'Just')
        -- into the queue, followed by one 'Nothing' as the end marker.
        producer :: TQ.TBQueue (Maybe B.ByteString) -> m ()
        producer queue = liftIO $ do
            C.runConduit $
                C.sourceHandle h
                    .| CZ.multiple CZ.ungzip
                    .| CL.map Just
                    .| CA.sinkTBQueue queue
            atomically (TQ.writeTBQueue queue Nothing)

        -- Report zlib failures as an 'IOError' that carries the handle.
        rethrowAsIOError :: SZ.ZlibException -> C.ConduitT () B.ByteString m ()
        rethrowAsIOError e = liftIO (ioError wrapped)
            where
                wrapped = mkIOError userErrorType ("Error reading gzip file: "++displayException e) (Just h) Nothing

-- | Open and read a gzip file with the decompression being performed in a
-- separate thread.
--
-- This is 'asyncGzipFrom' wrapped by 'genericFromFile', which takes care of
-- turning the 'FilePath' into a 'Handle'.
--
-- See also 'asyncGzipFrom'
asyncGzipFromFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncGzipFromFile = genericFromFile asyncGzipFrom

-- | A simple sink which performs bzip2 compression in a separate thread and
-- writes the results to the given 'Handle'.
--
-- The compressor ('CBZ.bzip2') is handed to 'genericAsyncTo', which runs it
-- and writes its output to the handle.
--
-- See also 'asyncBzip2ToFile'
asyncBzip2To :: forall m. (MonadIO m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m ()
asyncBzip2To = genericAsyncTo CBZ.bzip2

-- | Compresses the input with bzip2 and writes it to the given file, with the
-- compression being performed in a separate thread.
--
-- This is 'asyncBzip2To' wrapped by 'genericToFile', which takes care of
-- turning the 'FilePath' into a 'Handle'.
--
-- See also 'asyncBzip2To'
asyncBzip2ToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncBzip2ToFile = genericToFile asyncBzip2To

-- | A source which produces the decompressed (bunzip2-ed) content read from
-- the given handle.
--
-- Note that this "reads ahead" so if you do not use all the input, the Handle
-- will probably be left at an undefined position in the file.
--
-- The decoder is @'CZ.multiple' 'CBZ.bunzip2'@, i.e. 'CBZ.bunzip2' wrapped in
-- the same 'CZ.multiple' combinator that the gzip reader uses.
--
-- See also 'asyncBzip2FromFile'
asyncBzip2From :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m ()
asyncBzip2From = genericAsyncFrom (CZ.multiple CBZ.bunzip2)

-- | Open and read a bzip2 file with the decompression being performed in a
-- separate thread.
--
-- This is 'asyncBzip2From' wrapped by 'genericFromFile', which takes care of
-- turning the 'FilePath' into a 'Handle'.
--
-- See also 'asyncBzip2From'
asyncBzip2FromFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncBzip2FromFile = genericFromFile asyncBzip2From

-- | A simple sink which performs lzma/xz compression in a separate thread and
-- writes the results to the given 'Handle'.
--
-- No explicit compression level is requested ('CX.compress' is called with
-- 'Nothing'); use @asyncXzTo'@ to choose one.
--
-- See also 'asyncXzToFile'
asyncXzTo :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m ()
asyncXzTo = genericAsyncTo (CX.compress Nothing)

-- | A simple sink which performs lzma/xz compression, at an explicitly chosen
-- compression level, in a separate thread and writes the results to the given
-- 'Handle'.
--
-- The first argument is the compression level; it is passed unchanged to
-- 'CX.compress' as @Just clevel@, so its valid range is whatever that function
-- accepts.
--
-- See also 'asyncXzToFile' and 'asyncXzTo'
asyncXzTo' :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Int -> Handle -> C.ConduitT B.ByteString C.Void m ()
asyncXzTo' clevel = genericAsyncTo (CX.compress (Just clevel))

-- | Compresses the input with lzma/xz and writes it to the given file, with
-- the compression being performed in a separate thread.
--
-- This is 'asyncXzTo' wrapped by 'genericToFile', which takes care of turning
-- the 'FilePath' into a 'Handle'.
--
-- See also 'asyncXzTo'
asyncXzToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncXzToFile = genericToFile asyncXzTo

-- | A source which produces the decompressed (unxz-ed) content read from the
-- given handle.
--
-- Note that this "reads ahead" so if you do not use all the input, the Handle
-- will probably be left at an undefined position in the file.
--
-- See also 'asyncXzFromFile'
asyncXzFrom :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m, MonadThrow m) => Handle -> C.ConduitT () B.ByteString m ()
asyncXzFrom = genericAsyncFrom (CX.decompress oneGBmembuffer)
  where
    -- 1024^3, passed to 'CX.decompress' as its optional argument.
    -- NOTE(review): presumably the decoder's memory limit in bytes (1 GiB);
    -- confirm against the lzma-conduit documentation.
    oneGBmembuffer = Just $ 1024 ^ (3 :: Integer)

-- | Open and read an lzma/xz file with the decompression being performed in a
-- separate thread.
--
-- This is 'asyncXzFrom' wrapped by 'genericFromFile', which takes care of
-- turning the 'FilePath' into a 'Handle'.
--
-- See also 'asyncXzFrom'
asyncXzFromFile :: forall m. (MonadResource m, MonadUnliftIO m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncXzFromFile = genericFromFile asyncXzFrom

-- | Decompress ZStd format using a separate thread.
--
-- The compressed bytes are read from the given 'Handle' and decoded with
-- 'CZstd.decompress' via 'genericAsyncFrom'.
--
-- See also 'asyncZstdFromFile'
asyncZstdFrom :: forall m. (MonadIO m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m ()
asyncZstdFrom = genericAsyncFrom CZstd.decompress

-- | Open and read a ZStd file with the decompression being performed in a
-- separate thread.
--
-- This is 'asyncZstdFrom' wrapped by 'genericFromFile', which takes care of
-- turning the 'FilePath' into a 'Handle'.
--
-- See also 'asyncZstdFrom'
asyncZstdFromFile :: forall m. (MonadResource m, MonadUnliftIO m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m ()
asyncZstdFromFile = genericFromFile asyncZstdFrom


-- | Compress in ZStd format using a separate thread and write the result to
-- the given 'Handle'.
--
-- The compression level is passed unchanged to 'CZstd.compress'.
--
-- See also 'asyncZstdToFile'
asyncZstdTo :: forall m. (MonadIO m, MonadUnliftIO m) =>
                Int -- ^ compression level
                -> Handle -> C.ConduitT B.ByteString C.Void m ()
asyncZstdTo clevel = genericAsyncTo (CZstd.compress clevel)


-- | Compress in ZStd format using a separate thread and write to a file.
--
-- This will use compression level 3 as this is the default in the ZStd C API.
-- Use 'asyncZstdTo' directly if a different level is needed.
--
-- See also 'asyncZstdTo'
asyncZstdToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
asyncZstdToFile = genericToFile (asyncZstdTo 3)

-- | If the filename indicates a supported compressed file (gzip, xz, bzip2,
-- or zstd, recognised by the suffixes @.gz@, @.xz@, @.bz2@, @.zst@ and
-- @.zstd@), then it reads it and uncompresses it; any other file is read
-- as-is.
--
-- Usage
--
-- @
--
--      withPossiblyCompressedFile fname $ \\src ->
--          runConduit (src .| mySink)
-- @
--
-- Unlike 'conduitPossiblyCompressedFile', this ensures that the file is closed
-- even if the conduit terminates early: the file is opened with
-- 'IO.withBinaryFile' and the source handed to the callback reads from that
-- handle.
--
-- On Windows, attempting to read from a bzip2 file, results in 'error'.
withPossiblyCompressedFile :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> (C.ConduitT () B.ByteString m () -> m a) -> m a
withPossiblyCompressedFile fname inner = withRunInIO $ \run ->
    -- withBinaryFile lives in IO, so the callback is run through the
    -- unlifting function provided by withRunInIO.
    IO.withBinaryFile fname IO.ReadMode $ \h ->
        run (inner (withPossiblyCompressedFile' fname h))

-- | Select the reader for an already opened 'Handle' based on the suffix of
-- the file name:
--
-- * @.gz@ uses 'asyncGzipFrom'
-- * @.xz@ uses 'asyncXzFrom'
-- * @.bz2@ uses 'asyncBzip2From'
-- * @.zst@ and @.zstd@ use 'asyncZstdFrom'
-- * anything else is read unchanged with 'C.sourceHandle'
--
-- The file name is only inspected, never opened.
withPossiblyCompressedFile' :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> Handle -> C.ConduitT () B.ByteString m ()
withPossiblyCompressedFile' fname
    | hasSuffix ".gz" = asyncGzipFrom
    | hasSuffix ".xz" = asyncXzFrom
    | hasSuffix ".bz2" = asyncBzip2From
    | hasSuffix ".zst" || hasSuffix ".zstd" = asyncZstdFrom
    | otherwise = C.sourceHandle
  where
    hasSuffix ext = ext `isSuffixOf` fname


-- | If the filename indicates a supported compressed file (gzip, xz, bzip2,
-- or zstd, recognised by the suffixes @.gz@, @.xz@, @.bz2@, @.zst@ and
-- @.zstd@), then it provides an output sink which compresses with the
-- matching algorithm; for any other file name the sink writes its input
-- unchanged.
--
-- Usage
--
-- @
--
--      withPossiblyCompressedFileOutput fname $ \\out ->
--          runConduit (mySrc .| out)
-- @
--
-- This ensures that the file is closed even if the conduit terminates early:
-- the file is opened with 'IO.withBinaryFile' and the sink handed to the
-- callback writes to that handle.
--
-- On Windows, attempting to write to a bzip2 file, results in 'error'.
withPossiblyCompressedFileOutput :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> (C.ConduitT B.ByteString C.Void m () -> m a) -> m a
withPossiblyCompressedFileOutput fname inner = withRunInIO $ \run ->
    -- withBinaryFile lives in IO, so the callback is run through the
    -- unlifting function provided by withRunInIO.
    IO.withBinaryFile fname IO.WriteMode $ \h ->
        run (inner (withPossiblyCompressedFileOutput' fname h))


-- | Select the writer for an already opened 'Handle' based on the suffix of
-- the file name:
--
-- * @.gz@ uses 'asyncGzipTo'
-- * @.xz@ uses @asyncXzTo'@ with compression level 6
-- * @.bz2@ uses 'asyncBzip2To'
-- * @.zst@ and @.zstd@ use 'asyncZstdTo' with compression level 3
-- * anything else is written unchanged with 'C.sinkHandle'
--
-- The file name is only inspected, never opened.
withPossiblyCompressedFileOutput' :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> Handle -> C.ConduitT B.ByteString C.Void m ()
withPossiblyCompressedFileOutput' fname
    | hasSuffix ".gz" = asyncGzipTo
    | hasSuffix ".xz" = asyncXzTo' 6
    | hasSuffix ".bz2" = asyncBzip2To
    | hasSuffix ".zst" || hasSuffix ".zstd" = asyncZstdTo 3
    | otherwise = C.sinkHandle
  where
    hasSuffix ext = ext `isSuffixOf` fname


-- | If the filename indicates a supported compressed file (gzip, xz, bzip2,
-- or zstd, recognised by the suffixes @.gz@, @.xz@, @.bz2@, @.zst@ and
-- @.zstd@), then it reads it and uncompresses it. Any other file is read
-- unchanged with 'C.sourceFile'.
--
-- To ensure that the file is closed even if the downstream finishes early,
-- consider using 'withPossiblyCompressedFile'.
--
-- On Windows, attempting to read from a bzip2 file, results in 'error'.
conduitPossiblyCompressedFile :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m ()
conduitPossiblyCompressedFile fname
    | hasSuffix ".gz" = asyncGzipFromFile fname
    | hasSuffix ".xz" = asyncXzFromFile fname
    | hasSuffix ".bz2" = asyncBzip2FromFile fname
    | hasSuffix ".zst" || hasSuffix ".zstd" = asyncZstdFromFile fname
    | otherwise = C.sourceFile fname
  where
    hasSuffix ext = ext `isSuffixOf` fname

-- | If the filename indicates a supported compressed file (gzip, xz, bzip2,
-- or zstd, recognised by the suffixes @.gz@, @.xz@, @.bz2@, @.zst@ and
-- @.zstd@), then it compresses and writes with the algorithm matching the
-- filename. Any other file is written unchanged with 'C.sinkFile'.
--
-- Consider using 'withPossiblyCompressedFileOutput' to ensure prompt file closing.
--
-- On Windows, attempting to write to a bzip2 file, results in 'error'.
conduitPossiblyCompressedToFile :: (MonadUnliftIO m, MonadResource m) => FilePath -> C.ConduitT B.ByteString C.Void m ()
conduitPossiblyCompressedToFile fname
    | hasSuffix ".gz" = asyncGzipToFile fname
    | hasSuffix ".xz" = asyncXzToFile fname
    | hasSuffix ".bz2" = asyncBzip2ToFile fname
    | hasSuffix ".zst" || hasSuffix ".zstd" = asyncZstdToFile fname
    | otherwise = C.sinkFile fname
  where
    hasSuffix ext = ext `isSuffixOf` fname