{-| Module : Data.Conduit.Algorithms.Async Copyright : 2013-2018 Luis Pedro Coelho License : MIT Maintainer : luis@luispedro.org Higher level async processing interfaces. -} {-# LANGUAGE ScopedTypeVariables, FlexibleContexts, CPP, TupleSections #-} module Data.Conduit.Algorithms.Async ( conduitPossiblyCompressedFile , conduitPossiblyCompressedToFile , withPossiblyCompressedFile , asyncMapC , asyncMapEitherC , asyncGzipTo , asyncGzipToFile , asyncGzipFrom , asyncGzipFromFile , asyncBzip2To , asyncBzip2ToFile , asyncBzip2From , asyncBzip2FromFile , asyncXzTo , asyncXzToFile , asyncXzFrom , asyncXzFromFile , unorderedAsyncMapC ) where import qualified Data.ByteString as B import qualified Control.Concurrent.Async as A import qualified Control.Concurrent.STM.TBQueue as TQ import Control.Concurrent.STM (atomically) import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.Async as CA import qualified Data.Conduit.TQueue as CA import qualified Data.Conduit.List as CL import qualified Data.Conduit.Zlib as CZ import qualified Data.Conduit.Lzma as CX import qualified Data.Streaming.Zlib as SZ import qualified Data.Conduit.BZlib as CZ import qualified Data.Conduit as C import Data.Conduit ((.|)) import qualified Data.Sequence as Seq import Data.Sequence ((|>), ViewL(..)) import Data.Foldable (toList) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Monad.Error.Class (MonadError(..)) import Control.Monad.IO.Unlift (MonadUnliftIO, withRunInIO) import Control.Monad.Trans.Resource (MonadResource) import Control.Monad.Catch (MonadThrow) import Control.Exception (evaluate, displayException) import Control.DeepSeq (NFData, force) import System.IO.Error (mkIOError, userErrorType) import System.IO import qualified System.IO as IO import Data.List (isSuffixOf) import Data.Conduit.Algorithms.Utils (awaitJust) -- | This is like 'Data.Conduit.List.map', except that each element is processed -- in a separate thread (up to 'maxThreads' can be queued up at any one time). -- Results are evaluated to normal form (not weak-head normal form!, i.e., the -- structure is deeply evaluated) to ensure that the computation is fully -- evaluated in the worker thread. -- -- Note that there is some overhead in threading. It is often a good idea to -- build larger chunks of input before passing it to 'asyncMapC' to amortize -- the costs. That is, when @f@ is not a lot of work, instead of @asyncMapC f@, -- it is sometimes better to do -- -- @ -- CC.conduitVector 4096 .| asyncMapC (V.map f) .| CC.concat -- @ -- -- where @CC@ refers to 'Data.Conduit.Combinators' -- -- See 'unorderedAsyncMapC' asyncMapC :: forall a m b . (MonadIO m, NFData b) => Int -- ^ Maximum number of worker threads -> (a -> b) -- ^ Function to execute -> C.ConduitT a b m () asyncMapC = asyncMapCHelper True -- | A version of 'asyncMapC' which can reorder results in the stream -- -- If the order of the results is not important, this function can lead to a -- better use of resources if some of the chunks take longer to complete. -- -- See 'asyncMapC' unorderedAsyncMapC :: forall a m b . (MonadIO m, NFData b) => Int -- ^ Maximum number of worker threads -> (a -> b) -- ^ Function to execute -> C.ConduitT a b m () unorderedAsyncMapC = asyncMapCHelper False asyncMapCHelper :: forall a m b . (MonadIO m, NFData b) => Bool -> Int -- ^ Maximum number of worker threads -> (a -> b) -- ^ Function to execute -> C.ConduitT a b m () asyncMapCHelper isSynchronous maxThreads f = initLoop (0 :: Int) (Seq.empty :: Seq.Seq (A.Async b)) where initLoop :: Int -> Seq.Seq (A.Async b) -> C.ConduitT a b m () initLoop size q | size == maxThreads = loop q | otherwise = C.await >>= \case Nothing -> yAll q Just v -> do v' <- sched v initLoop (size + 1) (q |> v') sched :: a -> C.ConduitM a b m (A.Async b) sched v = liftIO . A.async . evaluate . force $ f v -- | yield all yAll :: Seq.Seq (A.Async b) -> C.ConduitT a b m () yAll q | Seq.null q = return () | otherwise = do (r, q') <- liftIO $ retrieveResult q C.yield r yAll q' loop :: Seq.Seq (A.Async b) -> C.ConduitT a b m () loop q = C.await >>= \case Nothing -> yAll q Just v -> do v' <- sched v (r, q') <- liftIO $ retrieveResult q C.yield r loop (q' |> v') retrieveResult :: Seq.Seq (A.Async b) -> IO (b, Seq.Seq (A.Async b)) retrieveResult q | isSynchronous = case Seq.viewl q of (r :< rest) -> (, rest) <$> A.wait r _ -> error "Impossible situation" | otherwise = do (k, r) <- liftIO (A.waitAny (toList q)) return (r, Seq.filter (/= k) q) -- | 'asyncMapC' with error handling. The inner function can now return an -- error (as a 'Left'). When the first error is seen, it 'throwError's in the -- main monad. Note that 'f' may be evaluated for arguments beyond the first -- error (as some threads may be running in the background and already -- processing elements after the first error). -- -- See 'asyncMapC' asyncMapEitherC :: forall a m b e . (MonadIO m, NFData b, NFData e, MonadError e m) => Int -> (a -> Either e b) -> C.ConduitT a b m () asyncMapEitherC maxThreads f = asyncMapC maxThreads f .| (C.awaitForever $ \case Right v -> C.yield v Left err -> throwError err) -- | concatenates input into larger chunks and yields it. Its indended use is -- to build up larger blocks from smaller ones so that they can be sent across -- thread barriers with little overhead. -- -- the chunkSize parameter is a hint, not an exact element. In particular, -- larger chunks are not split up and smaller chunks can be yielded too. bsConcatTo :: MonadIO m => Int -- ^ chunk hint -> C.ConduitT B.ByteString [B.ByteString] m () bsConcatTo chunkSize = awaitJust start where start v | B.length v >= chunkSize = C.yield [v] >> bsConcatTo chunkSize | otherwise = continue [v] (B.length v) continue chunks s = C.await >>= \case Nothing -> C.yield chunks Just v | B.length v + s > chunkSize -> C.yield chunks >> start v | otherwise -> continue (v:chunks) (s + B.length v) untilNothing :: forall m i. (Monad m) => C.ConduitT (Maybe i) i m () untilNothing = awaitJust $ \case Just val -> do C.yield val untilNothing _ -> return () -- | A simple sink which performs gzip compression in a separate thread and -- writes the results to `h`. -- -- See also 'asyncGzipToFile' asyncGzipTo :: forall m. (MonadIO m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m () asyncGzipTo h = do let drain q = liftIO . C.runConduit $ CA.sourceTBQueue q .| untilNothing .| CL.map (B.concat . reverse) .| CZ.gzip .| C.sinkHandle h bsConcatTo ((2 :: Int) ^ (15 :: Int)) .| CA.drainTo 8 drain -- | Compresses the output and writes to the given file with compression being -- performed in a separate thread. -- -- See also 'asyncGzipTo' asyncGzipToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m () asyncGzipToFile fname = C.bracketP (openFile fname WriteMode) hClose asyncGzipTo -- | A source which produces the ungzipped content from the the given handle. -- Note that this "reads ahead" so if you do not use all the input, the Handle -- will probably be left at an undefined position in the file. -- -- See also 'asyncGzipFromFile' asyncGzipFrom :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m () asyncGzipFrom h = do let prod q = liftIO $ do C.runConduit $ C.sourceHandle h .| CZ.multiple CZ.ungzip .| CL.map Just .| CA.sinkTBQueue q atomically (TQ.writeTBQueue q Nothing) (CA.gatherFrom 8 prod .| untilNothing) `C.catchC` (\(e :: SZ.ZlibException) -> liftIO . ioError $ mkIOError userErrorType ("Error reading gzip file: "++displayException e) (Just h) Nothing) -- | Open and read a gzip file with the uncompression being performed in a -- separate thread. -- -- See also 'asyncGzipFrom' asyncGzipFromFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT () B.ByteString m () asyncGzipFromFile fname = C.bracketP (openFile fname ReadMode) hClose asyncGzipFrom -- | A simple sink which performs bzip2 compression in a separate thread and -- writes the results to `h`. -- -- See also 'asyncBzip2ToFile' asyncBzip2To :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m () asyncBzip2To h = do let drain q = C.runConduit $ CA.sourceTBQueue q .| untilNothing .| CL.map (B.concat . reverse) .| CZ.bzip2 .| C.sinkHandle h bsConcatTo ((2 :: Int) ^ (15 :: Int)) .| CA.drainTo 8 drain -- | Compresses the output and writes to the given file with compression being -- performed in a separate thread. -- -- See also 'asyncBzip2To' asyncBzip2ToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m () asyncBzip2ToFile fname = C.bracketP (openFile fname WriteMode) hClose asyncBzip2To -- | A source which produces the bzipped2 content from the the given handle. -- Note that this "reads ahead" so if you do not use all the input, the Handle -- will probably be left at an undefined position in the file. -- -- See also 'asyncBzip2FromFile' asyncBzip2From :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT () B.ByteString m () asyncBzip2From h = do let prod q = do C.runConduit $ C.sourceHandle h .| CZ.multiple CZ.bunzip2 .| CL.map Just .| CA.sinkTBQueue q liftIO $ atomically (TQ.writeTBQueue q Nothing) CA.gatherFrom 8 prod .| untilNothing -- | Open and read a bzip2 file with the uncompression being performed in a -- separate thread. -- -- See also 'asyncBzip2From' asyncBzip2FromFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT () B.ByteString m () asyncBzip2FromFile fname = C.bracketP (openFile fname ReadMode) hClose asyncBzip2From -- | A simple sink which performs lzma/xz compression in a separate thread and -- writes the results to `h`. -- -- See also 'asyncXzToFile' asyncXzTo :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m) => Handle -> C.ConduitT B.ByteString C.Void m () asyncXzTo h = do let drain q = C.runConduit $ CA.sourceTBQueue q .| untilNothing .| CL.map (B.concat . reverse) .| CX.compress Nothing .| C.sinkHandle h bsConcatTo ((2 :: Int) ^ (15 :: Int)) .| CA.drainTo 8 drain -- | Compresses the output and writes to the given file with compression being -- performed in a separate thread. -- -- See also 'asyncXzTo' asyncXzToFile :: forall m. (MonadResource m, MonadUnliftIO m) => FilePath -> C.ConduitT B.ByteString C.Void m () asyncXzToFile fname = C.bracketP (openFile fname WriteMode) hClose asyncXzTo -- | A source which produces the unxzipped content from the the given handle. -- Note that this "reads ahead" so if you do not use all the input, the Handle -- will probably be left at an undefined position in the file. -- -- See also 'asyncXzFromFile' asyncXzFrom :: forall m. (MonadIO m, MonadResource m, MonadUnliftIO m, MonadThrow m) => Handle -> C.ConduitT () B.ByteString m () asyncXzFrom h = do let oneGBmembuffer = Just $ 1024 ^ (3 :: Integer) prod q = do C.runConduit $ C.sourceHandle h .| CZ.multiple (CX.decompress oneGBmembuffer) .| CL.map Just .| CA.sinkTBQueue q liftIO $ atomically (TQ.writeTBQueue q Nothing) CA.gatherFrom 8 prod .| untilNothing -- | Open and read a lzma/xz file with the uncompression being performed in a -- separate thread. -- -- See also 'asyncXzFrom' asyncXzFromFile :: forall m. (MonadResource m, MonadUnliftIO m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m () asyncXzFromFile fname = C.bracketP (openFile fname ReadMode) hClose asyncXzFrom -- | If the filename indicates a supported compressed file (gzip, xz, and, on -- Unix, bzip2), then it reads it and uncompresses it. -- -- Usage -- -- @ -- -- withPossiblyCompressedFile fname $ \src -> -- src .| mySink -- @ -- -- Unlike 'conduitPossiblyCompressedFile', this ensures that the file is closed -- even if the conduit terminates early. -- -- On Windows, attempting to read from a bzip2 file, results in 'error'. withPossiblyCompressedFile :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> (C.ConduitT () B.ByteString m () -> m a) -> m a withPossiblyCompressedFile fname inner = withRunInIO $ \run -> do IO.withBinaryFile fname IO.ReadMode $ run . inner . withPossiblyCompressedFile' fname withPossiblyCompressedFile' :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> Handle -> C.ConduitT () B.ByteString m () withPossiblyCompressedFile' fname | ".gz" `isSuffixOf` fname = asyncGzipFrom | ".xz" `isSuffixOf` fname = asyncXzFrom | ".bz2" `isSuffixOf` fname = asyncBzip2From | otherwise = C.sourceHandle -- | If the filename indicates a gzipped file (or, on Unix, also a bz2 file), -- then it reads it and uncompresses it. -- -- -- To ensure that the file is closed even if the downstream finishes early, -- consider using 'withPossiblyCompressedFile'. -- -- On Windows, attempting to read from a bzip2 file, results in 'error'. conduitPossiblyCompressedFile :: (MonadUnliftIO m, MonadResource m, MonadThrow m) => FilePath -> C.ConduitT () B.ByteString m () conduitPossiblyCompressedFile fname | ".gz" `isSuffixOf` fname = asyncGzipFromFile fname | ".xz" `isSuffixOf` fname = asyncXzFromFile fname | ".bz2" `isSuffixOf` fname = asyncBzip2FromFile fname | otherwise = C.sourceFile fname -- | If the filename indicates a gzipped file (or, on Unix, also a bz2 file), -- then it compresses and write with the algorithm matching the filename -- -- On Windows, attempting to write to a bzip2 file, results in 'error'. conduitPossiblyCompressedToFile :: (MonadUnliftIO m, MonadResource m) => FilePath -> C.ConduitT B.ByteString C.Void m () conduitPossiblyCompressedToFile fname | ".gz" `isSuffixOf` fname = asyncGzipToFile fname | ".xz" `isSuffixOf` fname = asyncXzToFile fname | ".bz2" `isSuffixOf` fname = asyncBzip2ToFile fname | otherwise = C.sinkFile fname