Copyright | (c) Dong Han 2017-2020 |
---|---|
License | BSD |
Maintainer | winterland1989@gmail.com |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
This module provides the BIO (block IO) type to facilitate writing streaming programs. A BIO node usually:
- Processes input in units of blocks (or items).
- Runs in constant space, which means its memory usage won't accumulate.
- Keeps some state in IO, which is sealed in the BIO closure.
Some examples of such nodes are:
- Compressor / decompressor, e.g. zlib, etc.
- Codec, e.g. utf8 codec, base64 codec.
- Ciphers.
- Packet parsers.
We use the BIO inp out type to represent all the objects above, BIO Void out to represent an IO source, and BIO inp Void to represent an IO sink. All of them can be connected with . (ordinary function composition) to build a larger BIO node.
```haskell
import GHC.Stack      (HasCallStack)
import Z.Data.CBytes  (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    -- windowBits = 31 (15 + 16) asks zlib for a gzip wrapper, so zcat can read the output
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}

    withResource (initSourceFromFile origin) $ \src ->
        withResource (initSinkToFile target) $ \sink ->
            -- file bytes -> base64 text -> gzip stream -> target file
            runBIO_ $ src . base64Enc . zlibCompressor . sink

-- In GHCi:
-- > base64AndCompressFile "test" "test.gz"
-- Running 'zcat "test.gz" | base64 -d' will give you the original file.
```
Synopsis
- type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
- pattern EOF :: Maybe a
- type Source x = BIO Void x
- type Sink x = BIO x Void
- appendSource :: HasCallStack => Source a -> Source a -> Source a
- concatSource :: HasCallStack => [Source a] -> Source a
- concatSource' :: HasCallStack => Source (Source a) -> Source a
- joinSink :: HasCallStack => Sink out -> Sink out -> Sink out
- fuseSink :: HasCallStack => [Sink out] -> Sink out
- discard :: a -> IO ()
- stepBIO :: HasCallStack => BIO inp out -> inp -> IO [out]
- stepBIO_ :: HasCallStack => BIO inp out -> inp -> IO ()
- runBIO :: HasCallStack => BIO inp out -> IO [out]
- runBIO_ :: HasCallStack => BIO inp out -> IO ()
- runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
- runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
- unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
- runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
- runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
- unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
- pureBIO :: (a -> b) -> BIO a b
- ioBIO :: HasCallStack => (a -> IO b) -> BIO a b
- initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
- initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source Bytes)
- sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
- sourceFromList :: Foldable f => f a -> Source a
- sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
- sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
- sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
- sourceParserFromBuffered :: HasCallStack => Parser a -> BufferedInput -> Source a
- sourceParseChunkFromBuffered :: (HasCallStack, Print e) => (Bytes -> Result e a) -> BufferedInput -> Source a
- sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
- sinkToList :: IO (MVar [a], Sink a)
- initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
- sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
- sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
- newReChunk :: Int -> IO (BIO Bytes Bytes)
- newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
- newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a)
- newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
- newLineSplitter :: IO (BIO Bytes Bytes)
- newBase64Encoder :: IO (BIO Bytes Bytes)
- newBase64Decoder :: HasCallStack => IO (BIO Bytes Bytes)
- hexEncoder :: Bool -> BIO Bytes Bytes
- newHexDecoder :: IO (BIO Bytes Bytes)
- counterNode :: Counter -> BIO a a
- seqNumNode :: Counter -> BIO a (Int, a)
- newGroupingNode :: Int -> IO (BIO a (Vector a))
- ungroupingNode :: BIO (Vector a) a
- consumedNode :: IORef Bool -> BIO a a
The BIO type
type BIO inp out =

| Part | Documentation |
|---|---|
| (Maybe out -> IO ()) | Pass |
| -> Maybe inp | |
| -> IO () | |
A BIO (block IO) node.
A BIO node is a push-based stream transformer. It can be used to describe different kinds of IO devices:
- BIO inp out describes an IO state machine (e.g. z_stream in zlib), which takes some input in blocks, then produces output.
- type Source out = BIO Void out describes an IO source, which never takes input but gives output by looping until EOF.
- type Sink inp = BIO inp Void describes an IO sink, which takes input and performs some IO effects, such as writing to a terminal or to files.
You can connect these BIO nodes with . (ordinary function composition), which connects the left node's output to the right node's input and returns a new BIO node with the left node's input type and the right node's output type.
You can run a BIO node in different ways:
- stepBIO / stepBIO_ supply a single chunk of input and step the BIO node.
- runBIO / runBIO_ supply EOF directly, which effectively pulls all values from the source and pushes them to the sink until the source reaches EOF.
- runBlock / runBlock_ supply a single block of input as the whole input and run the BIO node.
- runBlocks / runBlocks_ supply a list of blocks as the whole input and run the BIO node.
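The difference between stepping and running shows up with a node that only emits on EOF. In this sketch, newSumNode, stepModel and runBlocksModel are hypothetical models of a stateful node, stepBIO and runBlocks (push the inputs, then EOF); the BIO alias is re-declared locally so the code runs without Z-IO.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Hypothetical stateful node: accumulate a sum, emit it only on EOF.
newSumNode :: IO (BIO Int Int)
newSumNode = do
  acc <- newIORef 0
  pure $ \k mx -> case mx of
    Just x  -> modifyIORef' acc (+ x)
    Nothing -> do
      s <- readIORef acc
      k (Just s)   -- flush the accumulated sum
      k Nothing    -- then forward EOF

-- Model of stepBIO: push one input without EOF, collect what comes out.
stepModel :: BIO inp out -> inp -> IO [out]
stepModel bio x = do
  acc <- newIORef []
  bio (maybe (pure ()) (\o -> modifyIORef' acc (o :))) (Just x)
  reverse <$> readIORef acc

-- Model of runBlocks: push every input, then EOF.
runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  node <- newSumNode
  stepped <- stepModel node 5             -- no EOF yet: nothing is emitted
  finished <- runBlocksModel node [1, 2]  -- state persisted, so EOF flushes 5 + 1 + 2
  fresh <- newSumNode
  alone <- runBlocksModel fresh [1, 2]    -- a fresh node only sees its own input
  print (stepped, finished, alone)
```

Running main prints ([],[8],[3]); the second result also illustrates why a stateful node should not be reused across chains.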
Note that a BIO usually contains some IO state; you can think of it as an opaque IORef:
- You shouldn't use a BIO node across multiple BIO chains unless its state can be reset.
- You shouldn't use a BIO node across multiple threads unless its documentation states otherwise.
BIO is simply a convenient way to construct single-threaded streaming computations; to use BIO in multiple threads, see the Z.IO.BIO.Concurrent module.
Basic combinators
appendSource :: HasCallStack => Source a -> Source a -> Source a
Connect two BIO sources: after the first reaches EOF, draw elements from the second.
concatSource :: HasCallStack => [Source a] -> Source a
Connect a list of BIO sources: after one reaches EOF, draw elements from the next.
concatSource' :: HasCallStack => Source (Source a) -> Source a
Connect a source of BIO sources: after one reaches EOF, draw elements from the next.
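One way to get the "after EOF, draw from the next source" behaviour is to intercept the first source's EOF and start the second source with the same downstream callback. The sketch below models this with hypothetical appendModel and concatModel functions over a locally re-declared BIO alias; it is an illustration of the idea, not Z-IO's implementation.

```haskell
import Data.IORef (modifyIORef', newIORef, readIORef)
import Data.Void (Void)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source a = BIO Void a

-- Hypothetical list source: push every item, then EOF.
listSource :: [a] -> Source a
listSource xs = \k _ -> mapM_ (k . Just) xs >> k Nothing

-- Forward s1's items; when s1 signals EOF, run s2 with the same
-- downstream callback instead of forwarding EOF.
appendModel :: Source a -> Source a -> Source a
appendModel s1 s2 = \k eof -> s1 (maybe (s2 k eof) (k . Just)) eof

-- Fold with appendModel, starting from a source that is empty.
concatModel :: [Source a] -> Source a
concatModel = foldr appendModel (\k _ -> k Nothing)

-- Run a source to EOF and collect its items.
drain :: Source a -> IO [a]
drain src = do
  acc <- newIORef []
  src (maybe (pure ()) (\x -> modifyIORef' acc (x :))) Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  drain (appendModel (listSource "ab") (listSource "cd")) >>= putStrLn
  drain (concatModel [listSource [1, 2], listSource [], listSource [3 :: Int]]) >>= print
```

Running main prints abcd, then [1,2,3].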
Run BIO chain
stepBIO :: HasCallStack => BIO inp out -> inp -> IO [out]
Supply a single chunk of input to a BIO and collect the result.
stepBIO_ :: HasCallStack => BIO inp out -> inp -> IO ()
Supply a single chunk of input to a BIO without collecting the result.
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
Run a BIO loop with a single chunk of input followed by EOF, and collect the result.
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
Run a BIO loop with a single chunk of input followed by EOF, without collecting the result.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
Wrap runBlocks into a pure interface. Similar to unsafeRunBlock, but with a list of input blocks.
Make new BIO
pureBIO :: (a -> b) -> BIO a b
BIO node from a pure function.
BIO nodes made with this function are stateless, and thus can be reused across chains.
ioBIO :: HasCallStack => (a -> IO b) -> BIO a b
BIO node from an IO function.
BIO nodes made with this function may or may not be stateless, depending on whether the IO function uses IO state.
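The contrast between a stateless pure node and an IO node that closes over mutable state can be sketched as follows. pureNode and ioNode are hypothetical models of pureBIO and ioBIO (map items, pass EOF through) over a locally re-declared BIO alias; they are not the library's definitions.

```haskell
import Data.IORef (atomicModifyIORef', modifyIORef', newIORef, readIORef)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Model of pureBIO: map items, pass EOF through unchanged.
pureNode :: (a -> b) -> BIO a b
pureNode f = \k mx -> k (fmap f mx)

-- Model of ioBIO: run the IO function on each item, pass EOF through.
ioNode :: (a -> IO b) -> BIO a b
ioNode f = \k mx -> case mx of
  Just x  -> f x >>= k . Just
  Nothing -> k Nothing

-- Model of runBlocks: push every input, then EOF, collecting outputs.
runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  let double = pureNode (* 2) :: BIO Int Int
  a <- runBlocksModel double [1, 2]
  b <- runBlocksModel double [10]   -- safe to reuse: no hidden state
  counter <- newIORef (0 :: Int)
  let tagger = ioNode (\c -> atomicModifyIORef' counter (\n -> (n + 1, (n, c))))
  c <- runBlocksModel tagger "ab"
  d <- runBlocksModel tagger "c"    -- numbering continues: the IORef is shared state
  print (a, b, c, d)
```

Running main prints ([2,4],[20],[(0,'a'),(1,'b')],[(2,'c')]).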
Source
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
Turn a file into a Bytes source.
initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source Bytes)
Turn a file into a Bytes source with the given chunk size.
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
Turn a BufferedInput into a BIO source, mapping EOF to EOF.
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
Turn a UTF8-encoded BufferedInput into a BIO source, mapping EOF to EOF.
sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
Turn a JSON-encoded BufferedInput into a BIO source, ignoring any whitespace between JSON objects. If EOF is reached, return EOF.
Throw OtherError with name EJSON if a JSON value cannot be parsed or converted.
sourceParserFromBuffered :: HasCallStack => Parser a -> BufferedInput -> Source a
Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.
sourceParseChunkFromBuffered :: (HasCallStack, Print e) => (Bytes -> Result e a) -> BufferedInput -> Source a
Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.
Sink
sinkToList :: IO (MVar [a], Sink a)
Sink to a list in memory.
The MVar will be empty during sinking, and will be filled after the sink receives an EOF.
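The "empty until EOF" behaviour can be modelled by buffering items in an IORef and filling the MVar only when EOF arrives. sinkToListModel below is a hypothetical sketch over a locally re-declared BIO alias, not Z-IO's sinkToList.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryReadMVar)
import Data.IORef (modifyIORef', newIORef, readIORef)
import Data.Void (Void)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Sink a = BIO a Void

-- Buffer items while sinking; publish the whole list on EOF.
sinkToListModel :: IO (MVar [a], Sink a)
sinkToListModel = do
  buf <- newIORef []
  result <- newEmptyMVar
  let sink k mx = case mx of
        Just x  -> modifyIORef' buf (x :)
        Nothing -> do
          readIORef buf >>= putMVar result . reverse
          k Nothing
  pure (result, sink)

main :: IO ()
main = do
  (result, sink) <- sinkToListModel
  let push = sink (\_ -> pure ())
  push (Just 'x')
  push (Just 'y')
  before <- tryReadMVar result   -- no EOF seen yet, so the MVar is empty
  push Nothing
  after <- tryReadMVar result    -- filled once EOF has been pushed
  print (before, after)
```

Running main prints (Nothing,Just "xy").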
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
Turn a BufferedOutput into a Bytes sink.
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
Turn a BufferedOutput into a Builder sink.
Bytes specific
newReChunk :: Int -> IO (BIO Bytes Bytes)
Make a chunk size divider.
A divider cuts each chunk's size to the nearest multiple of the granularity; the last trailing chunk is returned directly.
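A possible reading of the divider is sketched below, under stated assumptions: each emitted chunk is cut down to a multiple of the granularity, the remainder is carried over to the next input, and whatever remains at EOF is emitted as-is. newReChunkModel is hypothetical, uses [Word8] in place of Bytes, and assumes a granularity greater than zero; it is not Z-IO's newReChunk.

```haskell
import Control.Monad (unless)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import Data.Word (Word8)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Assumption: granularity > 0; leftovers are carried to the next chunk.
newReChunkModel :: Int -> IO (BIO [Word8] [Word8])
newReChunkModel granularity = do
  carry <- newIORef []
  pure $ \k mbs -> case mbs of
    Just bs -> do
      rest <- readIORef carry
      let chunk = rest ++ bs
          n = (length chunk `div` granularity) * granularity
          (out, leftover) = splitAt n chunk
      writeIORef carry leftover
      unless (null out) (k (Just out))
    Nothing -> do
      rest <- readIORef carry
      writeIORef carry []
      unless (null rest) (k (Just rest))  -- trailing chunk returned directly
      k Nothing

-- Model of runBlocks: push every input, then EOF, collecting outputs.
runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  node <- newReChunkModel 4
  runBlocksModel node [[1 .. 6], [7, 8, 9]] >>= print
```

Running main prints [[1,2,3,4],[5,6,7,8],[9]].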
newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
Make a new UTF8 decoder, which decodes byte streams into text streams.
If there are invalid UTF8 bytes, an OtherError with name EINVALIDUTF8 will be thrown.
Note that this node is meant to be used after preprocessing nodes such as a decompressor or a parser, where byte boundaries cannot be controlled; the UTF8 decoder carries the trailing bytes of one block over to the next one.
Using this node directly with sourceFromBuffered is not as efficient as using sourceTextFromBuffered, because BufferedInput provides push-back capability: trailing bytes can be pushed back to the reading buffer, then returned together with the next block of input.
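The carry-over of trailing bytes can be illustrated by scanning back from the end of a block for the last UTF-8 lead byte and holding back that sequence if it is incomplete. The sketch only models this boundary handling with [Word8] blocks; it does not validate or decode to Text. splitComplete and newUTF8BoundaryModel are hypothetical, and the error raised at EOF is this model's choice, since how the real node reports a truncated sequence is not stated here.

```haskell
import Control.Monad (unless)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
import Data.Word (Word8)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Expected sequence length from a lead byte; 0 marks a continuation byte.
seqLen :: Word8 -> Int
seqLen b
  | b < 0x80  = 1
  | b < 0xC0  = 0
  | b < 0xE0  = 2
  | b < 0xF0  = 3
  | otherwise = 4

-- Split bytes into (complete prefix, incomplete trailing sequence).
splitComplete :: [Word8] -> ([Word8], [Word8])
splitComplete bs = go 1
  where
    len = length bs
    go i
      | i > 4 || i > len = (bs, [])
      | otherwise =
          let l = seqLen (bs !! (len - i))
          in if l == 0
               then go (i + 1)                 -- continuation byte: keep scanning back
               else if l > i
                 then splitAt (len - i) bs     -- lead byte still misses some bytes
                 else (bs, [])

newUTF8BoundaryModel :: IO (BIO [Word8] [Word8])
newUTF8BoundaryModel = do
  pending <- newIORef []
  pure $ \k mbs -> case mbs of
    Just bs -> do
      rest <- readIORef pending
      let (complete, partial) = splitComplete (rest ++ bs)
      writeIORef pending partial               -- carry the tail to the next block
      unless (null complete) (k (Just complete))
    Nothing -> do
      rest <- readIORef pending
      unless (null rest) (ioError (userError "truncated UTF-8 sequence at EOF"))
      k Nothing

runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  node <- newUTF8BoundaryModel
  -- "A€B" split inside the three-byte encoding of '€' (E2 82 AC)
  runBlocksModel node [[0x41, 0xE2], [0x82, 0xAC, 0x42]] >>= print
```

Running main prints [[65],[226,130,172,66]]: the lone 0xE2 is held back until the rest of its sequence arrives.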
newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a)
Read buffer and parse with Parser.
This function turns a Parser into a BIO, and throws OtherError with name EPARSE if parsing fails.
newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
Make a new stream splitter based on a magic byte.
newLineSplitter :: IO (BIO Bytes Bytes)
Make a new stream splitter based on linefeeds (\r\n or \n).
The resulting bytes don't contain the linefeed.
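A line splitter has to remember a partial line across chunk boundaries and strip a trailing \r when the line ended with \r\n. The sketch below models this with String in place of Bytes; newLineSplitterModel is hypothetical, and emitting a final unterminated line at EOF is an assumption of this model, not documented behaviour of newLineSplitter.

```haskell
import Control.Monad (unless)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

newLineSplitterModel :: IO (BIO String String)
newLineSplitterModel = do
  pending <- newIORef ""
  pure $ \k mchunk -> case mchunk of
    Just chunk -> do
      rest <- readIORef pending
      -- Emit every complete line; keep the unterminated tail for later.
      let go s = case break (== '\n') s of
            (line, '\n' : more) -> k (Just (stripCR line)) >> go more
            (partial, _)        -> writeIORef pending partial
      go (rest ++ chunk)
    Nothing -> do
      rest <- readIORef pending
      writeIORef pending ""
      unless (null rest) (k (Just (stripCR rest)))  -- assumption: flush the last line
      k Nothing
  where
    stripCR l = if not (null l) && last l == '\r' then init l else l

runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  node <- newLineSplitterModel
  runBlocksModel node ["ab\r\ncd", "e\nf"] >>= print
```

Running main prints ["ab","cde","f"]; note that the line "cde" spans two input chunks.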
newBase64Decoder :: HasCallStack => IO (BIO Bytes Bytes)
Make a new base64 decoder node.
hexEncoder :: Bool -> BIO Bytes Bytes
Make a hex encoder node.
The hex encoder is stateless, so it can be reused across chains.
Generic BIO
counterNode :: Counter -> BIO a a
Make a new BIO node which counts the items flowing through it.
The Counter is increased atomically, so it's safe to read / reset it from other threads.
seqNumNode :: Counter -> BIO a (Int, a)
Make a new BIO node which counts items and labels each item with a sequence number.
The Counter is increased atomically, so it's safe to read / reset it from other threads.
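Both nodes can be modelled with a shared mutable counter bumped via an atomic update, which is what makes reading it from another thread safe. In this sketch an IORef Int with atomicModifyIORef' stands in for Z-IO's Counter, and counterModel and seqNumModel are hypothetical; the model numbers items from the counter's current value (0 here), which may differ from where the real seqNumNode starts.

```haskell
import Data.IORef (IORef, atomicModifyIORef', modifyIORef', newIORef, readIORef)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Count each item atomically, pass everything (including EOF) through.
counterModel :: IORef Int -> BIO a a
counterModel c = \k mx -> do
  case mx of
    Just _  -> atomicModifyIORef' c (\n -> (n + 1, ()))
    Nothing -> pure ()
  k mx

-- Label each item with the counter value before the atomic bump.
seqNumModel :: IORef Int -> BIO a (Int, a)
seqNumModel c = \k mx -> case mx of
  Just x -> do
    i <- atomicModifyIORef' c (\n -> (n + 1, n))
    k (Just (i, x))
  Nothing -> k Nothing

runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  seen <- newIORef 0
  seqs <- newIORef 0
  out <- runBlocksModel (counterModel seen . seqNumModel seqs) "abc"
  total <- readIORef seen
  print (out, total)
```

Running main prints ([(0,'a'),(1,'b'),(2,'c')],3).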
newGroupingNode :: Int -> IO (BIO a (Vector a))
Make a BIO node that groups items into fixed-size arrays.
Trailing items are directly returned.
ungroupingNode :: BIO (Vector a) a
A BIO node that flattens grouped items.
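Grouping needs a buffer that is flushed whenever it reaches the target size, plus a final flush of the trailing items on EOF; ungrouping just re-emits each element. The sketch uses lists in place of Vector, and newGroupingModel and ungroupingModel are hypothetical models over a locally re-declared BIO alias, not Z-IO's code.

```haskell
import Control.Monad (unless)
import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

newGroupingModel :: Int -> IO (BIO a [a])
newGroupingModel n = do
  buf <- newIORef []
  pure $ \k mx -> case mx of
    Just x -> do
      xs <- (x :) <$> readIORef buf
      if length xs >= n
        then writeIORef buf [] >> k (Just (reverse xs))  -- group is full
        else writeIORef buf xs
    Nothing -> do
      xs <- readIORef buf
      writeIORef buf []
      unless (null xs) (k (Just (reverse xs)))  -- trailing items returned directly
      k Nothing

-- Flatten each group back into individual items.
ungroupingModel :: BIO [a] a
ungroupingModel = \k mxs -> case mxs of
  Just xs -> mapM_ (k . Just) xs
  Nothing -> k Nothing

runBlocksModel :: BIO inp out -> [inp] -> IO [out]
runBlocksModel bio xs = do
  acc <- newIORef []
  let k = maybe (pure ()) (\o -> modifyIORef' acc (o :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef acc

main :: IO ()
main = do
  grouper <- newGroupingModel 2
  groups <- runBlocksModel grouper [1 .. 5 :: Int]
  grouper' <- newGroupingModel 2
  flat <- runBlocksModel (grouper' . ungroupingModel) [1 .. 5 :: Int]
  print (groups, flat)
```

Running main prints ([[1,2],[3,4],[5]],[1,2,3,4,5]).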