{-# OPTIONS_GHC -Wno-missing-fields #-}
{-|
Module      : Z.IO.BIO
Description : Buffered IO interface
Copyright   : (c) Dong Han, 2017-2020
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides 'BIO' (block IO) type to facilitate writing streaming programs. A 'BIO' node usually:

  * Process input in unit of block(or item).
  * Running in constant spaces, which means the memory usage won't accumulate.
  * Keep some state in IO, which is sealed in 'BIO' closure.

Some examples of such nodes are:

  * Compressor \/ decompressor, e.g. zlib, etc.
  * Codec, e.g. utf8 codec, base64 codec.
  * Ciphers.
  * Packet parsers.

We use @BIO inp out@ type to represent all the objects above, @BIO Void out@ to represent an 'IO' source,
and @BIO inp Void@ to represent an 'IO' sink, which can all be connected with '>|>' to build a larger 'BIO' node.

@
import Z.Data.CBytes    (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}

    withResource (initSourceFromFile origin) $ \ src ->
        withResource (initSinkToFile target) $ \ sink ->
            runBIO $ src >|> base64Enc >|> zlibCompressor >|> sink

> base64AndCompressFile "test" "test.gz"
-- run 'zcat "test.gz" | base64 -d' will give you original file
@

-}
module Z.IO.BIO (
  -- * The BIO type
    BIO(..), Source, Sink
  -- ** Basic combinators
  , (>|>), (>~>), (>!>), appendSource
  , concatSource, zipSource, zipBIO
  , joinSink, fuseSink
  -- * Run BIO chain
  , runBIO
  , runSource, runSource_
  , runBlock, runBlock_, unsafeRunBlock
  , runBlocks, runBlocks_, unsafeRunBlocks
  -- * Make new BIO
  , pureBIO, ioBIO
  -- ** Source
  , sourceFromIO
  , sourceFromList
  , initSourceFromFile
  , sourceFromBuffered
  , sourceTextFromBuffered
  , sourceJSONFromBuffered
  , sourceParserFromBuffered
  , sourceParseChunksFromBuffered
  -- ** Sink
  , sinkToIO
  , sinkToList
  , initSinkToFile
  , sinkToBuffered
  , sinkBuilderToBuffered
  -- ** Bytes specific
  , newParserNode, newReChunk, newUTF8Decoder, newMagicSplitter, newLineSplitter
  , newBase64Encoder, newBase64Decoder
  , hexEncoder, newHexDecoder
  -- ** Generic BIO
  , newCounterNode
  , newSeqNumNode
  , newGroupingNode
  ) where

import           Control.Monad
import           Control.Monad.IO.Class
import           Data.Bits              ((.|.))
import           Data.IORef
import qualified Data.List              as List
import           Data.Sequence          (Seq (..))
import qualified Data.Sequence          as Seq
import           Data.Void
import           Data.Word
import           System.IO.Unsafe       (unsafePerformIO)
import qualified Z.Data.Array           as A
import qualified Z.Data.Builder         as B
import           Z.Data.CBytes          (CBytes)
import qualified Z.Data.JSON            as JSON
import qualified Z.Data.Parser          as P
import           Z.Data.PrimRef
import qualified Z.Data.Text            as T
import qualified Z.Data.Text.UTF8Codec  as T
import qualified Z.Data.Vector          as V
import qualified Z.Data.Vector.Base     as V
import           Z.Data.Vector.Base64
import           Z.Data.Vector.Hex
import           Z.IO.Buffered
import           Z.IO.Exception
import qualified Z.IO.FileSystem.Base   as FS
import           Z.IO.Resource

-- | A 'BIO'(blocked IO) node.
--
-- A 'BIO' node consist of two functions: 'push' and 'pull'. It can be used to describe different kinds of IO
-- devices:
--
--  * @BIO inp out@ describe an IO state machine(e.g. z_stream in zlib),
--    which takes some input in block, then outputs.
--  * @type Source out = BIO Void out@ described an IO source, which never takes input,
--    but gives output until EOF when 'pull'ed.
--  * @type Sink inp = BIO inp Void@ described an IO sink, which takes input and perform some IO effects,
--    such as writing to terminal or files.
--
-- You can connect these 'BIO' nodes with '>|>', which connect left node's output to right node's input,
-- and return a new 'BIO' node with left node's input type and right node's output type.
--
-- You can run a 'BIO' node in different ways:
--
--   * 'runBIO' will continuously pull value from source, push to sink until source reaches EOF.
--   * 'runSource' will continuously pull value from source, and perform effects along the way.
--   * 'runBlock' will supply a single block of input as whole input, and return output if there's any.
--   * 'runBlocks' will supply a list of blocks as whole input, and return a list of output blocks.
--
-- Note 'BIO' usually contains some IO states, you can consider it as an opaque 'IORef':
--
--   * You shouldn't use a 'BIO' node across multiple 'BIO' chain unless the state can be reset.
--   * You shouldn't use a 'BIO' node across multiple threads unless document states otherwise.
--
-- 'BIO' is simply a convenient way to construct single-thread streaming computation, to use 'BIO'
-- in multiple threads, check "Z.IO.BIO.Concurrent" module.
--
data BIO inp out = BIO
    { BIO inp out -> inp -> IO (Maybe out)
push :: inp -> IO (Maybe out)
      -- ^ Push a block of input, perform some effect, and return output,
      -- if input is not enough to produce any output yet, return 'Nothing'.
    , BIO inp out -> IO (Maybe out)
pull :: IO (Maybe out)
      -- ^ When input reaches EOF, there may be a finalize stage to output
      -- trailing output blocks. return 'Nothing' to indicate current node
      -- reaches EOF too.
    }

-- | Type alias for 'BIO' node which never takes input.
--
-- 'push' is not available by type system, and 'pull' return 'Nothing' when
-- reaches EOF.
type Source out = BIO Void out

-- | Type alias for 'BIO' node which only takes input and perform effects.
--
-- 'push' doesn't produce any meaningful output, and 'pull' usually does a flush.
type Sink inp = BIO inp Void

instance Functor (BIO inp) where
    {-# INLINABLE fmap #-}
    fmap :: (a -> b) -> BIO inp a -> BIO inp b
fmap a -> b
f BIO{IO (Maybe a)
inp -> IO (Maybe a)
pull :: IO (Maybe a)
push :: inp -> IO (Maybe a)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} = (inp -> IO (Maybe b)) -> IO (Maybe b) -> BIO inp b
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO inp -> IO (Maybe b)
push_ IO (Maybe b)
pull_
      where
        push_ :: inp -> IO (Maybe b)
push_ inp
inp = do
            Maybe a
r <- inp -> IO (Maybe a)
push inp
inp
            Maybe b -> IO (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> IO (Maybe b)) -> Maybe b -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$! (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f Maybe a
r
        pull_ :: IO (Maybe b)
pull_ = do
            Maybe a
r <- IO (Maybe a)
pull
            Maybe b -> IO (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> IO (Maybe b)) -> Maybe b -> IO (Maybe b)
forall a b. (a -> b) -> a -> b
$! (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f Maybe a
r

infixl 3 >|>
infixl 3 >~>

-- | Connect two 'BIO' nodes, feed left one's output to right one's input.
(>|>) :: HasCallStack => BIO a b -> BIO b c -> BIO a c
{-# INLINE (>|>) #-}
BIO a -> IO (Maybe b)
pushA IO (Maybe b)
pullA >|> :: BIO a b -> BIO b c -> BIO a c
>|> BIO b -> IO (Maybe c)
pushB IO (Maybe c)
pullB = (a -> IO (Maybe c)) -> IO (Maybe c) -> BIO a c
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO a -> IO (Maybe c)
push_ IO (Maybe c)
pull_
  where
    push_ :: a -> IO (Maybe c)
push_ a
inp = do
        Maybe b
x <- a -> IO (Maybe b)
pushA a
inp
        case Maybe b
x of Just b
x' -> b -> IO (Maybe c)
pushB b
x'
                  Maybe b
_       -> Maybe c -> IO (Maybe c)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe c
forall a. Maybe a
Nothing
    pull_ :: IO (Maybe c)
pull_ = do
        Maybe b
x <- IO (Maybe b)
pullA
        case Maybe b
x of
            Just b
x' -> do
                Maybe c
y <- b -> IO (Maybe c)
pushB b
x'
                case Maybe c
y of Maybe c
Nothing -> IO (Maybe c)
pull_  -- draw input from A until there's an output from B
                          Maybe c
_       -> Maybe c -> IO (Maybe c)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe c
y
            Maybe b
_       -> IO (Maybe c)
pullB

-- | Flipped 'fmap' for easier chaining.
(>~>) :: BIO a b -> (b -> c) -> BIO a c
{-# INLINE (>~>) #-}
>~> :: BIO a b -> (b -> c) -> BIO a c
(>~>) = ((b -> c) -> BIO a b -> BIO a c) -> BIO a b -> (b -> c) -> BIO a c
forall a b c. (a -> b -> c) -> b -> a -> c
flip (b -> c) -> BIO a b -> BIO a c
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap

-- | Connect BIO to an effectful function.
(>!>) :: HasCallStack => BIO a b -> (b -> IO c) -> BIO a c
{-# INLINE (>!>) #-}
>!> :: BIO a b -> (b -> IO c) -> BIO a c
(>!>) BIO{IO (Maybe b)
a -> IO (Maybe b)
pull :: IO (Maybe b)
push :: a -> IO (Maybe b)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} b -> IO c
f = (a -> IO (Maybe c)) -> IO (Maybe c) -> BIO a c
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO a -> IO (Maybe c)
push_ IO (Maybe c)
pull_
  where
    push_ :: a -> IO (Maybe c)
push_ a
x = a -> IO (Maybe b)
push a
x IO (Maybe b) -> (Maybe b -> IO (Maybe c)) -> IO (Maybe c)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Maybe b
r ->
        case Maybe b
r of Just b
r' -> c -> Maybe c
forall a. a -> Maybe a
Just (c -> Maybe c) -> IO c -> IO (Maybe c)
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> b -> IO c
f b
r'
                  Maybe b
_       -> Maybe c -> IO (Maybe c)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe c
forall a. Maybe a
Nothing
    pull_ :: IO (Maybe c)
pull_ = IO (Maybe b)
pull IO (Maybe b) -> (Maybe b -> IO (Maybe c)) -> IO (Maybe c)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Maybe b
r ->
        case Maybe b
r of Just b
r' -> c -> Maybe c
forall a. a -> Maybe a
Just (c -> Maybe c) -> IO c -> IO (Maybe c)
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> b -> IO c
f b
r'
                  Maybe b
_       -> Maybe c -> IO (Maybe c)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe c
forall a. Maybe a
Nothing

-- | Connect two 'BIO' source, after first reach EOF, draw element from second.
appendSource :: HasCallStack => Source a -> Source a  -> IO (Source a)
{-# INLINE appendSource #-}
Source a
b1 appendSource :: Source a -> Source a -> IO (Source a)
`appendSource` Source a
b2 = [Source a] -> IO (Source a)
forall a. HasCallStack => [Source a] -> IO (Source a)
concatSource [Source a
b1, Source a
b2]

-- | Fuse two 'BIO' sinks, i.e. everything written to the fused sink will be written to left and right sink.
--
-- Flush result 'BIO' will effectively flush both sink.
joinSink :: HasCallStack => Sink out -> Sink out -> Sink out
{-# INLINE joinSink #-}
Sink out
b1 joinSink :: Sink out -> Sink out -> Sink out
`joinSink` Sink out
b2 = [Sink out] -> Sink out
forall out. HasCallStack => [Sink out] -> Sink out
fuseSink [Sink out
b1, Sink out
b2]

-- | Fuse a list of 'BIO' sinks, everything written to the fused sink will be written to every sink in the list.
--
-- Flush result 'BIO' will effectively flush every sink in the list.
fuseSink :: HasCallStack => [Sink out] -> Sink out
{-# INLINABLE fuseSink #-}
fuseSink :: [Sink out] -> Sink out
fuseSink [Sink out]
ss = (out -> IO (Maybe Void)) -> IO (Maybe Void) -> Sink out
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO out -> IO (Maybe Void)
push_ IO (Maybe Void)
pull_
  where
    push_ :: out -> IO (Maybe Void)
push_ out
inp = [Sink out] -> (Sink out -> IO (Maybe Void)) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Sink out]
ss (\ Sink out
b -> Sink out -> out -> IO (Maybe Void)
forall inp out. BIO inp out -> inp -> IO (Maybe out)
push Sink out
b out
inp) IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Void
forall a. Maybe a
Nothing
    pull_ :: IO (Maybe Void)
pull_ = (Sink out -> IO (Maybe Void)) -> [Sink out] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Sink out -> IO (Maybe Void)
forall inp out. BIO inp out -> IO (Maybe out)
pull [Sink out]
ss IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Void
forall a. Maybe a
Nothing

-- | Connect list of 'BIO' sources, after one reach EOF, draw element from next.
concatSource :: HasCallStack => [Source a] -> IO (Source a)
{-# INLINABLE concatSource #-}
concatSource :: [Source a] -> IO (Source a)
concatSource [Source a]
ss0 = [Source a] -> IO (IORef [Source a])
forall a. a -> IO (IORef a)
newIORef [Source a]
ss0 IO (IORef [Source a])
-> (IORef [Source a] -> IO (Source a)) -> IO (Source a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ IORef [Source a]
ref -> Source a -> IO (Source a)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO{ pull :: IO (Maybe a)
pull = IORef [Source a] -> IO (Maybe a)
forall inp a. IORef [BIO inp a] -> IO (Maybe a)
loop IORef [Source a]
ref})
  where
    loop :: IORef [BIO inp a] -> IO (Maybe a)
loop IORef [BIO inp a]
ref = do
        [BIO inp a]
ss <- IORef [BIO inp a] -> IO [BIO inp a]
forall a. IORef a -> IO a
readIORef IORef [BIO inp a]
ref
        case [BIO inp a]
ss of
            []       -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
            (BIO inp a
s:[BIO inp a]
rest) -> do
                Maybe a
r <- BIO inp a -> IO (Maybe a)
forall inp out. BIO inp out -> IO (Maybe out)
pull BIO inp a
s
                case Maybe a
r of
                    Just a
_ -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
r
                    Maybe a
_      -> IORef [BIO inp a] -> [BIO inp a] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [BIO inp a]
ref [BIO inp a]
rest IO () -> IO (Maybe a) -> IO (Maybe a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef [BIO inp a] -> IO (Maybe a)
loop IORef [BIO inp a]
ref

-- | Zip two 'BIO' source into one, reach EOF when either one reached EOF.
zipSource :: HasCallStack => Source a -> Source b -> IO (Source (a,b))
{-# INLINABLE zipSource #-}
zipSource :: Source a -> Source b -> IO (Source (a, b))
zipSource (BIO Void -> IO (Maybe a)
_ IO (Maybe a)
pullA) (BIO Void -> IO (Maybe b)
_ IO (Maybe b)
pullB) = do
    IORef Bool
finRef <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    Source (a, b) -> IO (Source (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return (Source (a, b) -> IO (Source (a, b)))
-> Source (a, b) -> IO (Source (a, b))
forall a b. (a -> b) -> a -> b
$ BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO { pull :: IO (Maybe (a, b))
pull = do
        Bool
fin <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
finRef
        if Bool
fin
        then Maybe (a, b) -> IO (Maybe (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, b)
forall a. Maybe a
Nothing
        else do
            Maybe a
mA <- IO (Maybe a)
pullA
            Maybe b
mB <- IO (Maybe b)
pullB
            let r :: Maybe (a, b)
r = (,) (a -> b -> (a, b)) -> Maybe a -> Maybe (b -> (a, b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe a
mA Maybe (b -> (a, b)) -> Maybe b -> Maybe (a, b)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe b
mB
            case Maybe (a, b)
r of
                Just (a, b)
_ -> Maybe (a, b) -> IO (Maybe (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, b)
r
                Maybe (a, b)
_      -> IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
finRef Bool
True IO () -> IO (Maybe (a, b)) -> IO (Maybe (a, b))
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe (a, b) -> IO (Maybe (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (a, b)
forall a. Maybe a
Nothing
            }

-- | Zip two 'BIO' node into one, reach EOF when either one reached EOF.
--
-- The output item number should match, unmatched output will be discarded.
zipBIO :: HasCallStack => BIO a b -> BIO a c -> IO (BIO a (b, c))
{-# INLINABLE zipBIO #-}
zipBIO :: BIO a b -> BIO a c -> IO (BIO a (b, c))
zipBIO (BIO a -> IO (Maybe b)
pushA IO (Maybe b)
pullA) (BIO a -> IO (Maybe c)
pushB IO (Maybe c)
pullB) = do
    IORef Bool
finRef <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Seq b)
aSeqRef <- Seq b -> IO (IORef (Seq b))
forall a. a -> IO (IORef a)
newIORef Seq b
forall a. Seq a
Seq.Empty
    IORef (Seq c)
bSeqRef <- Seq c -> IO (IORef (Seq c))
forall a. a -> IO (IORef a)
newIORef Seq c
forall a. Seq a
Seq.Empty
    BIO a (b, c) -> IO (BIO a (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a -> IO (Maybe (b, c))) -> IO (Maybe (b, c)) -> BIO a (b, c)
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (IORef (Seq b) -> IORef (Seq c) -> a -> IO (Maybe (b, c))
push_ IORef (Seq b)
aSeqRef IORef (Seq c)
bSeqRef) (IORef Bool -> IORef (Seq b) -> IORef (Seq c) -> IO (Maybe (b, c))
pull_ IORef Bool
finRef IORef (Seq b)
aSeqRef IORef (Seq c)
bSeqRef))
  where
    push_ :: IORef (Seq b) -> IORef (Seq c) -> a -> IO (Maybe (b, c))
push_ IORef (Seq b)
aSeqRef IORef (Seq c)
bSeqRef a
x = do
        Maybe b
ma <- a -> IO (Maybe b)
pushA a
x
        Maybe c
mb <- a -> IO (Maybe c)
pushB a
x
        Maybe b -> (b -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe b
ma (\ b
a -> IORef (Seq b) -> (Seq b -> Seq b) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (Seq b)
aSeqRef (b
a b -> Seq b -> Seq b
forall a. a -> Seq a -> Seq a
:<|))
        Maybe c -> (c -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Maybe c
mb (\ c
b -> IORef (Seq c) -> (Seq c -> Seq c) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef (Seq c)
bSeqRef (c
b c -> Seq c -> Seq c
forall a. a -> Seq a -> Seq a
:<|))
        Seq b
aSeq <- IORef (Seq b) -> IO (Seq b)
forall a. IORef a -> IO a
readIORef IORef (Seq b)
aSeqRef
        Seq c
bSeq <- IORef (Seq c) -> IO (Seq c)
forall a. IORef a -> IO a
readIORef IORef (Seq c)
bSeqRef
        case Seq b
aSeq of
            (!Seq b
as :|> b
a) -> case Seq c
bSeq of
                (!Seq c
bs :|> c
b) -> do
                    IORef (Seq b) -> Seq b -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Seq b)
aSeqRef Seq b
as
                    IORef (Seq c) -> Seq c -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Seq c)
bSeqRef Seq c
bs
                    Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((b, c) -> Maybe (b, c)
forall a. a -> Maybe a
Just (b
a, c
b))
                Seq c
_ -> Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (b, c)
forall a. Maybe a
Nothing
            Seq b
_ -> Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (b, c)
forall a. Maybe a
Nothing

    pull_ :: IORef Bool -> IORef (Seq b) -> IORef (Seq c) -> IO (Maybe (b, c))
pull_ IORef Bool
finRef IORef (Seq b)
aSeqRef IORef (Seq c)
bSeqRef = do
        Bool
fin <- IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
finRef
        if Bool
fin
        then Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (b, c)
forall a. Maybe a
Nothing
        else do
            Seq b
aSeq <- IORef (Seq b) -> IO (Seq b)
forall a. IORef a -> IO a
readIORef IORef (Seq b)
aSeqRef
            Seq c
bSeq <- IORef (Seq c) -> IO (Seq c)
forall a. IORef a -> IO a
readIORef IORef (Seq c)
bSeqRef
            Maybe b
ma <- case Seq b
aSeq of (Seq b
_ :|> b
a) -> Maybe b -> IO (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
a)
                               Seq b
_         -> IO (Maybe b)
pullA
            Maybe c
mb <- case Seq c
bSeq of (Seq c
_ :|> c
b) -> Maybe c -> IO (Maybe c)
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Maybe c
forall a. a -> Maybe a
Just c
b)
                               Seq c
_         -> IO (Maybe c)
pullB
            case Maybe b
ma of
                Just b
a -> case Maybe c
mb of
                    Just c
b -> Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((b, c) -> Maybe (b, c)
forall a. a -> Maybe a
Just (b
a, c
b))
                    Maybe c
_      -> IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
finRef Bool
True IO () -> IO (Maybe (b, c)) -> IO (Maybe (b, c))
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (b, c)
forall a. Maybe a
Nothing
                Maybe b
_ -> IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
finRef Bool
True IO () -> IO (Maybe (b, c)) -> IO (Maybe (b, c))
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe (b, c) -> IO (Maybe (b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (b, c)
forall a. Maybe a
Nothing

-------------------------------------------------------------------------------
-- Run BIO

-- | Run a 'BIO' loop (source >|> ... >|> sink).
runBIO :: HasCallStack => BIO Void Void -> IO ()
{-# INLINABLE runBIO #-}
runBIO :: BIO Void Void -> IO ()
runBIO BIO{IO (Maybe Void)
Void -> IO (Maybe Void)
pull :: IO (Maybe Void)
push :: Void -> IO (Maybe Void)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} = IO (Maybe Void)
pull IO (Maybe Void) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Drain a 'BIO' source into a List in memory.
runSource :: HasCallStack => Source x -> IO [x]
{-# INLINABLE runSource #-}
runSource :: Source x -> IO [x]
runSource BIO{IO (Maybe x)
Void -> IO (Maybe x)
pull :: IO (Maybe x)
push :: Void -> IO (Maybe x)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} = IO (Maybe x) -> [x] -> IO [x]
forall (m :: * -> *) a. Monad m => m (Maybe a) -> [a] -> m [a]
loop IO (Maybe x)
pull []
  where
    loop :: m (Maybe a) -> [a] -> m [a]
loop m (Maybe a)
f [a]
acc = do
        Maybe a
r <- m (Maybe a)
f
        case Maybe a
r of Just a
r' -> m (Maybe a) -> [a] -> m [a]
loop m (Maybe a)
f (a
r'a -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
acc)
                  Maybe a
_       -> [a] -> m [a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([a] -> [a]
forall a. [a] -> [a]
List.reverse [a]
acc)

-- | Drain a source without collecting result.
runSource_ :: HasCallStack => Source x -> IO ()
{-# INLINABLE runSource_ #-}
runSource_ :: Source x -> IO ()
runSource_ BIO{IO (Maybe x)
Void -> IO (Maybe x)
pull :: IO (Maybe x)
push :: Void -> IO (Maybe x)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} = IO (Maybe x) -> IO ()
forall (m :: * -> *) a. Monad m => m (Maybe a) -> m ()
loop IO (Maybe x)
pull
  where
    loop :: m (Maybe a) -> m ()
loop m (Maybe a)
f = do
        Maybe a
r <- m (Maybe a)
f
        case Maybe a
r of Just a
_ -> m (Maybe a) -> m ()
loop m (Maybe a)
f
                  Maybe a
_      -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Supply a single block of input, then run BIO node until EOF.
--
-- Note many 'BIO' node will be closed or not be able to take new input after drained.
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE runBlock #-}
runBlock :: BIO inp out -> inp -> IO [out]
runBlock BIO{IO (Maybe out)
inp -> IO (Maybe out)
pull :: IO (Maybe out)
push :: inp -> IO (Maybe out)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} inp
inp = do
    Maybe out
x <- inp -> IO (Maybe out)
push inp
inp
    let acc :: [out]
acc = case Maybe out
x of Just out
x' -> [out
x']
                        Maybe out
_       -> []
    IO (Maybe out) -> [out] -> IO [out]
forall (m :: * -> *) a. Monad m => m (Maybe a) -> [a] -> m [a]
loop IO (Maybe out)
pull [out]
acc
  where
    loop :: m (Maybe a) -> [a] -> m [a]
loop m (Maybe a)
f [a]
acc = do
        Maybe a
r <- m (Maybe a)
f
        case Maybe a
r of Just a
r' -> m (Maybe a) -> [a] -> m [a]
loop m (Maybe a)
f (a
r'a -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
acc)
                  Maybe a
_       -> [a] -> m [a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([a] -> [a]
forall a. [a] -> [a]
List.reverse [a]
acc)

-- | Supply a single block of input, then run BIO node until EOF with collecting result.
--
-- Note many 'BIO' node will be closed or not be able to take new input after drained.
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE runBlock_ #-}
runBlock_ :: BIO inp out -> inp -> IO ()
runBlock_ BIO{IO (Maybe out)
inp -> IO (Maybe out)
pull :: IO (Maybe out)
push :: inp -> IO (Maybe out)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} inp
inp = do
    Maybe out
_ <- inp -> IO (Maybe out)
push inp
inp
    IO (Maybe out) -> IO ()
forall (m :: * -> *) a. Monad m => m (Maybe a) -> m ()
loop IO (Maybe out)
pull
  where
    loop :: m (Maybe a) -> m ()
loop m (Maybe a)
f = do
        Maybe a
r <- m (Maybe a)
f
        case Maybe a
r of Just a
_ -> m (Maybe a) -> m ()
loop m (Maybe a)
f
                  Maybe a
_      -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Wrap 'runBlock' into a pure interface.
--
-- You can wrap a stateful BIO computation(including the creation of 'BIO' node),
-- when you can guarantee a computation is pure, e.g. compressing, decoding, etc.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
{-# INLINABLE unsafeRunBlock #-}
unsafeRunBlock :: IO (BIO inp out) -> inp -> [out]
unsafeRunBlock IO (BIO inp out)
new inp
inp = IO [out] -> [out]
forall a. IO a -> a
unsafePerformIO (IO (BIO inp out)
new IO (BIO inp out) -> (BIO inp out -> IO [out]) -> IO [out]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ BIO inp out
bio -> BIO inp out -> inp -> IO [out]
forall inp out. HasCallStack => BIO inp out -> inp -> IO [out]
runBlock BIO inp out
bio inp
inp)

-- | Supply blocks of input, then run BIO node until EOF.
--
-- Note many 'BIO' node will be closed or not be able to take new input after drained.
runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
{-# INLINABLE runBlocks #-}
runBlocks :: BIO inp out -> [inp] -> IO [out]
runBlocks BIO{IO (Maybe out)
inp -> IO (Maybe out)
pull :: IO (Maybe out)
push :: inp -> IO (Maybe out)
pull :: forall inp out. BIO inp out -> IO (Maybe out)
push :: forall inp out. BIO inp out -> inp -> IO (Maybe out)
..} = [out] -> [inp] -> IO [out]
loop []
  where
    loop :: [out] -> [inp] -> IO [out]
loop [out]
acc (inp
inp:[inp]
inps) = do
        Maybe out
r <- inp -> IO (Maybe out)
push inp
inp
        case Maybe out
r of
            Just out
r' -> [out] -> [inp] -> IO [out]
loop (out
r'out -> [out] -> [out]
forall a. a -> [a] -> [a]
:[out]
acc) [inp]
inps
            Maybe out
_       -> [out] -> [inp] -> IO [out]
loop [out]
acc [inp]
inps
    loop [out]
acc [] = [out] -> IO [out]
loop' [out]
acc
    loop' :: [out] -> IO [out]
loop' [out]
acc = do
        Maybe out
r <- IO (Maybe out)
pull
        case Maybe out
r of
            Just out
r' -> [out] -> IO [out]
loop' (out
r'out -> [out] -> [out]
forall a. a -> [a] -> [a]
:[out]
acc)
            Maybe out
_       -> [out] -> IO [out]
forall (m :: * -> *) a. Monad m => a -> m a
return ([out] -> [out]
forall a. [a] -> [a]
List.reverse [out]
acc)

-- | Supply blocks of input, then run BIO node until EOF with collecting result.
--
-- Note many 'BIO' node will be closed or not be able to take new input after drained.
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
{-# INLINABLE runBlocks_ #-}
runBlocks_ :: BIO inp out -> [inp] -> IO ()
runBlocks_ BIO inp out
bio (inp
inp:[inp]
inps) = BIO inp out -> inp -> IO (Maybe out)
forall inp out. BIO inp out -> inp -> IO (Maybe out)
push BIO inp out
bio inp
inp IO (Maybe out) -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> BIO inp out -> [inp] -> IO ()
forall inp out. HasCallStack => BIO inp out -> [inp] -> IO ()
runBlocks_ BIO inp out
bio [inp]
inps
runBlocks_ BIO inp out
bio [] = IO ()
loop
  where
    loop :: IO ()
loop = do
        Maybe out
r <- BIO inp out -> IO (Maybe out)
forall inp out. BIO inp out -> IO (Maybe out)
pull BIO inp out
bio
        case Maybe out
r of
            Just out
_ -> IO ()
loop
            Maybe out
_      -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Wrap 'runBlocks' into a pure interface.
--
-- Similar to 'unsafeRunBlock', but with a list of input blocks.
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
{-# INLINABLE unsafeRunBlocks #-}
unsafeRunBlocks :: IO (BIO inp out) -> [inp] -> [out]
unsafeRunBlocks IO (BIO inp out)
new [inp]
inps = IO [out] -> [out]
forall a. IO a -> a
unsafePerformIO (IO (BIO inp out)
new IO (BIO inp out) -> (BIO inp out -> IO [out]) -> IO [out]
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ BIO inp out
bio -> BIO inp out -> [inp] -> IO [out]
forall inp out. HasCallStack => BIO inp out -> [inp] -> IO [out]
runBlocks BIO inp out
bio [inp]
inps)

-------------------------------------------------------------------------------
-- Source

-- | Source a list from memory.
--
sourceFromList :: [a] -> IO (Source a)
sourceFromList :: [a] -> IO (Source a)
sourceFromList [a]
xs0 = do
    IORef [a]
xsRef <- [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef [a]
xs0
    Source a -> IO (Source a)
forall (m :: * -> *) a. Monad m => a -> m a
return BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO{ pull :: IO (Maybe a)
pull = IORef [a] -> IO (Maybe a)
forall a. IORef [a] -> IO (Maybe a)
popper IORef [a]
xsRef }
  where
    popper :: IORef [a] -> IO (Maybe a)
popper IORef [a]
xsRef = do
        [a]
xs <- IORef [a] -> IO [a]
forall a. IORef a -> IO a
readIORef IORef [a]
xsRef
        case [a]
xs of
            (a
x:[a]
xs') -> do
                IORef [a] -> [a] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [a]
xsRef [a]
xs'
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
x)
            [a]
_ -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing

-- | Turn a 'BufferedInput' into 'BIO' source, map EOF to Nothing.
--
sourceFromBuffered :: HasCallStack => BufferedInput -> Source V.Bytes
{-# INLINABLE sourceFromBuffered #-}
sourceFromBuffered :: BufferedInput -> Source Bytes
sourceFromBuffered BufferedInput
i = BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO{ pull :: IO (Maybe Bytes)
pull = do
    HasCallStack => BufferedInput -> IO Bytes
BufferedInput -> IO Bytes
readBuffer BufferedInput
i IO Bytes -> (Bytes -> IO (Maybe Bytes)) -> IO (Maybe Bytes)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Bytes
x -> if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
x then Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing
                                        else Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
x)}

-- | Turn a `IO` action into 'Source'
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
{-# INLINABLE sourceFromIO #-}
sourceFromIO :: IO (Maybe a) -> Source a
sourceFromIO IO (Maybe a)
io = BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO{ pull :: IO (Maybe a)
pull = IO (Maybe a)
io }

-- | Turn a UTF8 encoded 'BufferedInput' into 'BIO' source, map EOF to Nothing.
--
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source T.Text
{-# INLINABLE sourceTextFromBuffered #-}
sourceTextFromBuffered :: BufferedInput -> Source Text
sourceTextFromBuffered BufferedInput
i = BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO{ pull :: IO (Maybe Text)
pull = do
    HasCallStack => BufferedInput -> IO Text
BufferedInput -> IO Text
readBufferText BufferedInput
i IO Text -> (Text -> IO (Maybe Text)) -> IO (Maybe Text)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ Text
x -> if Text -> Bool
T.null Text
x then Maybe Text -> IO (Maybe Text)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Text
forall a. Maybe a
Nothing
                                            else Maybe Text -> IO (Maybe Text)
forall (m :: * -> *) a. Monad m => a -> m a
return (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
x)}

-- | Turn a 'JSON' encoded 'BufferedInput' into 'BIO' source, ignoring any
-- whitespaces bewteen JSON objects. If EOF reached, then return Nothing.
-- Throw 'OtherError' with name "EJSON" if JSON value is not parsed or converted.
sourceJSONFromBuffered :: forall a. (JSON.JSON a, HasCallStack) => BufferedInput -> Source a
{-# INLINABLE sourceJSONFromBuffered #-}
sourceJSONFromBuffered :: BufferedInput -> Source a
sourceJSONFromBuffered = ParseChunks IO Bytes DecodeError a -> BufferedInput -> Source a
forall e a.
(HasCallStack, Print e) =>
ParseChunks IO Bytes e a -> BufferedInput -> Source a
sourceParseChunksFromBuffered ParseChunks IO Bytes DecodeError a
forall a (m :: * -> *).
(JSON a, Monad m) =>
ParseChunks m Bytes DecodeError a
JSON.decodeChunks

-- | Turn buffered input device into a packet source, throw 'OtherError' with name @EPARSE@ if parsing fail.
sourceParserFromBuffered :: HasCallStack => P.Parser a -> BufferedInput -> Source a
{-# INLINABLE sourceParserFromBuffered #-}
sourceParserFromBuffered :: Parser a -> BufferedInput -> Source a
sourceParserFromBuffered Parser a
p = ParseChunks IO Bytes ParseError a -> BufferedInput -> Source a
forall e a.
(HasCallStack, Print e) =>
ParseChunks IO Bytes e a -> BufferedInput -> Source a
sourceParseChunksFromBuffered (Parser a -> ParseChunks IO Bytes ParseError a
forall (m :: * -> *) a.
Monad m =>
Parser a -> ParseChunks m Bytes ParseError a
P.parseChunks Parser a
p)

-- | Turn buffered input device into a packet source, throw 'OtherError' with name @EPARSE@ if parsing fail.
sourceParseChunksFromBuffered :: (HasCallStack, T.Print e) => P.ParseChunks IO V.Bytes e a -> BufferedInput -> Source a
{-# INLINABLE sourceParseChunksFromBuffered #-}
sourceParseChunksFromBuffered :: ParseChunks IO Bytes e a -> BufferedInput -> Source a
sourceParseChunksFromBuffered ParseChunks IO Bytes e a
cp BufferedInput
bi = BIO :: forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO{ pull :: IO (Maybe a)
pull = do
    Bytes
bs <- HasCallStack => BufferedInput -> IO Bytes
BufferedInput -> IO Bytes
readBuffer BufferedInput
bi
    if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
bs
       then Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
       else do
           (Bytes
rest, Either e a
r) <- ParseChunks IO Bytes e a
cp (HasCallStack => BufferedInput -> IO Bytes
BufferedInput -> IO Bytes
readBuffer BufferedInput
bi) Bytes
bs
           HasCallStack => Bytes -> BufferedInput -> IO ()
Bytes -> BufferedInput -> IO ()
unReadBuffer Bytes
rest BufferedInput
bi
           case Either e a
r of Right a
v -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
v)
                     Left e
e  -> Text -> Text -> IO (Maybe a)
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EPARSE" (e -> Text
forall a. Print a => a -> Text
T.toText e
e) }

-- | Turn a file into a 'V.Bytes' source.
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile #-}
initSourceFromFile :: CBytes -> Resource (Source Bytes)
initSourceFromFile CBytes
p = do
    File
f <- HasCallStack => CBytes -> FileFlag -> FileFlag -> Resource File
CBytes -> FileFlag -> FileFlag -> Resource File
FS.initFile CBytes
p FileFlag
FS.O_RDONLY FileFlag
FS.DEFAULT_FILE_MODE
    IO (Source Bytes) -> Resource (Source Bytes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HasCallStack => BufferedInput -> Source Bytes
BufferedInput -> Source Bytes
sourceFromBuffered (BufferedInput -> Source Bytes)
-> IO BufferedInput -> IO (Source Bytes)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> File -> IO BufferedInput
forall i. Input i => i -> IO BufferedInput
newBufferedInput File
f)

--------------------------------------------------------------------------------
-- Sink

-- | Turn a 'BufferedOutput' into a 'V.Bytes' sink.
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink V.Bytes
{-# INLINABLE sinkToBuffered #-}
sinkToBuffered :: BufferedOutput -> Sink Bytes
sinkToBuffered BufferedOutput
bo = (Bytes -> IO (Maybe Void)) -> IO (Maybe Void) -> Sink Bytes
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO Bytes -> IO (Maybe Void)
push_ IO (Maybe Void)
pull_
  where
    push_ :: Bytes -> IO (Maybe Void)
push_ Bytes
inp = HasCallStack => BufferedOutput -> Bytes -> IO ()
BufferedOutput -> Bytes -> IO ()
writeBuffer BufferedOutput
bo Bytes
inp IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Void
forall a. Maybe a
Nothing
    pull_ :: IO (Maybe Void)
pull_ = HasCallStack => BufferedOutput -> IO ()
BufferedOutput -> IO ()
flushBuffer BufferedOutput
bo IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Void
forall a. Maybe a
Nothing

-- | Turn a 'BufferedOutput' into a 'B.Builder' sink.
--
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (B.Builder a)
{-# INLINABLE sinkBuilderToBuffered #-}
sinkBuilderToBuffered :: BufferedOutput -> Sink (Builder a)
sinkBuilderToBuffered BufferedOutput
bo = (Builder a -> IO (Maybe Void))
-> IO (Maybe Void) -> Sink (Builder a)
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO Builder a -> IO (Maybe Void)
push_ IO (Maybe Void)
pull_
  where
    push_ :: Builder a -> IO (Maybe Void)
push_ Builder a
inp = BufferedOutput -> Builder a -> IO ()
forall a. HasCallStack => BufferedOutput -> Builder a -> IO ()
writeBuilder BufferedOutput
bo Builder a
inp IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Void
forall a. Maybe a
Nothing
    pull_ :: IO (Maybe Void)
pull_ = HasCallStack => BufferedOutput -> IO ()
BufferedOutput -> IO ()
flushBuffer BufferedOutput
bo IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Void
forall a. Maybe a
Nothing

-- | Turn a file into a 'V.Bytes' sink.
--
-- Note the file will be opened in @'FS.O_APPEND' .|. 'FS.O_CREAT' .|. 'FS.O_WRONLY'@ mode,
-- bytes will be written after the end of the original file if there'are old bytes.
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink V.Bytes)
{-# INLINABLE initSinkToFile #-}
initSinkToFile :: CBytes -> Resource (Sink Bytes)
initSinkToFile CBytes
p = do
    File
f <- HasCallStack => CBytes -> FileFlag -> FileFlag -> Resource File
CBytes -> FileFlag -> FileFlag -> Resource File
FS.initFile CBytes
p (FileFlag
FS.O_APPEND FileFlag -> FileFlag -> FileFlag
forall a. Bits a => a -> a -> a
.|. FileFlag
FS.O_CREAT FileFlag -> FileFlag -> FileFlag
forall a. Bits a => a -> a -> a
.|. FileFlag
FS.O_WRONLY) FileFlag
FS.DEFAULT_FILE_MODE
    IO (Sink Bytes) -> Resource (Sink Bytes)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (HasCallStack => BufferedOutput -> Sink Bytes
BufferedOutput -> Sink Bytes
sinkToBuffered (BufferedOutput -> Sink Bytes)
-> IO BufferedOutput -> IO (Sink Bytes)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> File -> IO BufferedOutput
forall o. Output o => o -> IO BufferedOutput
newBufferedOutput File
f)

-- | Turn an `IO` action into 'BIO' sink.
--
-- 'push' will call `IO` action with input chunk, `pull` has no effect.
sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
{-# INLINABLE sinkToIO #-}
sinkToIO :: (a -> IO ()) -> Sink a
sinkToIO a -> IO ()
f = (a -> IO (Maybe Void)) -> IO (Maybe Void) -> Sink a
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO a -> IO (Maybe Void)
push_ IO (Maybe Void)
forall a. IO (Maybe a)
pull_
  where
    push_ :: a -> IO (Maybe Void)
push_ a
x = a -> IO ()
f a
x IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Void
forall a. Maybe a
Nothing
    pull_ :: IO (Maybe a)
pull_ = Maybe a -> IO (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing

-- | Sink to a list in memory.
--
-- The list's 'IORef' is not thread safe here,
-- and list items are in reversed order during sinking(will be reversed when flushed, i.e. pulled),
-- Please don't use it in multiple thread.
--
sinkToList :: IO (IORef [a], Sink a)
sinkToList :: IO (IORef [a], Sink a)
sinkToList = do
    IORef [a]
xsRef <- [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef []
    (IORef [a], Sink a) -> IO (IORef [a], Sink a)
forall (m :: * -> *) a. Monad m => a -> m a
return (IORef [a]
xsRef, (a -> IO (Maybe Void)) -> IO (Maybe Void) -> Sink a
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (\ a
x -> IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef [a]
xsRef (a
xa -> [a] -> [a]
forall a. a -> [a] -> [a]
:) IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Void
forall a. Maybe a
Nothing)
                       (IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef IORef [a]
xsRef [a] -> [a]
forall a. [a] -> [a]
reverse IO () -> IO (Maybe Void) -> IO (Maybe Void)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Maybe Void -> IO (Maybe Void)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Void
forall a. Maybe a
Nothing))

--------------------------------------------------------------------------------
-- Nodes

-- | BIO node from a pure function.
--
-- BIO node made with this funtion are stateless, thus can be reused across chains.
pureBIO :: (a -> b) -> BIO a b
pureBIO :: (a -> b) -> BIO a b
pureBIO a -> b
f = (a -> IO (Maybe b)) -> IO (Maybe b) -> BIO a b
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (\ a
x -> let !r :: b
r = a -> b
f a
x in Maybe b -> IO (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Maybe b
forall a. a -> Maybe a
Just b
r)) (Maybe b -> IO (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing)

-- | BIO node from an IO function.
--
-- BIO node made with this funtion may not be stateless, it depends on if the IO function use
-- IO state.
ioBIO :: (HasCallStack => a -> IO b) -> BIO a b
ioBIO :: (HasCallStack => a -> IO b) -> BIO a b
ioBIO HasCallStack => a -> IO b
f = (a -> IO (Maybe b)) -> IO (Maybe b) -> BIO a b
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (\ a
x -> b -> Maybe b
forall a. a -> Maybe a
Just (b -> Maybe b) -> IO b -> IO (Maybe b)
forall (m :: * -> *) a b. Monad m => (a -> b) -> m a -> m b
<$!> a -> IO b
HasCallStack => a -> IO b
f a
x) (Maybe b -> IO (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing)

-- | Make a chunk size divider.
--
-- A divider size divide each chunk's size to the nearest multiplier to granularity,
-- last trailing chunk is directly returned.
newReChunk :: Int                -- ^ chunk granularity
           -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newReChunk #-}
newReChunk :: Int -> IO (BIO Bytes Bytes)
newReChunk Int
n = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Bytes -> IO (Maybe Bytes)) -> IO (Maybe Bytes) -> BIO Bytes Bytes
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (IORef Bytes -> Bytes -> IO (Maybe Bytes)
push_ IORef Bytes
trailingRef) (IORef Bytes -> IO (Maybe Bytes)
forall (v :: * -> *) a. Vec v a => IORef (v a) -> IO (Maybe (v a))
pull_ IORef Bytes
trailingRef))
  where
    push_ :: IORef Bytes -> Bytes -> IO (Maybe Bytes)
push_ IORef Bytes
trailingRef Bytes
bs = do
        Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
        let chunk :: Bytes
chunk =  Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs
            l :: Int
l = Bytes -> Int
forall (v :: * -> *) a. Vec v a => v a -> Int
V.length Bytes
chunk
        if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
        then do
            let l' :: Int
l' = Int
l Int -> Int -> Int
forall a. Num a => a -> a -> a
- (Int
l Int -> Int -> Int
forall a. Integral a => a -> a -> a
`rem` Int
n)
                (Bytes
chunk', Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt Int
l' Bytes
chunk
            IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
rest
            Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
chunk')
        else do
            IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
chunk
            Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing
    pull_ :: IORef (v a) -> IO (Maybe (v a))
pull_ IORef (v a)
trailingRef = do
        v a
trailing <- IORef (v a) -> IO (v a)
forall a. IORef a -> IO a
readIORef IORef (v a)
trailingRef
        if v a -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null v a
trailing
        then Maybe (v a) -> IO (Maybe (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (v a)
forall a. Maybe a
Nothing
        else do
            IORef (v a) -> v a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (v a)
trailingRef v a
forall (v :: * -> *) a. Vec v a => v a
V.empty
            Maybe (v a) -> IO (Maybe (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return (v a -> Maybe (v a)
forall a. a -> Maybe a
Just v a
trailing)

-- | Read buffer and parse with 'Parser'.
--
-- This function will continuously draw data from input before parsing finish.
-- Unconsumed bytes will be returned to buffer.
--
-- Return 'Nothing' if reach EOF before parsing, throw 'OtherError' with name @EPARSE@ if parsing fail.
newParserNode :: HasCallStack => P.Parser a -> IO (BIO V.Bytes a)
{-# INLINABLE newParserNode #-}
newParserNode :: Parser a -> IO (BIO Bytes a)
newParserNode Parser a
p = do
    -- type LastParseState = Either V.Bytes (V.Bytes -> P.Result)
    IORef (Either Bytes (Bytes -> Result a))
resultRef <- Either Bytes (Bytes -> Result a)
-> IO (IORef (Either Bytes (Bytes -> Result a)))
forall a. a -> IO (IORef a)
newIORef (Bytes -> Either Bytes (Bytes -> Result a)
forall a b. a -> Either a b
Left Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty)
    BIO Bytes a -> IO (BIO Bytes a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Bytes -> IO (Maybe a)) -> IO (Maybe a) -> BIO Bytes a
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (IORef (Either Bytes (Bytes -> Result a)) -> Bytes -> IO (Maybe a)
push_ IORef (Either Bytes (Bytes -> Result a))
resultRef) (IORef (Either Bytes (Bytes -> Result a)) -> IO (Maybe a)
pull_ IORef (Either Bytes (Bytes -> Result a))
resultRef))
  where
    push_ :: IORef (Either Bytes (Bytes -> Result a)) -> Bytes -> IO (Maybe a)
push_ IORef (Either Bytes (Bytes -> Result a))
resultRef Bytes
bs = do
        Either Bytes (Bytes -> Result a)
lastResult <- IORef (Either Bytes (Bytes -> Result a))
-> IO (Either Bytes (Bytes -> Result a))
forall a. IORef a -> IO a
readIORef IORef (Either Bytes (Bytes -> Result a))
resultRef
        let (Bytes
chunk, Bytes -> Result a
f) = case Either Bytes (Bytes -> Result a)
lastResult of
                Left Bytes
trailing -> (Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs, Parser a -> Bytes -> Result a
forall a. Parser a -> Bytes -> Result a
P.parseChunk Parser a
p)
                Right Bytes -> Result a
x       -> (Bytes
bs, Bytes -> Result a
x)
        case Bytes -> Result a
f Bytes
chunk of
            P.Success a
a Bytes
trailing' -> do
                IORef (Either Bytes (Bytes -> Result a))
-> Either Bytes (Bytes -> Result a) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Either Bytes (Bytes -> Result a))
resultRef (Bytes -> Either Bytes (Bytes -> Result a)
forall a b. a -> Either a b
Left Bytes
trailing')
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
            P.Failure ParseError
e Bytes
_ ->
                Text -> Text -> IO (Maybe a)
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EPARSE" (ParseError -> Text
forall a. Print a => a -> Text
T.toText ParseError
e)
            P.Partial Bytes -> Result a
f' -> do
                IORef (Either Bytes (Bytes -> Result a))
-> Either Bytes (Bytes -> Result a) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Either Bytes (Bytes -> Result a))
resultRef ((Bytes -> Result a) -> Either Bytes (Bytes -> Result a)
forall a b. b -> Either a b
Right Bytes -> Result a
f')
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing

    pull_ :: IORef (Either Bytes (Bytes -> Result a)) -> IO (Maybe a)
pull_ IORef (Either Bytes (Bytes -> Result a))
resultRef = do
        Either Bytes (Bytes -> Result a)
lastResult <- IORef (Either Bytes (Bytes -> Result a))
-> IO (Either Bytes (Bytes -> Result a))
forall a. IORef a -> IO a
readIORef IORef (Either Bytes (Bytes -> Result a))
resultRef
        case Either Bytes (Bytes -> Result a)
lastResult of
            Left Bytes
trailing ->
                if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
trailing
                then Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
                else IORef (Either Bytes (Bytes -> Result a))
-> (Bytes -> Result a) -> Bytes -> IO (Maybe a)
forall b t a.
IORef (Either Bytes b) -> (t -> Result a) -> t -> IO (Maybe a)
lastChunk IORef (Either Bytes (Bytes -> Result a))
resultRef (Parser a -> Bytes -> Result a
forall a. Parser a -> Bytes -> Result a
P.parseChunk Parser a
p) Bytes
trailing
            Right Bytes -> Result a
f -> IORef (Either Bytes (Bytes -> Result a))
-> (Bytes -> Result a) -> Bytes -> IO (Maybe a)
forall b t a.
IORef (Either Bytes b) -> (t -> Result a) -> t -> IO (Maybe a)
lastChunk IORef (Either Bytes (Bytes -> Result a))
resultRef Bytes -> Result a
f Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty

    lastChunk :: IORef (Either Bytes b) -> (t -> Result a) -> t -> IO (Maybe a)
lastChunk IORef (Either Bytes b)
resultRef t -> Result a
f t
chunk =
        case t -> Result a
f t
chunk of
            P.Success a
a Bytes
trailing' -> do
                IORef (Either Bytes b) -> Either Bytes b -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Either Bytes b)
resultRef (Bytes -> Either Bytes b
forall a b. a -> Either a b
Left Bytes
trailing')
                Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
            P.Failure ParseError
e Bytes
_ ->
                Text -> Text -> IO (Maybe a)
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EPARSE" (ParseError -> Text
forall a. Print a => a -> Text
T.toText ParseError
e)
            P.Partial ParseStep a
_ ->
                Text -> Text -> IO (Maybe a)
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EPARSE" Text
"last chunk partial parse"

-- | Make a new UTF8 decoder, which decode bytes streams into text streams.
--
-- If there're invalid UTF8 bytes, an 'OtherError' with name 'EINVALIDUTF8' will be thrown.`
--
-- Note this node is supposed to be used with preprocess node such as decompressor, parser, etc.
-- where bytes boundary cannot be controlled, UTF8 decoder will concat trailing bytes from last block to next one.
-- Use this node directly with 'sourceFromBuffered' will not be as efficient as directly use
-- 'sourceTextFromBuffered', because 'BufferedInput' provides push back capability,
-- trailing bytes can be pushed back to reading buffer then returned with next block input together.
--
newUTF8Decoder :: HasCallStack => IO (BIO V.Bytes T.Text)
{-# INLINABLE newUTF8Decoder #-}
newUTF8Decoder :: IO (BIO Bytes Text)
newUTF8Decoder = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    BIO Bytes Text -> IO (BIO Bytes Text)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Bytes -> IO (Maybe Text)) -> IO (Maybe Text) -> BIO Bytes Text
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (IORef Bytes -> Bytes -> IO (Maybe Text)
push_ IORef Bytes
trailingRef) (IORef Bytes -> IO (Maybe Text)
forall (v :: * -> *) a a. Vec v a => IORef (v a) -> IO (Maybe a)
pull_ IORef Bytes
trailingRef))
  where
    push_ :: IORef Bytes -> Bytes -> IO (Maybe Text)
push_ IORef Bytes
trailingRef Bytes
bs = do
        Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
        let chunk :: Bytes
chunk =  Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs
            (V.PrimVector PrimArray Word8
arr Int
s Int
l) = Bytes
chunk
        if Int
l Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 Bool -> Bool -> Bool
&& PrimArray Word8 -> Int -> Int
T.decodeCharLen PrimArray Word8
arr Int
s Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
l
        then do
            let (Int
i, Maybe Word8
_) = (Word8 -> Bool) -> Bytes -> (Int, Maybe Word8)
forall (v :: * -> *) a.
Vec v a =>
(a -> Bool) -> v a -> (Int, Maybe a)
V.findR (\ Word8
w -> Word8
w Word8 -> Word8 -> Bool
forall a. Ord a => a -> a -> Bool
>= Word8
0b11000000 Bool -> Bool -> Bool
|| Word8
w Word8 -> Word8 -> Bool
forall a. Ord a => a -> a -> Bool
<= Word8
0b01111111) Bytes
chunk
            if (Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== -Int
1)
            then Text -> Text -> IO (Maybe Text)
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EINVALIDUTF8" Text
"invalid UTF8 bytes"
            else do
                if PrimArray Word8 -> Int -> Int
T.decodeCharLen PrimArray Word8
arr (Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i) Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
l Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
i
                then do
                    IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef (IArray PrimVector Word8 -> Int -> Int -> Bytes
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr IArray PrimVector Word8
PrimArray Word8
arr (Int
sInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
i) (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
i))
                    Maybe Text -> IO (Maybe Text)
forall (m :: * -> *) a. Monad m => a -> m a
return (Text -> Maybe Text
forall a. a -> Maybe a
Just (HasCallStack => Bytes -> Text
Bytes -> Text
T.validate (IArray PrimVector Word8 -> Int -> Int -> Bytes
forall (v :: * -> *) a. Vec v a => IArray v a -> Int -> Int -> v a
V.fromArr IArray PrimVector Word8
PrimArray Word8
arr Int
s Int
i)))
                else do
                    IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
                    Maybe Text -> IO (Maybe Text)
forall (m :: * -> *) a. Monad m => a -> m a
return (Text -> Maybe Text
forall a. a -> Maybe a
Just (HasCallStack => Bytes -> Text
Bytes -> Text
T.validate Bytes
chunk))
        else do
            IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
chunk
            Maybe Text -> IO (Maybe Text)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Text
forall a. Maybe a
Nothing

    pull_ :: IORef (v a) -> IO (Maybe a)
pull_ IORef (v a)
trailingRef = do
        v a
trailing <- IORef (v a) -> IO (v a)
forall a. IORef a -> IO a
readIORef IORef (v a)
trailingRef
        if v a -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null v a
trailing
        then Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
        else Text -> Text -> IO (Maybe a)
forall a. HasCallStack => Text -> Text -> IO a
throwOtherError Text
"EINVALIDUTF8" Text
"invalid UTF8 bytes"

-- | Make a new stream splitter based on magic byte.
--
newMagicSplitter :: Word8 -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newMagicSplitter #-}
newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
newMagicSplitter Word8
magic = do
    IORef Bytes
trailingRef <- Bytes -> IO (IORef Bytes)
forall a. a -> IO (IORef a)
newIORef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Bytes -> IO (Maybe Bytes)) -> IO (Maybe Bytes) -> BIO Bytes Bytes
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (IORef Bytes -> Bytes -> IO (Maybe Bytes)
push_ IORef Bytes
trailingRef) (IORef Bytes -> IO (Maybe Bytes)
pull_ IORef Bytes
trailingRef))
  where
    push_ :: IORef Bytes -> Bytes -> IO (Maybe Bytes)
push_ IORef Bytes
trailingRef Bytes
bs = do
        Bytes
trailing <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
        let chunk :: Bytes
chunk =  Bytes
trailing Bytes -> Bytes -> Bytes
forall (v :: * -> *) a. Vec v a => v a -> v a -> v a
`V.append` Bytes
bs
        case Word8 -> Bytes -> Maybe Int
forall (v :: * -> *) a. (Vec v a, Eq a) => a -> v a -> Maybe Int
V.elemIndex Word8
magic Bytes
chunk of
            Just Int
i -> do
                let (Bytes
line, Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) Bytes
chunk
                IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
rest
                Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
line)
            Maybe Int
_ -> do
                IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
chunk
                Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing

    pull_ :: IORef Bytes -> IO (Maybe Bytes)
pull_ IORef Bytes
trailingRef = do
        Bytes
chunk <- IORef Bytes -> IO Bytes
forall a. IORef a -> IO a
readIORef IORef Bytes
trailingRef
        if Bytes -> Bool
forall (v :: * -> *) a. Vec v a => v a -> Bool
V.null Bytes
chunk
        then Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Bytes
forall a. Maybe a
Nothing
        else case Word8 -> Bytes -> Maybe Int
forall (v :: * -> *) a. (Vec v a, Eq a) => a -> v a -> Maybe Int
V.elemIndex Word8
magic Bytes
chunk of
            Just Int
i -> do
                let (Bytes
line, Bytes
rest) = Int -> Bytes -> (Bytes, Bytes)
forall (v :: * -> *) a. Vec v a => Int -> v a -> (v a, v a)
V.splitAt (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1) Bytes
chunk
                IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
rest
                Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
line)
            Maybe Int
_ -> do
                IORef Bytes -> Bytes -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bytes
trailingRef Bytes
forall (v :: * -> *) a. Vec v a => v a
V.empty
                Maybe Bytes -> IO (Maybe Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (Bytes -> Maybe Bytes
forall a. a -> Maybe a
Just Bytes
chunk)

-- | Make a new stream splitter based on linefeed(@\r\n@ or @\n@).
--
-- The result bytes doesn't contain linefeed.
newLineSplitter :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newLineSplitter #-}
newLineSplitter :: IO (BIO Bytes Bytes)
newLineSplitter = do
    BIO Bytes Bytes
s <- Word8 -> IO (BIO Bytes Bytes)
newMagicSplitter Word8
10
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
s BIO Bytes Bytes -> (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b c. BIO a b -> (b -> c) -> BIO a c
>~> Bytes -> Bytes
forall a. (Prim a, Eq a, Num a) => PrimVector a -> PrimVector a
dropLineEnd)
  where
    dropLineEnd :: PrimVector a -> PrimVector a
dropLineEnd bs :: PrimVector a
bs@(V.PrimVector PrimArray a
arr Int
s Int
l) =
        case PrimVector a
bs PrimVector a -> Int -> Maybe a
forall (v :: * -> *) a. Vec v a => v a -> Int -> Maybe a
`V.indexMaybe` (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2) of
            Just a
r | a
r a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
13   -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
2)
                   | Bool
otherwise -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
            Maybe a
_ | PrimVector a -> a
forall (v :: * -> *) a. (Vec v a, HasCallStack) => v a -> a
V.head PrimVector a
bs a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
10 -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s (Int
lInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1)
              | Bool
otherwise -> PrimArray a -> Int -> Int -> PrimVector a
forall a. PrimArray a -> Int -> Int -> PrimVector a
V.PrimVector PrimArray a
arr Int
s Int
l

-- | Make a new base64 encoder node.
newBase64Encoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Encoder #-}
newBase64Encoder :: IO (BIO Bytes Bytes)
newBase64Encoder = do
    BIO Bytes Bytes
re <- Int -> IO (BIO Bytes Bytes)
newReChunk Int
3
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
re BIO Bytes Bytes -> (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b c. BIO a b -> (b -> c) -> BIO a c
>~> Bytes -> Bytes
base64Encode)

-- | Make a new base64 decoder node.
newBase64Decoder :: HasCallStack => IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Decoder #-}
newBase64Decoder :: IO (BIO Bytes Bytes)
newBase64Decoder = do
    BIO Bytes Bytes
re <- Int -> IO (BIO Bytes Bytes)
newReChunk Int
4
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
re BIO Bytes Bytes -> (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b c. BIO a b -> (b -> c) -> BIO a c
>~> HasCallStack => Bytes -> Bytes
Bytes -> Bytes
base64Decode')

-- | Make a hex encoder node.
--
-- Hex encoder is stateless, it can be reused across chains.
hexEncoder :: Bool   -- ^ uppercase?
           -> BIO V.Bytes V.Bytes
{-# INLINABLE hexEncoder #-}
hexEncoder :: Bool -> BIO Bytes Bytes
hexEncoder Bool
upper = (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b. (a -> b) -> BIO a b
pureBIO (Bool -> Bytes -> Bytes
hexEncode Bool
upper)

-- | Make a new hex decoder node.
newHexDecoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newHexDecoder #-}
newHexDecoder :: IO (BIO Bytes Bytes)
newHexDecoder = do
    BIO Bytes Bytes
re <- Int -> IO (BIO Bytes Bytes)
newReChunk Int
2
    BIO Bytes Bytes -> IO (BIO Bytes Bytes)
forall (m :: * -> *) a. Monad m => a -> m a
return (BIO Bytes Bytes
re BIO Bytes Bytes -> (Bytes -> Bytes) -> BIO Bytes Bytes
forall a b c. BIO a b -> (b -> c) -> BIO a c
>~> HasCallStack => Bytes -> Bytes
Bytes -> Bytes
hexDecode')

-- | Make a new BIO node which counts items flow throught it.
--
-- Returned 'Counter' is increased atomically, it's safe to read \/ reset the counter from other threads.
newCounterNode :: IO (Counter, BIO a a)
{-# INLINABLE newCounterNode #-}
newCounterNode :: IO (Counter, BIO a a)
newCounterNode = do
    Counter
c <- Int -> IO Counter
newCounter Int
0
    (Counter, BIO a a) -> IO (Counter, BIO a a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Counter
c, (a -> IO (Maybe a)) -> IO (Maybe a) -> BIO a a
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (Counter -> a -> IO (Maybe a)
forall a. Counter -> a -> IO (Maybe a)
push_ Counter
c) (Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing))
  where
    push_ :: Counter -> a -> IO (Maybe a)
push_ Counter
c a
x = do
        Counter -> Int -> IO ()
atomicAddCounter_ Counter
c Int
1
        Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
x)

-- | Make a new BIO node which counts items, and label item with a sequence number.
--
-- Returned 'Counter' is increased atomically, it's safe to read \/ reset the counter from other threads.
newSeqNumNode :: IO (Counter, BIO a (Int, a))
{-# INLINABLE newSeqNumNode #-}
newSeqNumNode :: IO (Counter, BIO a (Int, a))
newSeqNumNode = do
    Counter
c <- Int -> IO Counter
newCounter Int
0
    (Counter, BIO a (Int, a)) -> IO (Counter, BIO a (Int, a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Counter
c, (a -> IO (Maybe (Int, a))) -> IO (Maybe (Int, a)) -> BIO a (Int, a)
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (Counter -> a -> IO (Maybe (Int, a))
forall b. Counter -> b -> IO (Maybe (Int, b))
push_ Counter
c) (Maybe (Int, a) -> IO (Maybe (Int, a))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Int, a)
forall a. Maybe a
Nothing))
  where
    push_ :: Counter -> b -> IO (Maybe (Int, b))
push_ Counter
c b
x = do
        !Int
i <- Counter -> Int -> IO Int
atomicAddCounter Counter
c Int
1
        Maybe (Int, b) -> IO (Maybe (Int, b))
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int, b) -> Maybe (Int, b)
forall a. a -> Maybe a
Just (Int
i, b
x))

-- | Make a BIO node grouping items into fixed size arrays.
--
newGroupingNode :: Int -> IO (BIO a (A.SmallArray a))
{-# INLINABLE newGroupingNode #-}
newGroupingNode :: Int -> IO (BIO a (SmallArray a))
newGroupingNode Int
n
    | Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 =  Int -> IO (BIO a (SmallArray a))
forall a. Int -> IO (BIO a (SmallArray a))
newGroupingNode Int
1
    | Bool
otherwise = do
        Counter
c <- Int -> IO Counter
newCounter Int
0
        IORef (SmallMutableArray RealWorld a)
arrRef <- SmallMutableArray RealWorld a
-> IO (IORef (SmallMutableArray RealWorld a))
forall a. a -> IO (IORef a)
newIORef (SmallMutableArray RealWorld a
 -> IO (IORef (SmallMutableArray RealWorld a)))
-> IO (SmallMutableArray RealWorld a)
-> IO (IORef (SmallMutableArray RealWorld a))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> IO (MArr SmallArray RealWorld a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
A.newArr Int
n
        BIO a (SmallArray a) -> IO (BIO a (SmallArray a))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a -> IO (Maybe (SmallArray a)))
-> IO (Maybe (SmallArray a)) -> BIO a (SmallArray a)
forall inp out.
(inp -> IO (Maybe out)) -> IO (Maybe out) -> BIO inp out
BIO (Counter
-> IORef (SmallMutableArray RealWorld a)
-> a
-> IO (Maybe (SmallArray a))
push_ Counter
c IORef (SmallMutableArray RealWorld a)
arrRef) (Counter
-> IORef (MArr SmallArray RealWorld a) -> IO (Maybe (SmallArray a))
forall (arr :: * -> *) a.
Arr arr a =>
Counter -> IORef (MArr arr RealWorld a) -> IO (Maybe (arr a))
pull_ Counter
c IORef (MArr SmallArray RealWorld a)
IORef (SmallMutableArray RealWorld a)
arrRef))
  where
    push_ :: Counter
-> IORef (SmallMutableArray RealWorld a)
-> a
-> IO (Maybe (SmallArray a))
push_ Counter
c IORef (SmallMutableArray RealWorld a)
arrRef a
x = do
        Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
c
        if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
        then do
            SmallMutableArray RealWorld a
marr <- IORef (SmallMutableArray RealWorld a)
-> IO (SmallMutableArray RealWorld a)
forall a. IORef a -> IO a
readIORef IORef (SmallMutableArray RealWorld a)
arrRef
            MArr SmallArray RealWorld a -> Int -> a -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
A.writeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr Int
i a
x
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
c Int
0
            IORef (SmallMutableArray RealWorld a)
-> SmallMutableArray RealWorld a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (SmallMutableArray RealWorld a)
arrRef (SmallMutableArray RealWorld a -> IO ())
-> IO (SmallMutableArray RealWorld a) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Int -> IO (MArr SmallArray RealWorld a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
Int -> m (MArr arr s a)
A.newArr Int
n
            Maybe (SmallArray a) -> IO (Maybe (SmallArray a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (SmallArray a) -> IO (Maybe (SmallArray a)))
-> (SmallArray a -> Maybe (SmallArray a))
-> SmallArray a
-> IO (Maybe (SmallArray a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SmallArray a -> Maybe (SmallArray a)
forall a. a -> Maybe a
Just (SmallArray a -> IO (Maybe (SmallArray a)))
-> IO (SmallArray a) -> IO (Maybe (SmallArray a))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MArr SmallArray RealWorld a -> IO (SmallArray a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
A.unsafeFreezeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr
        else do
            SmallMutableArray RealWorld a
marr <- IORef (SmallMutableArray RealWorld a)
-> IO (SmallMutableArray RealWorld a)
forall a. IORef a -> IO a
readIORef IORef (SmallMutableArray RealWorld a)
arrRef
            MArr SmallArray RealWorld a -> Int -> a -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> a -> m ()
A.writeArr MArr SmallArray RealWorld a
SmallMutableArray RealWorld a
marr Int
i a
x
            Counter -> Int -> IO ()
forall a. Prim a => PrimIORef a -> a -> IO ()
writePrimIORef Counter
c (Int
iInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1)
            Maybe (SmallArray a) -> IO (Maybe (SmallArray a))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (SmallArray a)
forall a. Maybe a
Nothing
    pull_ :: Counter -> IORef (MArr arr RealWorld a) -> IO (Maybe (arr a))
pull_ Counter
c IORef (MArr arr RealWorld a)
arrRef = do
        Int
i <- Counter -> IO Int
forall a. Prim a => PrimIORef a -> IO a
readPrimIORef Counter
c
        if Int
i Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
/= Int
0
        then do
            MArr arr RealWorld a
marr <- IORef (MArr arr RealWorld a) -> IO (MArr arr RealWorld a)
forall a. IORef a -> IO a
readIORef IORef (MArr arr RealWorld a)
arrRef
#if MIN_VERSION_base(4,14,0)
            MArr arr RealWorld a -> Int -> IO ()
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> Int -> m ()
A.shrinkMutableArr MArr arr RealWorld a
marr Int
i
            Maybe (arr a) -> IO (Maybe (arr a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (arr a) -> IO (Maybe (arr a)))
-> (arr a -> Maybe (arr a)) -> arr a -> IO (Maybe (arr a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. arr a -> Maybe (arr a)
forall a. a -> Maybe a
Just (arr a -> IO (Maybe (arr a))) -> IO (arr a) -> IO (Maybe (arr a))
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MArr arr RealWorld a -> IO (arr a)
forall (arr :: * -> *) a (m :: * -> *) s.
(Arr arr a, PrimMonad m, PrimState m ~ s) =>
MArr arr s a -> m (arr a)
A.unsafeFreezeArr MArr arr RealWorld a
marr
#else
            marr' <- A.resizeMutableArr marr i
            return . Just =<< A.unsafeFreezeArr marr'
#endif
        else Maybe (arr a) -> IO (Maybe (arr a))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (arr a)
forall a. Maybe a
Nothing