{-|
Module      : Z.IO.BIO
Description : Composable IO Loops
Copyright   : (c) Dong Han, 2017-2020
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides 'BIO' (block IO) type to facilitate writing streaming programs. A 'BIO' node usually:

  * Process input in unit of block(or item).
  * Running in constant spaces, which means the memory usage won't accumulate.
  * Keep some state in IO, which is sealed in 'BIO' closure.

Some examples of such nodes are:

  * Compressor \/ decompressor, e.g. zlib, etc.
  * Codec, e.g. utf8 codec, base64 codec.
  * Ciphers.
  * Packet parsers.

We use @BIO inp out@ type to represent all the objects above, @BIO Void out@ to represent an 'IO' source,
and @BIO inp Void@ to represent an 'IO' sink, which can all be connected with '>|>' to build a larger 'BIO' node.

@
import Z.Data.CBytes    (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}

    withResource (initSourceFromFile origin) $ \ src ->
        withResource (initSinkToFile target) $ \ sink ->
            runBIO_ $ src . base64Enc . zlibCompressor . sink

> base64AndCompressFile "test" "test.gz"
-- run 'zcat "test.gz" | base64 -d' will give you original file
@

This module is intended to be imported qualified:
@
import           Z.IO.BIO (BIO, Source, Sink)
import qualified Z.IO.BIO as BIO
@
-}
module Z.IO.BIO (
  -- * The BIO type
    BIO, pattern EOF, Source, Sink
  -- ** Basic combinators
  , appendSource, concatSource, concatSource'
  , joinSink, fuseSink
  -- * Run BIO chain
  , discard
  , stepBIO, stepBIO_
  , runBIO, runBIO_
  , runBlock, runBlock_, unsafeRunBlock
  , runBlocks, runBlocks_, unsafeRunBlocks
  -- * Make new BIO
  , pureBIO, ioBIO
  , filter, filterM
  -- ** Source
  , initSourceFromFile
  , initSourceFromFile'
  , sourceFromIO
  , sourceFromList
  , sourceFromBuffered
  , sourceTextFromBuffered
  , sourceJSONFromBuffered
  , sourceParserFromBuffered
  , sourceParseChunkFromBuffered
  -- ** Sink
  , sinkToIO
  , sinkToList
  , initSinkToFile
  , sinkToBuffered
  , sinkBuilderToBuffered
  -- ** Bytes specific
  , newReChunk
  , newUTF8Decoder
  , newParserNode, newMagicSplitter, newLineSplitter
  , newBase64Encoder, newBase64Decoder
  , hexEncoder, newHexDecoder
  -- ** Generic BIO
  , counterNode
  , seqNumNode
  , newGroupingNode
  , ungroupingNode
  , consumedNode
  ) where

import           Prelude                hiding (filter)
import           Control.Concurrent.MVar
import           Control.Monad          hiding  (filterM)
import           Control.Monad.IO.Class
import           Data.Bits              ((.|.))
import           Data.IORef
import qualified Data.List              as List
import           Data.Void
import           Data.Word
import           System.IO.Unsafe       (unsafePerformIO)
import qualified Z.Data.Array           as A
import qualified Z.Data.Builder         as B
import           Z.Data.CBytes          (CBytes)
import qualified Z.Data.JSON            as JSON
import qualified Z.Data.Parser          as P
import           Z.Data.PrimRef
import qualified Z.Data.Text            as T
import qualified Z.Data.Text.UTF8Codec  as T
import qualified Z.Data.Vector          as V
import qualified Z.Data.Vector.Base     as V
import           Z.Data.Vector.Base64
import           Z.Data.Vector.Hex
import           Z.IO.Buffered
import           Z.IO.Exception
import qualified Z.IO.FileSystem.Base   as FS
import           Z.IO.Resource

-- | A 'BIO'(blocked IO) node.
--
-- A 'BIO' node is a push based stream transformer. It can be used to describe different kinds of IO
-- devices:
--
--  * @BIO inp out@ describe an IO state machine(e.g. z_stream in zlib),
--    which takes some input in block, then outputs.
--  * @type Source out = BIO Void out@ described an IO source, which never takes input,
--    but gives output until EOF by looping.
--  * @type Sink inp = BIO inp Void@ described an IO sink, which takes input and perform some IO effects,
--    such as writing to terminal or files.
--
-- You can connect these 'BIO' nodes with '>|>', which connect left node's output to right node's input,
-- and return a new 'BIO' node with left node's input type and right node's output type.
--
-- You can run a 'BIO' node in different ways:
--
--   * 'stepBIO'\/'stepBIO_' to supply a single chunk of input and step the BIO node.
--   * 'runBIO'\/'runBIO_' will supply EOF directly, which will effectively pull all values from source,
--     and push to sink until source reaches EOF.
--   * 'runBlock'\/'runBlock_' will supply a single block of input as whole input and run the BIO node.
--   * 'runBlocks'\/'runBlocks_' will supply a list of blocks as whole input and run the BIO node.
--
-- Note 'BIO' usually contains some IO states, you can consider it as an opaque 'IORef':
--
--   * You shouldn't use a 'BIO' node across multiple 'BIO' chain unless the state can be reset.
--   * You shouldn't use a 'BIO' node across multiple threads unless document states otherwise.
--
-- 'BIO' is simply a convenient way to construct single-thread streaming computation, to use 'BIO'
-- in multiple threads, check "Z.IO.BIO.Concurrent" module.
--
type BIO inp out = (Maybe out -> IO ())     -- ^ Pass 'EOF' to indicate current node reaches EOF
                -> Maybe inp                -- ^ 'EOF' indicates upstream reaches EOF
                -> IO ()

-- | Patterns for more meaningful pattern matching.
pattern EOF :: Maybe a
pattern $bEOF :: Maybe a
$mEOF :: forall r a. Maybe a -> (Void# -> r) -> (Void# -> r) -> r
EOF = Nothing

-- | Type alias for 'BIO' node which never takes input.
--
-- Note when implement a 'Source', you should assume 'EOF' argument is supplied only once, and you
-- should loop to call downstream continuation with all available chunks, then write a final 'EOF'
-- to indicate EOF.
type Source x = BIO Void x

-- | Type alias for 'BIO' node which only takes input and perform effects.
--
-- Note when implement a 'Sink', you should assume 'EOF' argument is supplied only once(when upstream
-- reaches EOF), you do not need to call downstream continuation before EOF, and
-- do a flush(also write a final 'EOF') when upstream reach EOF.
type Sink x = BIO x Void

-- | Connect two 'BIO' source, after first reach EOF, draw elements from second.
appendSource :: HasCallStack => Source a -> Source a -> Source a
{-# INLINE appendSource #-}
Source a
b1 appendSource :: Source a -> Source a -> Source a
`appendSource` Source a
b2 = \ Maybe a -> IO ()
k Maybe Void
_ ->
    Source a
b1 (\ Maybe a
y ->
        case Maybe a
y of Just a
_ -> Maybe a -> IO ()
k Maybe a
y
                  Maybe a
_      -> Source a
b2 Maybe a -> IO ()
k Maybe Void
forall a. Maybe a
EOF) Maybe Void
forall a. Maybe a
EOF

-- | Fuse two 'BIO' sinks, i.e. everything written to the fused sink will be written to left and right sink.
--
-- Flush result 'BIO' will effectively flush both sink.
joinSink :: HasCallStack => Sink out -> Sink out -> Sink out
{-# INLINE joinSink #-}
Sink out
b1 joinSink :: Sink out -> Sink out -> Sink out
`joinSink` Sink out
b2 = \ Maybe Void -> IO ()
k Maybe out
mx ->
    case Maybe out
mx of
        Just out
_ -> do
            Sink out
b1 Maybe Void -> IO ()
forall a. a -> IO ()
discard Maybe out
mx
            Sink out
b2 Maybe Void -> IO ()
forall a. a -> IO ()
discard Maybe out
mx
        Maybe out
_ -> do
            Sink out
b1 Maybe Void -> IO ()
forall a. a -> IO ()
discard Maybe out
forall a. Maybe a
EOF
            Sink out
b2 Maybe Void -> IO ()
forall a. a -> IO ()
discard Maybe out
forall a. Maybe a
EOF
            Maybe Void -> IO ()
k Maybe Void
forall a. Maybe a
EOF

-- | Fuse a list of 'BIO' sinks, everything written to the fused sink will be written to every sink in the list.
--
-- Flush result 'BIO' will effectively flush every sink in the list.
fuseSink :: HasCallStack => [Sink out] -> Sink out
{-# INLINABLE fuseSink #-}
fuseSink :: [Sink out] -> Sink out
fuseSink [Sink out]
ss = \ Maybe Void -> IO ()
k Maybe out
mx ->
    case Maybe out
mx of
        Just out
_ -> (Sink out -> IO ()) -> [Sink out] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\ Sink out
s -> Sink out
s Maybe Void -> IO ()
forall a. a -> IO ()
discard Maybe out
mx) [Sink out]
ss
        Maybe out
_ -> do
            (Sink out -> IO ()) -> [Sink out] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\ Sink out
s -> Sink out
s Maybe Void -> IO ()
forall a. a -> IO ()
discard Maybe out
mx) [Sink out]
ss
            Maybe Void -> IO ()
k Maybe Void
forall a. Maybe a
EOF

-- | Connect list of 'BIO' sources, after one reach EOF, draw element from next.
concatSource :: HasCallStack => [Source a] -> Source a
{-# INLINABLE concatSource #-}
concatSource :: [Source a] -> Source a
concatSource = (Source a -> Source a -> Source a)
-> Source a -> [Source a] -> Source a
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
List.foldl' Source a -> Source a -> Source a
forall a. HasCallStack => Source a -> Source a -> Source a
appendSource Source a
forall a. Source a
emptySource

-- | A 'Source' directly write EOF to downstream.
emptySource :: Source a
{-# INLINABLE emptySource #-}
emptySource :: Source a
emptySource = \ Maybe a -> IO ()
k Maybe Void
_ -> Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF

-- | Connect list of 'BIO' sources, after one reach EOF, draw element from next.
concatSource' :: HasCallStack => Source (Source a) -> Source a
{-# INLINABLE concatSource' #-}
concatSource' :: Source (Source a) -> Source a
concatSource' Source (Source a)
ssrc = \ Maybe a -> IO ()
k Maybe Void
_ -> Source (Source a)
ssrc (\ Maybe (Source a)
msrc ->
    case Maybe (Source a)
msrc of
        Just Source a
src -> Source a
src (\ Maybe a
mx ->
            case Maybe a
mx of Just a
_ -> Maybe a -> IO ()
k Maybe a
mx
                       Maybe a
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) Maybe Void
forall a. Maybe a
EOF
        Maybe (Source a)
_ -> Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF) Maybe Void
forall a. Maybe a
EOF

-------------------------------------------------------------------------------
-- Run BIO

-- | Discards a value.
discard :: a -> IO ()
{-# INLINABLE discard #-}
discard :: a -> IO ()
discard a
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Supply a single chunk of input to a 'BIO' and collect result.
stepBIO :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE stepBIO #-}
stepBIO :: BIO inp out -> inp -> IO [out]
stepBIO BIO inp out
bio inp
inp = do
    IORef [out]
accRef <- [out] -> IO (IORef [out])
forall a. a -> IO (IORef a)
newIORef []
    BIO inp out
bio ((out -> IO ()) -> Maybe out -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_  ((out -> IO ()) -> Maybe out -> IO ())
-> (out -> IO ()) -> Maybe out -> IO ()
forall a b. (a -> b) -> a -> b
$ \ out
x -> IORef [out] -> ([out] -> [out]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [out]
accRef (out
xout -> [out] -> [out]
forall a. a -> [a] -> [a]
:)) (inp -> Maybe inp
forall a. a -> Maybe a
Just inp
inp)
    [out] -> [out]
forall a. [a] -> [a]
reverse ([out] -> [out]) -> IO [out] -> IO [out]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [out] -> IO [out]
forall a. IORef a -> IO a
readIORef IORef [out]
accRef

-- | Supply a single chunk of input to a 'BIO' without collecting result.
stepBIO_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE stepBIO_ #-}
stepBIO_ :: BIO inp out -> inp -> IO ()
stepBIO_ BIO inp out
bio = BIO inp out
bio Maybe out -> IO ()
forall a. a -> IO ()
discard (Maybe inp -> IO ()) -> (inp -> Maybe inp) -> inp -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. inp -> Maybe inp
forall a. a -> Maybe a
Just

-- | Run a 'BIO' loop without providing input.
--
-- When used on 'Source', it starts the streaming loop.
-- When used on 'Sink', it performs a flush.
runBIO_ :: HasCallStack => BIO inp out -> IO ()
{-# INLINABLE runBIO_ #-}
runBIO_ :: BIO inp out -> IO ()
runBIO_ BIO inp out
bio = BIO inp out
bio Maybe out -> IO ()
forall a. a -> IO ()
discard Maybe inp
forall a. Maybe a
EOF

-- | Run a 'BIO' loop without providing input, and collect result.
--
-- When used on 'Source', it will collect all input chunks.
runBIO :: HasCallStack => BIO inp out -> IO [out]
{-# INLINABLE runBIO #-}
runBIO :: BIO inp out -> IO [out]
runBIO BIO inp out
bio = do
    IORef [out]
accRef <- [out] -> IO (IORef [out])
forall a. a -> IO (IORef a)
newIORef []
    BIO inp out
bio ((out -> IO ()) -> Maybe out -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_  ((out -> IO ()) -> Maybe out -> IO ())
-> (out -> IO ()) -> Maybe out -> IO ()
forall a b. (a -> b) -> a -> b
$ \ out
x -> IORef [out] -> ([out] -> [out]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [out]
accRef (out
xout -> [out] -> [out]
forall a. a -> [a] -> [a]
:)) Maybe inp
forall a. Maybe a
EOF
    [out] -> [out]
forall a. [a] -> [a]
reverse ([out] -> [out]) -> IO [out] -> IO [out]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [out] -> IO [out]
forall a. IORef a -> IO a
readIORef IORef [out]
accRef

-- | Run a 'BIO' loop with a single chunk of input and EOF, and collect result.
--
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE runBlock #-}
runBlock :: BIO inp out -> inp -> IO [out]
runBlock BIO inp out
bio inp
inp = do
    IORef [out]
accRef <- [out] -> IO (IORef [out])
forall a. a -> IO (IORef a)
newIORef []
    BIO inp out
bio ((out -> IO ()) -> Maybe out -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_  ((out -> IO ()) -> Maybe out -> IO ())
-> (out -> IO ()) -> Maybe out -> IO ()
forall a b. (a -> b) -> a -> b
$ \ out
x -> IORef [out] -> ([out] -> [out]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [out]
accRef (out
xout -> [out] -> [out]
forall a. a -> [a] -> [a]
:)) (inp -> Maybe inp
forall a. a -> Maybe a
Just inp
inp)
    BIO inp out
bio ((out -> IO ()) -> Maybe out -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_  ((out -> IO ()) -> Maybe out -> IO ())
-> (out -> IO ()) -> Maybe out -> IO ()
forall a b. (a -> b) -> a -> b
$ \ out
x -> IORef [out] -> ([out] -> [out]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [out]
accRef (out
xout -> [out] -> [out]
forall a. a -> [a] -> [a]
:)) Maybe inp
forall a. Maybe a
EOF
    [out] -> [out]
forall a. [a] -> [a]
reverse ([out] -> [out]) -> IO [out] -> IO [out]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [out] -> IO [out]
forall a. IORef a -> IO a
readIORef IORef [out]
accRef

-- | Run a 'BIO' loop with a single chunk of input and EOF, without collecting result.
--
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE runBlock_ #-}
runBlock_ :: BIO inp out -> inp -> IO ()
runBlock_ BIO inp out
bio inp
inp = do
    BIO inp out
bio Maybe out -> IO ()
forall a. a -> IO ()
discard (inp -> Maybe inp
forall a. a -> Maybe a
Just inp
inp)
    BIO inp out
bio Maybe out -> IO ()
forall a. a -> IO ()
discard Maybe inp
forall a. Maybe a
EOF

-- | Wrap 'runBlock' into a pure interface.
--
-- You can wrap a stateful BIO computation(including the creation of 'BIO' node),
-- when you can guarantee a computation is pure, e.g. compressing, decoding, etc.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
{-# INLINABLE unsafeRunBlock #-}
unsafeRunBlock :: IO (BIO inp out) -> inp -> [out]
unsafeRunBlock IO (BIO inp out)
new inp
inp = IO [out] -> [out]
forall a. IO a -> a
unsafePerformIO (IO (BIO inp out)
new IO (BIO inp out) -> (BIO inp out -> IO [out]) -> IO [out]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ BIO inp out
bio -> BIO inp out -> inp -> IO [out]
forall inp out. HasCallStack => BIO inp out -> inp -> IO [out]
runBlock BIO inp out
bio inp
inp)

-- | Supply blocks of input and EOF to a 'BIO', and collect results.
--
-- Note many 'BIO' node will be closed or not be able to take new input after drained.
runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
{-# INLINABLE runBlocks #-}
runBlocks :: BIO inp out -> [inp] -> IO [out]
runBlocks BIO inp out
bio [inp]
inps = do
    IORef [out]
accRef <- [out] -> IO (IORef [out])
forall a. a -> IO (IORef a)
newIORef []
    [inp] -> (inp -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [inp]
inps ((inp -> IO ()) -> IO ()) -> (inp -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ BIO inp out
bio ((out -> IO ()) -> Maybe out -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_  ((out -> IO ()) -> Maybe out -> IO ())
-> (out -> IO ()) -> Maybe out -> IO ()
forall a b. (a -> b) -> a -> b
$ \ out
x -> IORef [out] -> ([out] -> [out]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [out]
accRef (out
xout -> [out] -> [out]
forall a. a -> [a] -> [a]
:)) (Maybe inp -> IO ()) -> (inp -> Maybe inp) -> inp -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. inp -> Maybe inp
forall a. a -> Maybe a
Just
    BIO inp out
bio ((out -> IO ()) -> Maybe out -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_  ((out -> IO ()) -> Maybe out -> IO ())
-> (out -> IO ()) -> Maybe out -> IO ()
forall a b. (a -> b) -> a -> b
$ \ out
x -> IORef [out] -> ([out] -> [out]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [out]
accRef (out
xout -> [out] -> [out]
forall a. a -> [a] -> [a]
:)) Maybe inp
forall a. Maybe a
EOF
    [out] -> [out]
forall a. [a] -> [a]
reverse ([out] -> [out]) -> IO [out] -> IO [out]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [out] -> IO [out]
forall a. IORef a -> IO a
readIORef IORef [out]
accRef

-- | Supply blocks of input and EOF to a 'BIO', without collecting results.
--
-- Note many 'BIO' node will be closed or not be able to take new input after drained.
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
{-# INLINABLE runBlocks_ #-}
runBlocks_ :: BIO inp out -> [inp] -> IO ()
runBlocks_ BIO inp out
bio [inp]
inps = do
    [inp] -> (inp -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [inp]
inps ((inp -> IO ()) -> IO ()) -> (inp -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ BIO inp out
bio Maybe out -> IO ()
forall a. a -> IO ()
discard (Maybe inp -> IO ()) -> (inp -> Maybe inp) -> inp -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. inp -> Maybe inp
forall a. a -> Maybe a
Just
    BIO inp out
bio Maybe out -> IO ()
forall a. a -> IO ()
discard Maybe inp
forall a. Maybe a
EOF

-- | Wrap 'runBlocks' into a pure interface.
--
-- Similar to 'unsafeRunBlock', but with a list of input blocks.
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
{-# INLINABLE unsafeRunBlocks #-}
unsafeRunBlocks :: IO (BIO inp out) -> [inp] -> [out]
unsafeRunBlocks IO (BIO inp out)
new [inp]
inps = IO [out] -> [out]
forall a. IO a -> a
unsafePerformIO (IO (BIO inp out)
new IO (BIO inp out) -> (BIO inp out -> IO [out]) -> IO [out]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ BIO inp out
bio -> BIO inp out -> [inp] -> IO [out]
forall inp out. HasCallStack => BIO inp out -> [inp] -> IO [out]
runBlocks BIO inp out
bio [inp]
inps)

-------------------------------------------------------------------------------
-- Source

-- | Source a list(or any 'Foldable') from memory.
--
sourceFromList :: Foldable f => f a -> Source a
sourceFromList :: f a -> Source a
sourceFromList f a
xs0 = \ Maybe a -> IO ()
k Maybe Void
_ -> do
    (a -> IO ()) -> f a -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Maybe a -> IO ()
k (Maybe a -> IO ()) -> (a -> Maybe a) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just) f a
xs0
    Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF

-- | Turn a 'BufferedInput' into 'BIO' source, map EOF to EOF.
--
sourceFromBuffered :: HasCallStack => BufferedInput -> Source V.Bytes
{-# INLINABLE sourceFromBuffered #-}
sourceFromBuffered :: BufferedInput -> Source Bytes
sourceFromBuffered BufferedInput
i = \ Maybe Bytes -> IO ()
k Maybe Void
_ ->
    let loop :: IO ()
loop = HasCallStack => BufferedInput -> IO Bytes
BufferedInput -> IO Bytes
readBuffer BufferedInput
i IO Bytes -> (Bytes -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Bytes
x ->
            if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
x then Maybe Bytes -> IO ()
k Maybe Bytes
forall a. Maybe a
EOF else Maybe Bytes -> IO ()
k (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
x) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
    in IO ()
loop

-- | Turn a `IO` action into 'Source'
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
{-# INLINABLE sourceFromIO #-}
sourceFromIO :: IO (Maybe a) -> Source a
sourceFromIO IO (Maybe a)
io = \ Maybe a -> IO ()
k Maybe Void
_ ->
    let loop :: IO ()
loop = IO (Maybe a)
io IO (Maybe a) -> (Maybe a -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Maybe a
x ->
            case Maybe a
x of
                Just a
_ -> Maybe a -> IO ()
k Maybe a
x IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
                Maybe a
_      -> Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF
    in IO ()
loop

-- | Turn a UTF8 encoded 'BufferedInput' into 'BIO' source, map EOF to EOF.
--
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source T.Text
{-# INLINABLE sourceTextFromBuffered #-}
sourceTextFromBuffered :: BufferedInput -> Source Text
sourceTextFromBuffered BufferedInput
i = \ Maybe Text -> IO ()
k Maybe Void
_ ->
    let loop :: IO ()
loop = HasCallStack => BufferedInput -> IO Text
BufferedInput -> IO Text
readBufferText BufferedInput
i IO Text -> (Text -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Text
x ->
            if Text -> Bool
T.null Text
x then Maybe Text -> IO ()
k Maybe Text
forall a. Maybe a
EOF else Maybe Text -> IO ()
k (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
x) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop
    in IO ()
loop

-- | Turn a 'JSON' encoded 'BufferedInput' into 'BIO' source, ignoring any
-- whitespaces bewteen JSON objects. If EOF reached, then return 'EOF'.
-- Throw 'OtherError' with name "EJSON" if JSON value is not parsed or converted.
--
sourceJSONFromBuffered :: forall a. (JSON.JSON a, HasCallStack) => BufferedInput -> Source a
{-# INLINABLE sourceJSONFromBuffered #-}
sourceJSONFromBuffered :: BufferedInput -> Source a
sourceJSONFromBuffered = (Bytes -> Result DecodeError a) -> BufferedInput -> Source a
forall e a.
(HasCallStack, Print e) =>
(Bytes -> Result e a) -> BufferedInput -> Source a
sourceParseChunkFromBuffered Bytes -> Result DecodeError a
forall a. JSON a => Bytes -> Result DecodeError a
JSON.decodeChunk

-- | Turn buffered input device into a packet source, throw 'OtherError' with name @EPARSE@ if parsing fail.
sourceParserFromBuffered :: HasCallStack => P.Parser a -> BufferedInput -> Source a
{-# INLINABLE sourceParserFromBuffered #-}
sourceParserFromBuffered :: Parser a -> BufferedInput -> Source a
sourceParserFromBuffered Parser a
p = (Bytes -> Result ParseError a) -> BufferedInput -> Source a
forall e a.
(HasCallStack, Print e) =>
(Bytes -> Result e a) -> BufferedInput -> Source a
sourceParseChunkFromBuffered (Parser a -> Bytes -> Result ParseError a
forall a. Parser a -> Bytes -> Result ParseError a
P.parseChunk Parser a
p)

-- | Turn buffered input device into a packet source, throw 'OtherError' with name @EPARSE@ if parsing fail.
sourceParseChunkFromBuffered :: (HasCallStack, T.Print e)
                              => (V.Bytes -> P.Result e a) -> BufferedInput -> Source a
{-# INLINABLE sourceParseChunkFromBuffered #-}
sourceParseChunkFromBuffered :: (Bytes -> Result e a) -> BufferedInput -> Source a
sourceParseChunkFromBuffered Bytes -> Result e a
pc BufferedInput
bi = \ Maybe a -> IO ()
k Maybe Void
_ ->
    let loopA :: IO ()
loopA = do
            Bytes
bs <- HasCallStack => BufferedInput -> IO Bytes
BufferedInput -> IO Bytes
readBuffer BufferedInput
bi
            if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
bs
            then Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF
            else Bytes -> IO ()
loopB Bytes
bs
        loopB :: Bytes -> IO ()
loopB Bytes
bs = do
            (Bytes
rest, Either e a
r) <- (Bytes -> Result e a) -> ParseChunks IO e a
forall (m :: * -> *) e a.
Monad m =>
(Bytes -> Result e a) -> ParseChunks m e a
P.parseChunks Bytes -> Result e a
pc (HasCallStack => BufferedInput -> IO Bytes
BufferedInput -> IO Bytes
readBuffer BufferedInput
bi) Bytes
bs
            case Either e a
r of Right a
v -> Maybe a -> IO ()
k (a -> Maybe a
forall a. a -> Maybe a
Just a
v)
                      Left e
e  -> Text -> Text -> IO ()
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EPARSE" (e -> Text
forall a. Print a => a -> Text
T.toText e
e)
            if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
rest
            then IO ()
loopA
            else Bytes -> IO ()
loopB Bytes
rest
    in IO ()
loopA

-- | Turn a file into a 'V.Bytes' source.
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile #-}
initSourceFromFile :: CBytes -> Resource (Source Bytes)
initSourceFromFile CBytes
p = do
    File
f <- HasCallStack => CBytes -> FileFlag -> FileFlag -> Resource File
CBytes -> FileFlag -> FileFlag -> Resource File
FS.initFile CBytes
p FileFlag
FS.O_RDONLY FileFlag
FS.DEFAULT_FILE_MODE
    IO (Source Bytes) -> Resource (Source Bytes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HasCallStack => BufferedInput -> Source Bytes
BufferedInput -> Source Bytes
sourceFromBuffered (BufferedInput -> Source Bytes)
-> IO BufferedInput -> IO (Source Bytes)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> File -> IO BufferedInput
forall i. Input i => i -> IO BufferedInput
newBufferedInput File
f)

-- | Turn a file into a 'V.Bytes' source with given chunk size.
initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile' #-}
initSourceFromFile' :: CBytes -> Int -> Resource (Source Bytes)
initSourceFromFile' CBytes
p Int
bufSiz = do
    File
f <- HasCallStack => CBytes -> FileFlag -> FileFlag -> Resource File
CBytes -> FileFlag -> FileFlag -> Resource File
FS.initFile CBytes
p FileFlag
FS.O_RDONLY FileFlag
FS.DEFAULT_FILE_MODE
    IO (Source Bytes) -> Resource (Source Bytes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HasCallStack => BufferedInput -> Source Bytes
BufferedInput -> Source Bytes
sourceFromBuffered (BufferedInput -> Source Bytes)
-> IO BufferedInput -> IO (Source Bytes)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> File -> IO BufferedInput
forall i. Input i => Int -> i -> IO BufferedInput
newBufferedInput' Int
bufSiz File
f)

--------------------------------------------------------------------------------
-- Sink

-- | Turn a 'BufferedOutput' into a 'V.Bytes' sink.
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink V.Bytes
{-# INLINABLE sinkToBuffered #-}
sinkToBuffered :: BufferedOutput -> Sink Bytes
sinkToBuffered BufferedOutput
bo = \ Maybe Void -> IO ()
k Maybe Bytes
mbs ->
    case Maybe Bytes
mbs of
        Just Bytes
bs -> HasCallStack => BufferedOutput -> Bytes -> IO ()
BufferedOutput -> Bytes -> IO ()
writeBuffer BufferedOutput
bo Bytes
bs
        Maybe Bytes
_       -> HasCallStack => BufferedOutput -> IO ()
BufferedOutput -> IO ()
flushBuffer BufferedOutput
bo IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO ()
k Maybe Void
forall a. Maybe a
EOF

-- | Turn a 'BufferedOutput' into a 'B.Builder' sink.
--
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (B.Builder a)
{-# INLINABLE sinkBuilderToBuffered #-}
sinkBuilderToBuffered :: BufferedOutput -> Sink (Builder a)
sinkBuilderToBuffered BufferedOutput
bo = \ Maybe Void -> IO ()
k Maybe (Builder a)
mbs ->
    case Maybe (Builder a)
mbs of
        Just Builder a
bs -> BufferedOutput -> Builder a -> IO ()
forall a. HasCallStack => BufferedOutput -> Builder a -> IO ()
writeBuilder BufferedOutput
bo Builder a
bs
        Maybe (Builder a)
_       -> HasCallStack => BufferedOutput -> IO ()
BufferedOutput -> IO ()
flushBuffer BufferedOutput
bo IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO ()
k Maybe Void
forall a. Maybe a
EOF

-- | Turn a file into a 'V.Bytes' sink.
--
-- Note the file will be opened in @'FS.O_APPEND' .|. 'FS.O_CREAT' .|. 'FS.O_WRONLY'@ mode,
-- bytes will be written after the end of the original file if there'are old bytes.
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink V.Bytes)
{-# INLINABLE initSinkToFile #-}
initSinkToFile :: CBytes -> Resource (Sink Bytes)
initSinkToFile CBytes
p = do
    File
f <- HasCallStack => CBytes -> FileFlag -> FileFlag -> Resource File
CBytes -> FileFlag -> FileFlag -> Resource File
FS.initFile CBytes
p (FileFlag
FS.O_APPEND FileFlag -> FileFlag -> FileFlag
forall a. Bits a => a -> a -> a
.|. FileFlag
FS.O_CREAT FileFlag -> FileFlag -> FileFlag
forall a. Bits a => a -> a -> a
.|. FileFlag
FS.O_WRONLY) FileFlag
FS.DEFAULT_FILE_MODE
    IO (Sink Bytes) -> Resource (Sink Bytes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HasCallStack => BufferedOutput -> Sink Bytes
BufferedOutput -> Sink Bytes
sinkToBuffered (BufferedOutput -> Sink Bytes)
-> IO BufferedOutput -> IO (Sink Bytes)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> File -> IO BufferedOutput
forall o. Output o => o -> IO BufferedOutput
newBufferedOutput File
f)

-- | Turn an `IO` action into 'BIO' sink.
--
sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
{-# INLINABLE sinkToIO #-}
sinkToIO :: (a -> IO ()) -> Sink a
sinkToIO a -> IO ()
f = \ Maybe Void -> IO ()
k Maybe a
ma ->
    case Maybe a
ma of
        Just a
a -> a -> IO ()
f a
a
        Maybe a
_ -> Maybe Void -> IO ()
k Maybe Void
forall a. Maybe a
EOF

-- | Turn an `IO` action(and a flush action), into 'BIO' sink.
--
sinkToIO' :: HasCallStack => (a -> IO ()) -> IO () -> Sink a
{-# INLINABLE sinkToIO' #-}
sinkToIO' :: (a -> IO ()) -> IO () -> Sink a
sinkToIO' a -> IO ()
f IO ()
flush = \ Maybe Void -> IO ()
k Maybe a
ma ->
    case Maybe a
ma of
        Just a
a -> a -> IO ()
f a
a
        Maybe a
_ -> IO ()
flush IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO ()
k Maybe Void
forall a. Maybe a
EOF

-- | Sink to a list in memory.
--
-- The 'MVar' will be empty during sinking, and will be filled after sink receives an EOF.
sinkToList :: IO (MVar [a], Sink a)
sinkToList :: IO (MVar [a], Sink a)
sinkToList = do
    IORef [a]
xsRef <- [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef []
    MVar [a]
rRef <- IO (MVar [a])
forall a. IO (MVar a)
newEmptyMVar
    (MVar [a], Sink a) -> IO (MVar [a], Sink a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar [a]
rRef, (a -> IO ()) -> IO () -> Sink a
forall a. HasCallStack => (a -> IO ()) -> IO () -> Sink a
sinkToIO' (\ a
x -> IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef [a]
xsRef (a
xa -> [a] -> [a]
forall a. a -> [a] -> [a]
:))
                            (do IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef [a]
xsRef [a] -> [a]
forall a. [a] -> [a]
reverse
                                [a]
xs <- IORef [a] -> IO [a]
forall a. IORef a -> IO a
readIORef IORef [a]
xsRef
                                MVar [a] -> [a] -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar [a]
rRef [a]
xs))

--------------------------------------------------------------------------------
-- Nodes

-- | BIO node from a pure function.
--
-- BIO node made with this funtion are stateless, thus can be reused across chains.
pureBIO :: (a -> b) -> BIO a b
{-# INLINE pureBIO #-}
pureBIO :: (a -> b) -> BIO a b
pureBIO a -> b
f = \ Maybe b -> IO ()
k Maybe a
x -> Maybe b -> IO ()
k (a -> b
f (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe a
x)

-- | BIO node from an IO function.
--
-- BIO node made with this funtion may not be stateless, it depends on if the IO function use
-- IO state.
ioBIO :: HasCallStack => (a -> IO b) -> BIO a b
{-# INLINE ioBIO #-}
ioBIO :: (a -> IO b) -> BIO a b
ioBIO a -> IO b
f = \ Maybe b -> IO ()
k Maybe a
x ->
    case Maybe a
x of Just a
x' -> a -> IO b
f a
x' IO b -> (b -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe b -> IO ()
k (Maybe b -> IO ()) -> (b -> Maybe b) -> b -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> Maybe b
forall a. a -> Maybe a
Just
              Maybe a
_ -> Maybe b -> IO ()
k Maybe b
forall a. Maybe a
EOF

-- | BIO node from a pure filter.
--
-- BIO node made with this funtion are stateless, thus can be reused across chains.
filter ::  (a -> Bool) -> BIO a a
filter :: (a -> Bool) -> BIO a a
filter a -> Bool
f Maybe a -> IO ()
k = Maybe a -> IO ()
go
  where
    go :: Maybe a -> IO ()
go (Just a
a) = Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (a -> Bool
f a
a) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe a -> IO ()
k (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
    go Maybe a
Nothing = Maybe a -> IO ()
k Maybe a
forall a. Maybe a
Nothing

-- | BIO node from an impure filter.
--
-- BIO node made with this funtion may not be stateless, it depends on if the IO function use
filterM ::  (a -> IO Bool) -> BIO a a
filterM :: (a -> IO Bool) -> BIO a a
filterM a -> IO Bool
f Maybe a -> IO ()
k = Maybe a -> IO ()
go
  where
    go :: Maybe a -> IO ()
go (Just a
a) = do
        Bool
mbool <- a -> IO Bool
f a
a
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
mbool (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Maybe a -> IO ()
k (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
    go Maybe a
Nothing = Maybe a -> IO ()
k Maybe a
forall a. Maybe a
Nothing

-- | Make a chunk size divider.
--
-- A divider size divide each chunk's size to the nearest multiplier to granularity,
-- last trailing chunk is directly returned.
newReChunk :: Int                -- ^ chunk granularity
           -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newReChunk #-}
newReChunk :: Int -> IO (BIO Bytes Bytes)
newReChunk Int
n = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes -> IO (BIO Bytes Bytes))
-> BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall a b. (a -> b) -> a -> b
$ \ Maybe Bytes -> IO ()
k Maybe Bytes
mbs ->
        case Maybe Bytes
mbs of
            Just Bytes
bs -> do
                Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
                let chunk :: Bytes
chunk =  Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs
                    l :: Int
l = Bytes -> Int
forall (v :: * -> *) a. Vec v a => v a -> Int
V.length Bytes
chunk
                if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
                then do
                    let l' :: Int
l' = Int
l Int -> Int -> Int
forall a. Num a => a -> a -> a
- (Int
l Int -> Int -> Int
forall a. Integral a => a -> a -> a
`rem` Int
n)
                        (Bytes
chunk', Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt Int
l' Bytes
chunk
                    IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
rest
                    Maybe Bytes -> IO ()
k (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
chunk')
                else IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
chunk
            Maybe Bytes
_ -> do
                Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
trailing) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                    IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
                    Maybe Bytes -> IO ()
k (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
trailing)
                Maybe Bytes -> IO ()
k Maybe Bytes
forall a. Maybe a
EOF

-- | Read buffer and parse with 'Parser'.
--
-- This function will turn a 'Parser' into a 'BIO', throw 'OtherError' with name @EPARSE@ if parsing fail.
--
newParserNode :: HasCallStack => P.Parser a -> IO (BIO V.Bytes a)
{-# INLINABLE newParserNode #-}
newParserNode :: Parser a -> IO (BIO Bytes a)
newParserNode Parser a
p = do
    -- type LastParseState = Maybe (V.Bytes -> P.Result)
    IORef (Maybe (ParseStep ParseError a))
resultRef <- Maybe (ParseStep ParseError a)
-> IO (IORef (Maybe (ParseStep ParseError a)))
forall a. a -> IO (IORef a)
newIORef Maybe (ParseStep ParseError a)
forall a. Maybe a
EOF
    BIO Bytes a -> IO (BIO Bytes a)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes a -> IO (BIO Bytes a))
-> BIO Bytes a -> IO (BIO Bytes a)
forall a b. (a -> b) -> a -> b
$ \ Maybe a -> IO ()
k Maybe Bytes
mbs -> do
        let loop :: ParseStep ParseError a -> Bytes -> IO ()
loop ParseStep ParseError a
f Bytes
chunk = case ParseStep ParseError a
f Bytes
chunk of
                P.Success a
a Bytes
trailing -> do
                    Maybe a -> IO ()
k (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
                    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
trailing) (ParseStep ParseError a -> Bytes -> IO ()
loop ParseStep ParseError a
f Bytes
trailing)
                P.Partial ParseStep ParseError a
f' ->
                    IORef (Maybe (ParseStep ParseError a))
-> Maybe (ParseStep ParseError a) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe (ParseStep ParseError a))
resultRef (ParseStep ParseError a -> Maybe (ParseStep ParseError a)
forall a. a -> Maybe a
Just ParseStep ParseError a
f')
                P.Failure ParseError
e Bytes
_ ->
                    Text -> Text -> IO ()
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EPARSE" (ParseError -> Text
forall a. Print a => a -> Text
T.toText ParseError
e)
        Maybe (ParseStep ParseError a)
lastResult <- IORef (Maybe (ParseStep ParseError a))
-> IO (Maybe (ParseStep ParseError a))
forall a. IORef a -> IO a
readIORef IORef (Maybe (ParseStep ParseError a))
resultRef
        case Maybe Bytes
mbs of
            Just Bytes
bs -> do
                let f :: ParseStep ParseError a
f = case Maybe (ParseStep ParseError a)
lastResult of
                        Just ParseStep ParseError a
x    -> ParseStep ParseError a
x
                        Maybe (ParseStep ParseError a)
_         -> Parser a -> ParseStep ParseError a
forall a. Parser a -> Bytes -> Result ParseError a
P.parseChunk Parser a
p
                ParseStep ParseError a -> Bytes -> IO ()
loop ParseStep ParseError a
f Bytes
bs
            Maybe Bytes
_ ->
                case Maybe (ParseStep ParseError a)
lastResult of
                    Just ParseStep ParseError a
f -> ParseStep ParseError a -> Bytes -> IO ()
loop ParseStep ParseError a
f Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
                    Maybe (ParseStep ParseError a)
_ -> Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF

-- | Make a new UTF8 decoder, which decode bytes streams into text streams.
--
-- If there're invalid UTF8 bytes, an 'OtherError' with name 'EINVALIDUTF8' will be thrown.`
--
-- Note this node is supposed to be used with preprocess node such as decompressor, parser, etc.
-- where bytes boundary cannot be controlled, UTF8 decoder will concat trailing bytes from last block to next one.
-- Use this node directly with 'sourceFromBuffered' will not be as efficient as directly use
-- 'sourceTextFromBuffered', because 'BufferedInput' provides push back capability,
-- trailing bytes can be pushed back to reading buffer then returned with next block input together.
--
newUTF8Decoder :: HasCallStack => IO (BIO V.Bytes T.Text)
{-# INLINABLE newUTF8Decoder #-}
newUTF8Decoder :: IO (BIO Bytes Text)
newUTF8Decoder = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    BIO Bytes Text -> IO (BIO Bytes Text)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Text -> IO (BIO Bytes Text))
-> BIO Bytes Text -> IO (BIO Bytes Text)
forall a b. (a -> b) -> a -> b
$ \ Maybe Text -> IO ()
k Maybe Bytes
mbs -> do
        case Maybe Bytes
mbs of
            Just Bytes
bs -> do
                Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
                let chunk :: Bytes
chunk =  Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs
                    (V.PrimVector PrimArray Word8
arr Int
s Int
l) = Bytes
chunk
                if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 Bool -> Bool -> Bool
&& PrimArray Word8 -> Int -> Int
T.decodeCharLen PrimArray Word8
arr Int
s Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
l
                then do
                    let (Int
i, Maybe Word8
_) = (Word8 -> Bool) -> Bytes -> (Int, Maybe Word8)
forall (v :: * -> *) a.
Vec v a =>
(a -> Bool) -> v a -> (Int, Maybe a)
V.findR (\ Word8
w -> Word8
w Word8 -> Word8 -> Bool
forall a. Ord a => a -> a -> Bool
>= Word8
0b11000000 Bool -> Bool -> Bool
|| Word8
w Word8 -> Word8 -> Bool
forall a. Ord a => a -> a -> Bool
<= Word8
0b01111111) Bytes
chunk
                    if (Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== -Int
1)
                    then Text -> Text -> IO ()
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EINVALIDUTF8" Text
"invalid UTF8 bytes"
                    else do
                        if PrimArray Word8 -> Int -> Int
T.decodeCharLen PrimArray Word8
arr (Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
l Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
i
                        then do
                            IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef (IArray PrimVector Word8 -> Int -> Int -> Bytes
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr IArray PrimVector Word8
PrimArray Word8
arr (Int
sInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
i) (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
i))
                            Maybe Text -> IO ()
k (Text -> Maybe Text
forall a. a -> Maybe a
Just (HasCallStack => Bytes -> Text
Bytes -> Text
T.validate (IArray PrimVector Word8 -> Int -> Int -> Bytes
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr IArray PrimVector Word8
PrimArray Word8
arr Int
s Int
i)))
                        else do
                            IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
                            Maybe Text -> IO ()
k (Text -> Maybe Text
forall a. a -> Maybe a
Just (HasCallStack => Bytes -> Text
Bytes -> Text
T.validate Bytes
chunk))
                else IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
chunk

            Maybe Bytes
_ -> do
                Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
                if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
trailing
                then Maybe Text -> IO ()
k Maybe Text
forall a. Maybe a
EOF
                else Text -> Text -> IO ()
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EINVALIDUTF8" Text
"invalid UTF8 bytes"

-- | Make a new stream splitter based on magic byte.
--
newMagicSplitter :: Word8 -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newMagicSplitter #-}
newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
newMagicSplitter Word8
magic = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes -> IO (BIO Bytes Bytes))
-> BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall a b. (a -> b) -> a -> b
$ \ Maybe Bytes -> IO ()
k Maybe Bytes
mx ->
        case Maybe Bytes
mx of
            Just Bytes
bs -> do
                Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
                let loop :: Bytes -> IO ()
loop Bytes
chunk = case Word8 -> Bytes -> Maybe Int
forall (v :: * -> *) a. (Vec v a, Eq a) => a -> v a -> Maybe Int
V.elemIndex Word8
magic Bytes
chunk of
                        Just Int
i -> do
                            -- TODO: looping
                            let (Bytes
line, Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) Bytes
chunk
                            Maybe Bytes -> IO ()
k (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
line)
                            Bytes -> IO ()
loop Bytes
rest
                        Maybe Int
_ -> IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
chunk
                Bytes -> IO ()
loop (Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs)
            Maybe Bytes
_ -> do
                Bytes
chunk <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
                Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
chunk) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                    IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
                    Maybe Bytes -> IO ()
k (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
chunk)
                Maybe Bytes -> IO ()
k Maybe Bytes
forall a. Maybe a
EOF

-- | Make a new stream splitter based on linefeed(@\r\n@ or @\n@).
--
-- The result bytes doesn't contain linefeed.
newLineSplitter :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newLineSplitter #-}
newLineSplitter :: IO (BIO Bytes Bytes)
newLineSplitter = do
    BIO Bytes Bytes
s <- Word8 -> IO (BIO Bytes Bytes)
newMagicSplitter Word8
10
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
s BIO Bytes Bytes -> BIO Bytes Bytes -> BIO Bytes Bytes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b. (a -> b) -> BIO a b
pureBIO Bytes -> Bytes
forall a. (Prim a, Eq a, Num a) => PrimVector a -> PrimVector a
dropLineEnd)
  where
    dropLineEnd :: PrimVector a -> PrimVector a
dropLineEnd bs :: PrimVector a
bs@(V.PrimVector PrimArray a
arr Int
s Int
l) =
        case PrimVector a
bs PrimVector a -> Int -> Maybe a
forall (v :: * -> *) a. Vec v a => v a -> Int -> Maybe a
`V.indexMaybe` (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2) of
            Just a
r | a
r a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
13   -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2)
                   | Bool
otherwise -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
            Maybe a
_ | PrimVector a -> a
forall (v :: * -> *) a. (Vec v a, HasCallStack) => v a -> a
V.head PrimVector a
bs a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
10 -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
              | Bool
otherwise -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s Int
l

-- | Make a new base64 encoder node.
newBase64Encoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Encoder #-}
newBase64Encoder :: IO (BIO Bytes Bytes)
newBase64Encoder = do
    BIO Bytes Bytes
re <- Int -> IO (BIO Bytes Bytes)
newReChunk Int
3
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
re BIO Bytes Bytes -> BIO Bytes Bytes -> BIO Bytes Bytes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b. (a -> b) -> BIO a b
pureBIO Bytes -> Bytes
base64Encode)

-- | Make a new base64 decoder node.
newBase64Decoder :: HasCallStack => IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Decoder #-}
newBase64Decoder :: IO (BIO Bytes Bytes)
newBase64Decoder = do
    BIO Bytes Bytes
re <- Int -> IO (BIO Bytes Bytes)
newReChunk Int
4
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
re BIO Bytes Bytes -> BIO Bytes Bytes -> BIO Bytes Bytes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b. (a -> b) -> BIO a b
pureBIO HasCallStack => Bytes -> Bytes
Bytes -> Bytes
base64Decode')

-- | Make a hex encoder node.
--
-- Hex encoder is stateless, it can be reused across chains.
hexEncoder :: Bool   -- ^ uppercase?
           -> BIO V.Bytes V.Bytes
{-# INLINABLE hexEncoder #-}
hexEncoder :: Bool -> BIO Bytes Bytes
hexEncoder Bool
upper = (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b. (a -> b) -> BIO a b
pureBIO (Bool -> Bytes -> Bytes
hexEncode Bool
upper)

-- | Make a new hex decoder node.
newHexDecoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newHexDecoder #-}
newHexDecoder :: IO (BIO Bytes Bytes)
newHexDecoder = do
    BIO Bytes Bytes
re <- Int -> IO (BIO Bytes Bytes)
newReChunk Int
2
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
re BIO Bytes Bytes -> BIO Bytes Bytes -> BIO Bytes Bytes
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b. (a -> b) -> BIO a b
pureBIO HasCallStack => Bytes -> Bytes
Bytes -> Bytes
hexDecode')

-- | Make a new BIO node which counts items flow throught it.
--
-- 'Counter' is increased atomically, it's safe to read \/ reset the counter from other threads.
counterNode :: Counter -> BIO a a
{-# INLINABLE counterNode #-}
counterNode :: Counter -> BIO a a
counterNode Counter
c = (a -> IO a) -> BIO a a
forall a b. HasCallStack => (a -> IO b) -> BIO a b
ioBIO a -> IO a
inc
  where
    inc :: a -> IO a
inc a
x = do
        Counter -> Int -> IO ()
atomicAddCounter_ Counter
c Int
1
        a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x

-- | Make a new BIO node which counts items, and label item with a sequence number.
--
-- 'Counter' is increased atomically, it's safe to read \/ reset the counter from other threads.
seqNumNode :: Counter -> BIO a (Int, a)
{-# INLINABLE seqNumNode #-}
seqNumNode :: Counter -> BIO a (Int, a)
seqNumNode Counter
c = (a -> IO (Int, a)) -> BIO a (Int, a)
forall a b. HasCallStack => (a -> IO b) -> BIO a b
ioBIO a -> IO (Int, a)
inc
  where
    inc :: a -> IO (Int, a)
inc a
x = do
        Int
i <- Counter -> Int -> IO Int
atomicAddCounter Counter
c Int
1
        (Int, a) -> IO (Int, a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
i, a
x)

-- | Make a BIO node grouping items into fixed size arrays.
--
-- Trailing items are directly returned.
newGroupingNode :: Int -> IO (BIO a (V.Vector a))
{-# INLINABLE newGroupingNode #-}
newGroupingNode :: Int -> IO (BIO a (Vector a))
newGroupingNode Int
n
    | Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 =  Int -> IO (BIO a (Vector a))
forall a. Int -> IO (BIO a (Vector a))
newGroupingNode Int
1
    | Bool
otherwise = do
        Counter
c <- Int -> IO Counter
newCounter Int
0
        IORef (SmallMutableArray RealWorld a)
arrRef <- SmallMutableArray RealWorld a
-> IO (IORef (SmallMutableArray RealWorld a))
forall a. a -> IO (IORef a)
newIORef (SmallMutableArray RealWorld a
 -> IO (IORef (SmallMutableArray RealWorld a)))
-> IO (SmallMutableArray RealWorld a)
-> IO (IORef (SmallMutableArray RealWorld a))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> IO (MArr SmallArray RealWorld a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
A.newArr Int
n
        BIO a (Vector a) -> IO (BIO a (Vector a))
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO a (Vector a) -> IO (BIO a (Vector a)))
-> BIO a (Vector a) -> IO (BIO a (Vector a))
forall a b. (a -> b) -> a -> b
$ \ Maybe (Vector a) -> IO ()
k Maybe a
mx ->
            case Maybe a
mx of
                Just a
x -> do
                    Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
c
                    if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
                    then do
                        SmallMutableArray RealWorld a
marr <- IORef (SmallMutableArray RealWorld a)
-> IO (SmallMutableArray RealWorld a)
forall a. IORef a -> IO a
readIORef IORef (SmallMutableArray RealWorld a)
arrRef
                        MArr SmallArray RealWorld a -> Int -> a -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
A.writeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr Int
i a
x
                        Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
c Int
0
                        IORef (SmallMutableArray RealWorld a)
-> SmallMutableArray RealWorld a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (SmallMutableArray RealWorld a)
arrRef (SmallMutableArray RealWorld a -> IO ())
-> IO (SmallMutableArray RealWorld a) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> IO (MArr SmallArray RealWorld a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
A.newArr Int
n
                        SmallArray a
arr <- MArr SmallArray RealWorld a -> IO (SmallArray a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
A.unsafeFreezeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr
                        Maybe (Vector a) -> IO ()
k (Maybe (Vector a) -> IO ())
-> (Vector a -> Maybe (Vector a)) -> Vector a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector a -> Maybe (Vector a)
forall a. a -> Maybe a
Just (Vector a -> IO ()) -> Vector a -> IO ()
forall a b. (a -> b) -> a -> b
$! IArray Vector a -> Int -> Int -> Vector a
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr IArray Vector a
SmallArray a
arr Int
0 Int
n
                    else do
                        SmallMutableArray RealWorld a
marr <- IORef (SmallMutableArray RealWorld a)
-> IO (SmallMutableArray RealWorld a)
forall a. IORef a -> IO a
readIORef IORef (SmallMutableArray RealWorld a)
arrRef
                        MArr SmallArray RealWorld a -> Int -> a -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
A.writeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr Int
i a
x
                        Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
c (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
                Maybe a
_ -> do
                    Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
c
                    if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0
                    then do
                        Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
c Int
0
                        SmallMutableArray RealWorld a
marr <- IORef (SmallMutableArray RealWorld a)
-> IO (SmallMutableArray RealWorld a)
forall a. IORef a -> IO a
readIORef IORef (SmallMutableArray RealWorld a)
arrRef
                        MArr SmallArray RealWorld a -> Int -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> m ()
A.shrinkMutableArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr Int
i
                        SmallArray a
arr <- MArr SmallArray RealWorld a -> IO (SmallArray a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
A.unsafeFreezeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr
                        Maybe (Vector a) -> IO ()
k (Maybe (Vector a) -> IO ())
-> (Vector a -> Maybe (Vector a)) -> Vector a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Vector a -> Maybe (Vector a)
forall a. a -> Maybe a
Just (Vector a -> IO ()) -> Vector a -> IO ()
forall a b. (a -> b) -> a -> b
$! IArray Vector a -> Int -> Int -> Vector a
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr IArray Vector a
SmallArray a
arr Int
0 Int
i
                    else Maybe (Vector a) -> IO ()
k Maybe (Vector a)
forall a. Maybe a
EOF

-- | A BIO node flatten items.
--
ungroupingNode :: BIO (V.Vector a) a
{-# INLINABLE ungroupingNode #-}
ungroupingNode :: BIO (Vector a) a
ungroupingNode = \ Maybe a -> IO ()
k Maybe (Vector a)
mx ->
    case Maybe (Vector a)
mx of
        Just Vector a
x -> (a -> IO ()) -> Vector a -> IO ()
forall (v :: * -> *) a (f :: * -> *) b.
(Vec v a, Applicative f) =>
(a -> f b) -> v a -> f ()
V.traverseVec_ (Maybe a -> IO ()
k (Maybe a -> IO ()) -> (a -> Maybe a) -> a -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Maybe a
forall a. a -> Maybe a
Just) Vector a
x
        Maybe (Vector a)
_      -> Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF

-- | A BIO node which write 'True' to 'IORef' when 'EOF' is reached.
consumedNode :: IORef Bool -> BIO a a
{-# INLINABLE consumedNode #-}
consumedNode :: IORef Bool -> BIO a a
consumedNode IORef Bool
ref = \ Maybe a -> IO ()
k Maybe a
mx -> case Maybe a
mx of
    Just a
_ -> Maybe a -> IO ()
k Maybe a
mx
    Maybe a
_ -> do IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
ref Bool
True
            Maybe a -> IO ()
k Maybe a
forall a. Maybe a
EOF