{-|
Module      : Z.IO.BIO.Base
Description : Composable IO Loops
Copyright   : (c) Dong Han, 2017-2020
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides 'BIO' (block IO) type to facilitate writing streaming programs. A 'BIO' node usually:

  * Processes input in units of blocks (or items).
  * Runs in constant space, which means the memory usage won't accumulate.
  * Keeps some state in IO, which is sealed in the 'BIO' closure.

-}
module Z.IO.BIO.Base (
  -- * The BIO type
    BIO, pattern EOF, Source, Sink
  -- ** Basic combinators
  , appendSource, concatSource, concatSource'
  , joinSink, fuseSink
  -- * Run BIO chain
  , discard
  , step, step_
  , run, run_
  , runBlock, runBlock_, unsafeRunBlock
  , runBlocks, runBlocks_, unsafeRunBlocks
  -- * Make new BIO
  , fromPure, fromIO
  , filter, filterIO
  -- * Use with fold
  , fold', foldIO'
  -- ** Source
  , initSourceFromFile
  , initSourceFromFile'
  , sourceFromIO
  , sourceFromList
  , sourceFromBuffered
  , sourceTextFromBuffered
  , sourceJSONFromBuffered
  , sourceParserFromBuffered
  , sourceParseChunkFromBuffered
  -- ** Sink
  , sinkToIO
  , sinkToList
  , initSinkToFile
  , sinkToBuffered
  , sinkBuilderToBuffered
  -- ** Bytes specific
  , newReChunk
  , newUTF8Decoder
  , newParser, newMagicSplitter, newLineSplitter
  , newBase64Encoder, newBase64Decoder
  , hexEncode
  , newHexDecoder
  -- ** Generic BIO
  , counter
  , seqNum
  , newGrouping
  , ungrouping
  , consumed
  ) where

import           Prelude                hiding (filter)
import           Control.Concurrent.MVar
import           Control.Concurrent.STM
import qualified Control.Foldl          as L
import           Control.Monad
import           Control.Monad.IO.Class
import           Data.Bits              ((.|.))
import           Data.IORef
import qualified Data.List              as List
import           Data.Void
import           Data.Word
import           System.IO.Unsafe       (unsafePerformIO)
import qualified Z.Data.Array           as A
import qualified Z.Data.Builder         as B
import           Z.Data.CBytes          (CBytes)
import qualified Z.Data.JSON            as JSON
import qualified Z.Data.Parser          as P
import           Z.Data.PrimRef
import qualified Z.Data.Text            as T
import qualified Z.Data.Text.UTF8Codec  as T
import qualified Z.Data.Vector          as V
import qualified Z.Data.Vector.Base     as V
import           Z.Data.Vector.Base64
import qualified Z.Data.Vector.Hex      as Hex
import           Z.IO.Buffered
import           Z.IO.Exception
import qualified Z.IO.FileSystem.Base   as FS
import           Z.IO.Resource

-- | A 'BIO'(blocked IO) node.
--
-- A 'BIO' node is a push based stream transformer: it is a function that receives a downstream
-- continuation and one (optional) input value, and performs its work in 'IO'. It can be used to
-- describe different kinds of IO devices:
--
--  * @BIO inp out@ describes an IO state machine(e.g. z_stream in zlib),
--    which takes some input in blocks, then outputs.
--  * @type Source out = BIO Void out@ describes an IO source, which never takes input,
--    but gives output until EOF by looping.
--  * @type Sink inp = BIO inp ()@ describes an IO sink, which takes input and performs some IO effects,
--    such as writing to terminal or files.
--
-- You can connect these 'BIO' nodes with '.' which connects the left node's output to the right node's input,
-- and returns a new 'BIO' node with the left node's input type and the right node's output type.
--
-- You can run a 'BIO' node in different ways:
--
--   * 'step'\/'step_' to supply a single chunk of input and step the BIO node.
--   * 'run'\/'run_' will supply EOF directly, which will effectively pull all values from source,
--     and push to sink until source reaches EOF.
--   * 'runBlock'\/'runBlock_' will supply a single block of input as whole input and run the BIO node.
--   * 'runBlocks'\/'runBlocks_' will supply a list of blocks as whole input and run the BIO node.
--
-- Note a 'BIO' usually contains some IO state, you can consider it as an opaque 'IORef':
--
--   * You shouldn't use a 'BIO' node across multiple 'BIO' chains unless the state can be reset.
--   * You shouldn't use a 'BIO' node across multiple threads unless its documentation states otherwise.
--
-- 'BIO' is simply a convenient way to construct single-threaded streaming computations; to use 'BIO'
-- in multiple threads, check the "Z.IO.BIO.Concurrent" module.
--
type BIO inp out = (Maybe out -> IO ())     -- ^ Pass 'EOF' to indicate current node reaches EOF
                -> Maybe inp                -- ^ 'EOF' indicates upstream reaches EOF
                -> IO ()

-- | Bidirectional pattern synonym for 'Nothing', giving end-of-stream a meaningful name
-- both when matching on and when constructing a @Maybe@ value.
pattern EOF :: Maybe a
pattern EOF = Nothing

-- | Type alias for a 'BIO' node which never takes input (its input type is 'Void').
--
-- Note when implementing a 'Source', you should assume the 'EOF' argument is supplied only once, and you
-- should loop to call the downstream continuation with all available chunks, then write a final 'EOF'
-- to indicate EOF.
type Source x = BIO Void x

-- | Type alias for a 'BIO' node which only takes input and performs effects (its output type is @()@).
--
-- Note when implementing a 'Sink', you should assume the 'EOF' argument is supplied only once (when upstream
-- reaches EOF). You do not need to call the downstream continuation before EOF; when upstream reaches EOF
-- you should do a flush and also write a final 'EOF' downstream.
type Sink x = BIO x ()

-- | Chain two 'Source's one after the other.
--
-- Chunks written by the first source are passed downstream unchanged. When the first source
-- writes 'EOF', that 'EOF' is not forwarded; instead the second source is started with the same
-- downstream continuation. The input argument of the combined source is ignored.
appendSource :: HasCallStack => Source a -> Source a -> Source a
{-# INLINABLE appendSource #-}
appendSource first second = \ down _ -> first (relay down) EOF
  where
    -- Forward real chunks; on the first source's EOF hand over to the second source.
    relay down chunk@(Just _) = down chunk
    relay down _              = second down EOF

-- | Fuse two 'BIO' sinks: every value given to the fused sink is given to the left sink and then
-- to the right sink.
--
-- Both sinks are run with 'discard' as their downstream, so nothing they write is observed.
-- On 'EOF' both sinks receive 'EOF' (i.e. both get flushed), after which a single 'EOF' is
-- written to the fused sink's downstream. Nothing is written downstream for ordinary input.
joinSink :: HasCallStack => Sink out -> Sink out -> Sink out
{-# INLINABLE joinSink #-}
joinSink left right = \ down mchunk -> do
    left discard mchunk
    right discard mchunk
    case mchunk of
        Just _ -> return ()
        _      -> down EOF

-- | Fuse a list of 'BIO' sinks: every value given to the fused sink is given to each sink in the
-- list, in list order.
--
-- Each sink is run with 'discard' as its downstream. On 'EOF' every sink receives 'EOF'
-- (i.e. every sink gets flushed), after which a single 'EOF' is written to the fused sink's
-- downstream. Nothing is written downstream for ordinary input.
fuseSink :: HasCallStack => [Sink out] -> Sink out
{-# INLINABLE fuseSink #-}
fuseSink sinks = \ down mchunk -> do
    forM_ sinks $ \ sink -> sink discard mchunk
    case mchunk of
        Just _ -> return ()
        _      -> down EOF

-- | Chain a list of 'BIO' sources: after one reaches EOF, elements are drawn from the next.
--
-- Built as a strict left fold of 'appendSource' starting from 'emptySource', so an empty list
-- yields a source that only writes 'EOF'.
concatSource :: HasCallStack => [Source a] -> Source a
{-# INLINABLE concatSource #-}
concatSource sources = List.foldl' appendSource emptySource sources

-- | A 'Source' with no elements: it ignores its input and immediately writes 'EOF' downstream.
emptySource :: Source a
{-# INLINABLE emptySource #-}
emptySource down _ = down EOF

-- | Flatten a source of sources: each inner 'Source' produced by the outer source is run in turn,
-- and its chunks are forwarded downstream.
--
-- An inner source's 'EOF' is swallowed; only the outer source's 'EOF' is written downstream.
concatSource' :: HasCallStack => Source (Source a) -> Source a
{-# INLINABLE concatSource' #-}
concatSource' outer = \ down _ -> outer (runInner down) EOF
  where
    -- Run one inner source to completion, or finish when the outer source is exhausted.
    runInner down (Just inner) = inner (forwardChunk down) EOF
    runInner down _            = down EOF

    -- Pass real chunks on; drop the inner EOF so the stream stays open for the next inner source.
    forwardChunk down chunk@(Just _) = down chunk
    forwardChunk _    _              = return ()

-------------------------------------------------------------------------------
-- Run BIO

-- | Ignore the argument and do nothing; handy as a downstream continuation when output is unwanted.
discard :: a -> IO ()
{-# INLINABLE discard #-}
discard = const (return ())

-- | Feed one chunk of input to a 'BIO' node and return everything it wrote downstream,
-- in the order it was written.
--
-- No 'EOF' is supplied to the node; an 'EOF' written by the node is not recorded in the result.
step :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE step #-}
step bio inp = do
    collected <- newIORef []
    -- Outputs are consed onto the front, hence the final 'reverse'.
    let collect (Just out) = modifyIORef' collected (out :)
        collect _          = return ()
    bio collect (Just inp)
    reverse <$> readIORef collected

-- | Feed one chunk of input to a 'BIO' node, throwing away whatever it writes downstream.
step_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE step_ #-}
step_ bio inp = bio discard (Just inp)

-- | Run a 'BIO' node by supplying 'EOF' as its only input, throwing away whatever it writes downstream.
--
-- When used on a 'Source', it starts the streaming loop.
-- When used on a 'Sink', it performs a flush.
run_ :: HasCallStack => BIO inp out -> IO ()
{-# INLINABLE run_ #-}
run_ node = node discard EOF

-- | Run a 'BIO' node by supplying 'EOF' as its only input, and return everything it wrote
-- downstream, in the order it was written.
--
-- When used on a 'Source', it collects all chunks the source produces.
run :: HasCallStack => BIO inp out -> IO [out]
{-# INLINABLE run #-}
run bio = do
    collected <- newIORef []
    -- Outputs are consed onto the front, hence the final 'reverse'.
    let collect (Just out) = modifyIORef' collected (out :)
        collect _          = return ()
    bio collect EOF
    reverse <$> readIORef collected

-- | Run a strict fold over a source with 'L.Fold'.
--
-- The accumulator lives in an 'IORef' and is updated with 'modifyIORef'' for every chunk the
-- source writes; the fold's extraction function is applied once the source returns.
fold' :: L.Fold a b -> Source a -> IO b
{-# INLINABLE fold' #-}
fold' (L.Fold stepFn initial extract) src = do
    accRef <- newIORef initial
    let feed (Just a) = modifyIORef' accRef (`stepFn` a)
        feed _        = return ()
    src feed EOF
    extract <$> readIORef accRef

-- | Run a strict fold over a source with 'L.FoldM'.
--
-- The initial accumulator is obtained by running the fold's begin action. For every chunk the
-- source writes, the step action is run and its result is forced to weak head normal form before
-- being stored. The fold's final action is run once the source returns.
foldIO' :: L.FoldM IO a b -> Source a -> IO b
{-# INLINABLE foldIO' #-}
foldIO' (L.FoldM stepM begin done) src = do
    accRef <- begin >>= newIORef
    let feed (Just a) = do
            acc  <- readIORef accRef
            acc' <- stepM acc a
            writeIORef accRef $! acc'
        feed _ = return ()
    src feed EOF
    readIORef accRef >>= done

-- | Run a 'BIO' node with a single chunk of input followed by 'EOF', and return everything it
-- wrote downstream (during both calls), in the order it was written.
--
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE runBlock #-}
runBlock bio inp = do
    collected <- newIORef []
    -- Outputs are consed onto the front, hence the final 'reverse'.
    let collect (Just out) = modifyIORef' collected (out :)
        collect _          = return ()
    bio collect (Just inp)
    bio collect EOF
    reverse <$> readIORef collected

-- | Run a 'BIO' node with a single chunk of input followed by 'EOF', throwing away whatever it
-- writes downstream.
--
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE runBlock_ #-}
runBlock_ bio inp = bio discard (Just inp) >> bio discard EOF

-- | Wrap 'runBlock' into a pure interface.
--
-- The node is created by running the given action and then driven with 'runBlock', all inside
-- 'unsafePerformIO'. This is only sound when the caller can guarantee the whole computation
-- (including the creation of the 'BIO' node) is observably pure, e.g. compressing, decoding, etc.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
{-# INLINABLE unsafeRunBlock #-}
unsafeRunBlock new inp = unsafePerformIO $ do
    bio <- new
    runBlock bio inp

-- | Feed a list of input blocks to a 'BIO' node, one by one, followed by 'EOF', and return
-- everything it wrote downstream, in the order it was written.
--
-- Note many 'BIO' nodes will be closed or not be able to take new input after being drained.
runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
{-# INLINABLE runBlocks #-}
runBlocks bio inps = do
    collected <- newIORef []
    -- Outputs are consed onto the front, hence the final 'reverse'.
    let collect (Just out) = modifyIORef' collected (out :)
        collect _          = return ()
    mapM_ (\ inp -> bio collect (Just inp)) inps
    bio collect EOF
    reverse <$> readIORef collected

-- | Feed a list of input blocks to a 'BIO' node, one by one, followed by 'EOF', throwing away
-- whatever it writes downstream.
--
-- Note many 'BIO' nodes will be closed or not be able to take new input after being drained.
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
{-# INLINABLE runBlocks_ #-}
runBlocks_ bio inps = do
    mapM_ (\ inp -> bio discard (Just inp)) inps
    bio discard EOF

-- | Wrap 'runBlocks' into a pure interface.
--
-- Similar to 'unsafeRunBlock', but with a list of input blocks: the node is created and driven
-- inside 'unsafePerformIO', so the caller must guarantee the whole computation is observably pure.
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
{-# INLINABLE unsafeRunBlocks #-}
unsafeRunBlocks new inps = unsafePerformIO $ do
    bio <- new
    runBlocks bio inps

-------------------------------------------------------------------------------
-- Source

-- | Make a 'Source' from an in-memory list (or any 'Foldable').
--
-- Every element is written downstream in traversal order, followed by a final 'EOF'.
sourceFromList :: Foldable f => f a -> Source a
{-# INLINABLE sourceFromList #-}
sourceFromList items = \ down _ -> do
    forM_ items $ \ item -> down (Just item)
    down EOF

-- | Turn a 'BufferedInput' into a 'BIO' source of byte chunks.
--
-- Chunks are read with 'readBuffer' and written downstream until an empty chunk is read,
-- at which point 'EOF' is written downstream and the loop stops.
sourceFromBuffered :: HasCallStack => BufferedInput -> Source V.Bytes
{-# INLINABLE sourceFromBuffered #-}
sourceFromBuffered input = \ down _ ->
    let pump = do
            chunk <- readBuffer input
            if V.null chunk
                then down EOF
                else do
                    down (Just chunk)
                    pump
    in pump

-- | Turn an `IO` action into a 'Source'.
--
-- The action is run repeatedly; each @Just@ result is written downstream. The first
-- 'Nothing' result ends the loop and 'EOF' is written downstream.
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
{-# INLINABLE sourceFromIO #-}
sourceFromIO io = \ down _ ->
    let pump = io >>= maybe (down EOF) (\ item -> down (Just item) >> pump)
    in pump

-- | Turn a UTF8 encoded 'BufferedInput' into a 'BIO' source of text chunks.
--
-- Chunks are read with 'readBufferText' and written downstream until an empty text is read,
-- at which point 'EOF' is written downstream and the loop stops.
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source T.Text
{-# INLINABLE sourceTextFromBuffered #-}
sourceTextFromBuffered input = \ down _ ->
    let pump = do
            chunk <- readBufferText input
            if T.null chunk
                then down EOF
                else do
                    down (Just chunk)
                    pump
    in pump

-- | Turn a 'JSON' encoded 'BufferedInput' into a 'BIO' source, ignoring any
-- whitespace between JSON objects. If EOF is reached, 'EOF' is written downstream.
--
-- This is 'sourceParseChunkFromBuffered' applied to 'JSON.decodeChunk', so a JSON value that
-- fails to parse or convert is reported the same way as any other chunk-parser failure there:
-- an 'OtherError' with name @EPARSE@.
--
sourceJSONFromBuffered :: forall a. (JSON.JSON a, HasCallStack) => BufferedInput -> Source a
{-# INLINABLE sourceJSONFromBuffered #-}
sourceJSONFromBuffered = sourceParseChunkFromBuffered JSON.decodeChunk

-- | Turn a buffered input device into a packet source by running the given parser repeatedly.
--
-- This is 'sourceParseChunkFromBuffered' applied to the parser's chunk-wise runner
-- ('P.parseChunk'); an 'OtherError' with name @EPARSE@ is thrown if parsing fails.
sourceParserFromBuffered :: HasCallStack => P.Parser a -> BufferedInput -> Source a
{-# INLINABLE sourceParserFromBuffered #-}
sourceParserFromBuffered = sourceParseChunkFromBuffered . P.parseChunk

-- | Turn a buffered input device into a packet source driven by a chunk parsing function.
--
-- The loop alternates between two states:
--
--   * With no leftover bytes, a fresh chunk is read with 'readBuffer'; an empty chunk means the
--     input is exhausted and 'EOF' is written downstream.
--   * With bytes in hand, 'P.parseChunks' is run (pulling more input with 'readBuffer' as it
--     needs). A successful result is written downstream; a failure is raised with
--     'throwOtherError' under the name @EPARSE@. Leftover bytes, if any, are parsed again
--     before more input is read.
sourceParseChunkFromBuffered :: (HasCallStack, T.Print e)
                              => (V.Bytes -> P.Result e a) -> BufferedInput -> Source a
{-# INLINABLE sourceParseChunkFromBuffered #-}
sourceParseChunkFromBuffered pc bi = \ down _ -> refill down
  where
    -- No leftover: fetch a new chunk, or finish on empty input.
    refill down = do
        bs <- readBuffer bi
        if V.null bs
            then down EOF
            else drain down bs

    -- Parse one value out of the bytes in hand, then continue with whatever is left.
    drain down bs = do
        (rest, r) <- P.parseChunks pc (readBuffer bi) bs
        either (throwOtherError "EPARSE" . T.toText) (down . Just) r
        if V.null rest
            then refill down
            else drain down rest

-- | Turn a file into a 'V.Bytes' source.
--
-- The file is opened with 'FS.O_RDONLY' and 'FS.DEFAULT_FILE_MODE' as a 'Resource', wrapped
-- with 'newBufferedInput', and exposed through 'sourceFromBuffered'.
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile #-}
initSourceFromFile path =
    FS.initFile path FS.O_RDONLY FS.DEFAULT_FILE_MODE >>= \ file ->
        liftIO $ do
            input <- newBufferedInput file
            return (sourceFromBuffered input)

-- | Turn a file into a 'V.Bytes' source with given chunk size.
--
-- Same as 'initSourceFromFile', except the buffered input is created with 'newBufferedInput''
-- using the supplied size.
initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile' #-}
initSourceFromFile' path bufSiz =
    FS.initFile path FS.O_RDONLY FS.DEFAULT_FILE_MODE >>= \ file ->
        liftIO $ do
            input <- newBufferedInput' bufSiz file
            return (sourceFromBuffered input)

--------------------------------------------------------------------------------
-- Sink

-- | Turn a 'BufferedOutput' into a 'V.Bytes' sink.
--
-- Each input chunk is written with 'writeBuffer', then @Just ()@ is written downstream.
-- On 'EOF' the output is flushed with 'flushBuffer', then 'EOF' is written downstream.
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink V.Bytes
{-# INLINABLE sinkToBuffered #-}
sinkToBuffered output = \ down mchunk ->
    case mchunk of
        Just chunk -> do
            writeBuffer output chunk
            down (Just ())
        _ -> do
            flushBuffer output
            down EOF

-- | Turn a 'BufferedOutput' into a 'B.Builder' sink.
--
-- Each input builder is written with 'writeBuilder', then @Just ()@ is written downstream.
-- On 'EOF' the output is flushed with 'flushBuffer', then 'EOF' is written downstream.
--
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (B.Builder a)
{-# INLINABLE sinkBuilderToBuffered #-}
sinkBuilderToBuffered output = \ down mbuilder ->
    case mbuilder of
        Just builder -> do
            writeBuilder output builder
            down (Just ())
        _ -> do
            flushBuffer output
            down EOF

-- | Turn a file into a 'V.Bytes' sink.
--
-- Note the file will be opened in @'FS.O_APPEND' .|. 'FS.O_CREAT' .|. 'FS.O_WRONLY'@ mode,
-- so bytes will be written after the end of the original file if there are old bytes.
-- The opened file is wrapped with 'newBufferedOutput' and exposed through 'sinkToBuffered'.
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink V.Bytes)
{-# INLINABLE initSinkToFile #-}
initSinkToFile path =
    FS.initFile path openFlags FS.DEFAULT_FILE_MODE >>= \ file ->
        liftIO $ do
            output <- newBufferedOutput file
            return (sinkToBuffered output)
  where
    openFlags = FS.O_APPEND .|. FS.O_CREAT .|. FS.O_WRONLY

-- | Turn an `IO` action into a 'BIO' sink.
--
-- The action is run on each input value, then @Just ()@ is written downstream.
-- On 'EOF' nothing is run and 'EOF' is written downstream.
--
sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
{-# INLINABLE sinkToIO #-}
sinkToIO f = \ down -> maybe (down EOF) (\ a -> f a >> down (Just ()))

-- | Build a 'BIO' sink from an 'IO' consumer plus a flush action.
--
-- Every input item is passed to the consumer, after which @'Just' ()@ is sent
-- downstream; on end of input the flush action runs first, then 'EOF' is
-- forwarded downstream.
sinkToIO' :: HasCallStack => (a -> IO ()) -> IO () -> Sink a
{-# INLINABLE sinkToIO' #-}
sinkToIO' f flush k =
    maybe (flush >> k EOF) (\ x -> f x >> k (Just ()))

-- | Sink to a list in memory.
--
-- The 'MVar' is empty while items are being collected and is filled with the
-- items (in arrival order) once the sink receives an EOF.
--
-- Items are accumulated in reverse inside an 'IORef' and reversed once at EOF.
-- The strict 'modifyIORef'' is used so that the reference always holds an
-- evaluated cons cell instead of a chain of suspended @(x:)@ applications that
-- would grow with the stream.
sinkToList :: IO (MVar [a], Sink a)
sinkToList = do
    xsRef <- newIORef []
    rRef <- newEmptyMVar
    let collect x = modifyIORef' xsRef (x:)
        finish = do
            -- restore arrival order before handing the list over
            modifyIORef' xsRef reverse
            readIORef xsRef >>= putMVar rRef
    return (rRef, sinkToIO' collect finish)

--------------------------------------------------------------------------------
-- s

-- | Lift a pure function into a 'BIO' node.
--
-- The function is mapped over the incoming 'Maybe', so EOF passes through
-- untouched. The node keeps no state and can be reused across chains.
fromPure :: (a -> b) -> BIO a b
{-# INLINABLE fromPure #-}
fromPure f k = k . fmap f

-- | Lift an 'IO' function into a 'BIO' node.
--
-- Each input item is run through the function and the result is sent
-- downstream; EOF is forwarded as is. Whether the resulting node is stateless
-- depends on whether the supplied function uses IO state.
fromIO :: HasCallStack => (a -> IO b) -> BIO a b
{-# INLINABLE fromIO #-}
fromIO f k = maybe (k EOF) (f >=> k . Just)

-- | 'BIO' node built from a pure predicate.
--
-- Items satisfying the predicate are forwarded downstream, the others are
-- dropped; end of input is always forwarded. The node keeps no state and can
-- be reused across chains.
filter :: (a -> Bool) -> BIO a a
{-# INLINABLE filter #-}
filter keep k mx =
    case mx of
        Just x
            | keep x    -> k mx
            | otherwise -> pure ()
        Nothing -> k Nothing

-- | 'BIO' node built from an effectful predicate.
--
-- The predicate is run on every item; items for which it yields 'True' are
-- forwarded downstream, the others are dropped. End of input is always
-- forwarded. Whether the resulting node is stateless depends on whether the
-- predicate uses IO state.
filterIO :: (a -> IO Bool) -> BIO a a
{-# INLINABLE filterIO #-}
filterIO check k mx =
    case mx of
        Just x  -> check x >>= \ ok -> when ok (k mx)
        Nothing -> k Nothing

-- | Make a chunk size divider.
--
-- Incoming bytes are buffered and re-emitted in chunks whose size is the
-- largest multiple of the granularity that is currently available; the
-- remainder is kept for the next input. On EOF any buffered remainder is
-- emitted as is, followed by EOF.
--
-- A granularity smaller than 1 is treated as 1 (the same clamping
-- 'newGrouping' applies), which avoids a division by zero in the remainder
-- computation.
newReChunk :: Int                -- ^ chunk granularity
           -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newReChunk #-}
newReChunk n = do
    trailingRef <- newIORef V.empty
    return $ \ k mbs ->
        case mbs of
            Just bs -> do
                trailing <- readIORef trailingRef
                let chunk = trailing `V.append` bs
                    l = V.length chunk
                if l >= granularity
                then do
                    -- cut at the largest multiple of the granularity
                    let l' = l - (l `rem` granularity)
                        (chunk', rest) = V.splitAt l' chunk
                    writeIORef trailingRef rest
                    k (Just chunk')
                else writeIORef trailingRef chunk
            _ -> do
                trailing <- readIORef trailingRef
                unless (V.null trailing) $ do
                    writeIORef trailingRef V.empty
                    k (Just trailing)
                k EOF
  where
    granularity = max 1 n

-- | Read buffer and parse with 'Parser'.
--
-- This function will turn a 'Parser' into a 'BIO', throw 'OtherError' with name @EPARSE@ if parsing fail.
--
-- The node remembers the continuation of a parse that is still waiting for
-- input. Each successful parse emits its result, clears that continuation and
-- restarts a fresh parser on the trailing bytes, so leftover input is never
-- fed to a continuation belonging to an already finished parse.
--
-- On EOF a pending continuation is fed an empty chunk to signal the end of
-- input, then EOF is forwarded downstream. A parse that is still incomplete
-- after that signal is discarded.
newParser :: HasCallStack => P.Parser a -> IO (BIO V.Bytes a)
{-# INLINABLE newParser #-}
newParser p = do
    -- continuation of the parse currently waiting for more input, if any
    pendingRef <- newIORef Nothing
    return $ \ k mbs -> do
        let freshStep = P.parseChunk p
            -- atEOF: no more input will arrive after this chunk
            feed atEOF step chunk = case step chunk of
                P.Success a trailing -> do
                    writeIORef pendingRef Nothing
                    k (Just a)
                    unless (V.null trailing) (feed atEOF freshStep trailing)
                P.Partial step'
                    -- leftover bytes restarted a parse after the end of
                    -- input: tell it right away that nothing more will come
                    | atEOF && not (V.null chunk) -> feed atEOF step' V.empty
                    | otherwise -> writeIORef pendingRef (Just step')
                P.Failure e _ ->
                    throwOtherError "EPARSE" (T.toText e)
        pending <- readIORef pendingRef
        case mbs of
            Just bs -> feed False (maybe freshStep id pending) bs
            _ -> do
                forM_ pending $ \ step -> do
                    feed True step V.empty
                    writeIORef pendingRef Nothing
                k EOF

-- | Make a new UTF8 decoder, which decode bytes streams into text streams.
--
-- If there're invalid UTF8 bytes, an 'OtherError' with name 'EINVALIDUTF8' will be thrown.
--
-- Note this node is supposed to be used with preprocess node such as decompressor, parser, etc.
-- where bytes boundary cannot be controlled, UTF8 decoder will concat trailing bytes from last block to next one.
-- Use this node directly with 'sourceFromBuffered' will not be as efficient as directly use
-- 'sourceTextFromBuffered', because 'BufferedInput' provides push back capability,
-- trailing bytes can be pushed back to reading buffer then returned with next block input together.
--
newUTF8Decoder :: HasCallStack => IO (BIO V.Bytes T.Text)
{-# INLINABLE newUTF8Decoder #-}
newUTF8Decoder = do
    pendingRef <- newIORef V.empty
    return $ \ k mbs ->
        case mbs of
            Just bs -> do
                pending <- readIORef pendingRef
                let buf = pending `V.append` bs
                    V.PrimVector arr off len = buf
                    firstCharComplete = len > 0 && T.decodeCharLen arr off <= len
                if not firstCharComplete
                -- not even one whole character yet: keep buffering
                then writeIORef pendingRef buf
                else do
                    -- index of the last byte that is not a continuation byte
                    let (lastLead, _) = V.findR isCharStart buf
                    when (lastLead == (-1)) $
                        throwOtherError "EINVALIDUTF8" "invalid UTF8 bytes"
                    let tailLen = len - lastLead
                    if T.decodeCharLen arr (off + lastLead) > tailLen
                    then do
                        -- last character is cut by the chunk boundary:
                        -- hold it back and emit everything before it
                        writeIORef pendingRef (V.fromArr arr (off + lastLead) tailLen)
                        k (Just (T.validate (V.fromArr arr off lastLead)))
                    else do
                        writeIORef pendingRef V.empty
                        k (Just (T.validate buf))

            _ -> do
                pending <- readIORef pendingRef
                -- bytes left over at end of input can never form a character
                unless (V.null pending) $
                    throwOtherError "EINVALIDUTF8" "invalid UTF8 bytes"
                k EOF
  where
    isCharStart :: Word8 -> Bool
    isCharStart w = w >= 0b11000000 || w <= 0b01111111

-- | Make a new stream splitter based on magic byte.
--
-- Input is cut after every occurrence of the magic byte, so each emitted piece
-- ends with (and includes) that byte. Bytes after the last occurrence are held
-- back and prepended to the next input; on EOF whatever is still held back is
-- emitted (if non-empty), followed by EOF.
newMagicSplitter :: Word8 -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newMagicSplitter #-}
newMagicSplitter magic = do
    pendingRef <- newIORef V.empty
    let -- emit every complete piece in the buffer, stash what is left
        emitPieces k buf =
            case V.elemIndex magic buf of
                Nothing -> writeIORef pendingRef buf
                Just ix -> do
                    let (piece, remaining) = V.splitAt (ix + 1) buf
                    k (Just piece)
                    emitPieces k remaining
        node k (Just bs) = do
            pending <- readIORef pendingRef
            emitPieces k (pending `V.append` bs)
        node k _ = do
            leftover <- readIORef pendingRef
            unless (V.null leftover) $ do
                writeIORef pendingRef V.empty
                k (Just leftover)
            k EOF
    return node

-- | Make a new stream splitter based on linefeed(@\r\n@ or @\n@).
--
-- The result bytes doesn't contain linefeed.
--
-- The stream is split with 'newMagicSplitter' on byte 10 (@\n@) and the line
-- ending is then stripped from every piece. A piece that does not end with
-- @\n@ (the final piece of a stream lacking a trailing linefeed) is passed
-- through unchanged.
newLineSplitter :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newLineSplitter #-}
newLineSplitter = do
    s <- newMagicSplitter 10
    return (s . fromPure dropLineEnd)
  where
    dropLineEnd :: V.Bytes -> V.Bytes
    dropLineEnd bs@(V.PrimVector arr off len)
        -- no terminating \n (also covers the empty vector): nothing to strip
        | bs `V.indexMaybe` (len - 1) /= Just 10 = bs
        -- \r\n
        | bs `V.indexMaybe` (len - 2) == Just 13 = V.PrimVector arr off (len - 2)
        -- bare \n
        | otherwise                              = V.PrimVector arr off (len - 1)

-- | Make a new base64 encoder node.
--
-- Input is first re-chunked with 'newReChunk' to a granularity of 3 bytes, and
-- every resulting chunk is then passed through 'base64Encode'.
newBase64Encoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Encoder #-}
newBase64Encoder =
    fmap (\ rechunk -> rechunk . fromPure base64Encode) (newReChunk 3)

-- | Make a new base64 decoder node.
--
-- Input is first re-chunked with 'newReChunk' to a granularity of 4 bytes, and
-- every resulting chunk is then passed through 'base64Decode''.
newBase64Decoder :: HasCallStack => IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Decoder #-}
newBase64Decoder =
    fmap (\ rechunk -> rechunk . fromPure base64Decode') (newReChunk 4)

-- | Make a hex encoder node.
--
-- The node simply maps 'Hex.hexEncode' over every chunk via 'fromPure'; it
-- keeps no state and can be reused across chains.
hexEncode :: Bool   -- ^ uppercase?
          -> BIO V.Bytes V.Bytes
{-# INLINABLE hexEncode #-}
hexEncode = fromPure . Hex.hexEncode

-- | Make a new hex decoder node.
--
-- Input is first re-chunked with 'newReChunk' to a granularity of 2 bytes, and
-- every resulting chunk is then passed through 'Hex.hexDecode''.
newHexDecoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newHexDecoder #-}
newHexDecoder = (. fromPure Hex.hexDecode') <$> newReChunk 2

-- | Make a new BIO node which counts items flowing through it.
--
-- For every item the 'Counter' is bumped by 1 with 'atomicAddCounter_' and the
-- item is forwarded unchanged. 'Counter' is increased atomically, it's safe to
-- read \/ reset the counter from other threads.
counter :: Counter -> BIO a a
{-# INLINABLE counter #-}
counter c = fromIO (\ x -> x <$ atomicAddCounter_ c 1)

-- | Make a new BIO node which counts items, and label item with a sequence number.
--
-- For every item the 'Counter' is bumped by 1 with 'atomicAddCounter', and the
-- value returned by that call is paired with the item. 'Counter' is increased
-- atomically, it's safe to read \/ reset the counter from other threads.
seqNum :: Counter -> BIO a (Int, a)
{-# INLINABLE seqNum #-}
seqNum c = fromIO (\ x -> flip (,) x <$> atomicAddCounter c 1)

-- | Make a BIO node grouping items into fixed size arrays.
--
-- Items are written into a mutable buffer of the requested size; once the
-- buffer is full it is frozen and emitted, and a fresh buffer is allocated.
-- A group size smaller than 1 is treated as 1.
--
-- On EOF, trailing items (an incomplete group) are emitted as a shorter
-- vector, and EOF is then always forwarded downstream, as the other buffering
-- nodes in this module do.
newGrouping :: V.Vec v a => Int -> IO (BIO a (v a))
{-# INLINABLE newGrouping #-}
newGrouping n
    | n < 1 =  newGrouping 1
    | otherwise = do
        c <- newCounter 0
        arrRef <- newIORef =<< A.newArr n
        return $ \ k mx ->
            case mx of
                Just x -> do
                    i <- readPrimRef c
                    marr <- readIORef arrRef
                    A.writeArr marr i x
                    if i == n - 1
                    then do
                        writePrimRef c 0
                        -- install a fresh buffer before freezing the full one
                        writeIORef arrRef =<< A.newArr n
                        arr <- A.unsafeFreezeArr marr
                        k . Just $! V.fromArr arr 0 n
                    else writePrimRef c (i+1)
                _ -> do
                    i <- readPrimRef c
                    when (i /= 0) $ do
                        writePrimRef c 0
                        marr <- readIORef arrRef
                        -- the buffer is about to be shrunk and frozen, so it
                        -- must not stay installed as the working buffer
                        writeIORef arrRef =<< A.newArr n
                        A.shrinkMutableArr marr i
                        arr <- A.unsafeFreezeArr marr
                        k . Just $! V.fromArr arr 0 i
                    k EOF

-- | A BIO node which flattens vectors into their items.
--
-- Every element of an incoming vector is sent downstream individually, in
-- 'V.traverse_' order; EOF is forwarded as is.
ungrouping :: BIO (V.Vector a) a
{-# INLINABLE ungrouping #-}
ungrouping k = maybe (k EOF) (V.traverse_ (k . Just))

-- | A BIO node which writes 'True' to the given 'TVar' when 'EOF' is reached.
--
-- Items are forwarded downstream untouched; on end of input the 'TVar' is set
-- first and 'EOF' is forwarded afterwards.
consumed :: TVar Bool -> BIO a a
{-# INLINABLE consumed #-}
consumed ref k = go
  where
    go Nothing = atomically (writeTVar ref True) >> k EOF
    go mx      = k mx