Copyright | (c) Dong Han 2017-2020 |
---|---|
License | BSD |
Maintainer | winterland1989@gmail.com |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
This module provides BIO
(block IO) type to facilitate writing streaming programs. A BIO
node usually:
- Process input in unit of block(or item).
- Running in constant spaces, which means the memory usage won't accumulate.
- Keep some state in IO, which is sealed in
BIO
closure.
Some examples of such nodes are:
- Compressor / decompressor, e.g. zlib, etc.
- Codec, e.g. utf8 codec, base64 codec.
- Ciphers.
- Packet parsers.
We use BIO inp out
type to represent all the objects above, BIO Void out
to represent an IO
source,
and BIO inp Void
to represent an IO
sink, which can all be connected with >|>
to build a larger BIO
node.
import Z.Data.CBytes (CBytes) import Z.IO import Z.IO.BIO import Z.IO.BIO.Zlib base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO () base64AndCompressFile origin target = do base64Enc <- newBase64Encoder (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31} withResource (initSourceFromFile origin) $ src -> withResource (initSinkToFile target) $ sink -> runBIO $ src >|> base64Enc >|> zlibCompressor >|> sink > base64AndCompressFile "test" "test.gz" -- run 'zcat "test.gz" | base64 -d' will give you original file
Synopsis
- data BIO inp out = BIO {}
- type Source out = BIO Void out
- type Sink inp = BIO inp Void
- (>|>) :: BIO a b -> BIO b c -> BIO a c
- (>~>) :: BIO a b -> (b -> c) -> BIO a c
- (>!>) :: BIO a b -> (HasCallStack => b -> IO c) -> BIO a c
- appendSource :: Source a -> Source a -> IO (Source a)
- concatSource :: [Source a] -> IO (Source a)
- zipSource :: Source a -> Source b -> IO (Source (a, b))
- zipBIO :: BIO a b -> BIO a c -> IO (BIO a (b, c))
- joinSink :: Sink out -> Sink out -> Sink out
- fuseSink :: [Sink out] -> Sink out
- runBIO :: HasCallStack => BIO Void Void -> IO ()
- runSource :: HasCallStack => Source x -> IO [x]
- runSource_ :: HasCallStack => Source x -> IO ()
- runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
- runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
- unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
- runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
- runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
- unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
- pureBIO :: (a -> b) -> BIO a b
- ioBIO :: (HasCallStack => a -> IO b) -> BIO a b
- sourceFromList :: [a] -> IO (Source a)
- initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
- sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
- sourceFromInput :: (HasCallStack, Input i) => i -> IO (Source Bytes)
- sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
- sourceTextFromInput :: (HasCallStack, Input i) => i -> IO (Source Text)
- sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
- sourceJSONFromInput :: (HasCallStack, Input i, JSON a) => i -> IO (Source a)
- sourceParsedBufferInput :: HasCallStack => Parser a -> BufferedInput -> Source a
- sourceParsedInput :: (Input i, HasCallStack) => Parser a -> i -> IO (Source a)
- sinkToList :: IO (IORef [a], Sink a)
- sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
- sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
- sinkToOutput :: HasCallStack => Output o => o -> IO (Sink Bytes)
- initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
- sinkBuilderToOutput :: (Output o, HasCallStack) => o -> IO (Sink (Builder ()))
- sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
- newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a)
- newReChunk :: Int -> IO (BIO Bytes Bytes)
- newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
- newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
- newLineSplitter :: IO (BIO Bytes Bytes)
- newBase64Encoder :: IO (BIO Bytes Bytes)
- newBase64Decoder :: IO (BIO Bytes Bytes)
- hexEncoder :: Bool -> BIO Bytes Bytes
- newHexDecoder :: IO (BIO Bytes Bytes)
- newCounterNode :: IO (Counter, BIO a a)
- newSeqNumNode :: IO (Counter, BIO a (Int, a))
- newGroupingNode :: Int -> IO (BIO a (SmallArray a))
The BIO type
A BIO
(blocked IO) node.
A BIO
node consist of two functions: push
and pull
. It can be used to describe different kinds of IO
devices:
BIO inp out
describe an IO state machine(e.g. z_stream in zlib), which takes some input in block, then outputs.type Source out = BIO Void out
described an IO source, which never takes input, but gives output until EOF whenpull
ed.type Sink inp = BIO inp Void
described an IO sink, which takes input and perform some IO effects, such as writing to terminal or files.
You can connect these BIO
nodes with >|>
, which connect left node's output to right node's input,
and return a new BIO
node with left node's input type and right node's output type.
You can run a BIO
node in different ways:
runBIO
will continuously pull value from source, push to sink until source reaches EOF.runSource
will continuously pull value from source, and perform effects along the way.runBlock
will supply a single block of input as whole input, and return output if there's any.runBlocks
will supply a list of blocks as whole input, and return a list of output blocks.
Note BIO
usually contains some IO states, you can consider it as an opaque IORef
:
- You shouldn't use a
BIO
node across multipleBIO
chain unless the state can be reset. - You shouldn't use a
BIO
node across multiple threads unless document states otherwise.
BIO
is simply a convenient way to construct single-thread streaming computation, to use BIO
in multiple threads, check Z.IO.BIO.Concurrent module.
BIO | |
|
Basic combinators
(>|>) :: BIO a b -> BIO b c -> BIO a c infixl 3 Source #
Connect two BIO
nodes, feed left one's output to right one's input.
(>!>) :: BIO a b -> (HasCallStack => b -> IO c) -> BIO a c Source #
Connect BIO to an effectful function.
appendSource :: Source a -> Source a -> IO (Source a) Source #
Connect two BIO
source, after first reach EOF, draw element from second.
concatSource :: [Source a] -> IO (Source a) Source #
Connect list of BIO
sources, after one reach EOF, draw element from next.
zipSource :: Source a -> Source b -> IO (Source (a, b)) Source #
Zip two BIO
source into one, reach EOF when either one reached EOF.
zipBIO :: BIO a b -> BIO a c -> IO (BIO a (b, c)) Source #
Zip two BIO
node into one, reach EOF when either one reached EOF.
The output item number should match, unmatched output will be discarded.
Run BIO chain
runSource_ :: HasCallStack => Source x -> IO () Source #
Drain a source without collecting result.
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out] Source #
Supply a single block of input, then run BIO node until EOF.
Note many BIO
node will be closed or not be able to take new input after drained.
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO () Source #
Supply a single block of input, then run BIO node until EOF with collecting result.
Note many BIO
node will be closed or not be able to take new input after drained.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out] Source #
Wrap a stream computation into a pure interface.
You can wrap a stateful BIO computation(including the creation of BIO
node),
when you can guarantee a computation is pure, e.g. compressing, decoding, etc.
runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out] Source #
Supply blocks of input, then run BIO node until EOF.
Note many BIO
node will be closed or not be able to take new input after drained.
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO () Source #
Supply blocks of input, then run BIO node until EOF with collecting result.
Note many BIO
node will be closed or not be able to take new input after drained.
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out] Source #
Wrap a stream computation into a pure interface.
Similar to unsafeRunBlock
, but with a list of input blocks.
Make new BIO
pureBIO :: (a -> b) -> BIO a b Source #
BIO node from a pure function.
BIO node made with this funtion are stateless, thus can be reused across chains.
ioBIO :: (HasCallStack => a -> IO b) -> BIO a b Source #
BIO node from an IO function.
BIO node made with this funtion may not be stateless, it depends on if the IO function use IO state.
Source
sourceFromList :: [a] -> IO (Source a) Source #
Source a list from memory.
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes) Source #
Turn a file into a Bytes
source.
sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes Source #
Turn a BufferedInput
into BIO
source, map EOF to Nothing.
sourceFromInput :: (HasCallStack, Input i) => i -> IO (Source Bytes) Source #
Turn an input device into a Bytes
source.
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text Source #
Turn a UTF8 encoded BufferedInput
into BIO
source, map EOF to Nothing.
sourceTextFromInput :: (HasCallStack, Input i) => i -> IO (Source Text) Source #
Turn an input device into a Text
source.
sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a Source #
Turn a JSON
encoded BufferedInput
into BIO
source, ignoring any
whitespaces bewteen JSON objects. If EOF reached, then return Nothing.
Throw OtherError
with name EJSON if JSON value is not parsed or converted.
sourceJSONFromInput :: (HasCallStack, Input i, JSON a) => i -> IO (Source a) Source #
Turn an input device into a JSON
source.
Throw OtherError
with name EJSON if JSON value is not parsed or converted.
sourceParsedBufferInput :: HasCallStack => Parser a -> BufferedInput -> Source a Source #
Turn buffered input device into a packet source, throw OtherError
with name EPARSE
if parsing fail.
sourceParsedInput :: (Input i, HasCallStack) => Parser a -> i -> IO (Source a) Source #
Turn input device into a packet source.
Sink
sinkToList :: IO (IORef [a], Sink a) Source #
Sink to a list in memory.
The list's IORef
is not thread safe here,
and list items are in reversed order during sinking(will be reversed when flushed, i.e. pulled),
Please don't use it in multiple thread.
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes Source #
Turn a BufferedOutput
into a Bytes
sink.
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a) Source #
Turn a BufferedOutput
into a Builder
sink.
sinkToOutput :: HasCallStack => Output o => o -> IO (Sink Bytes) Source #
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes) Source #
sinkBuilderToOutput :: (Output o, HasCallStack) => o -> IO (Sink (Builder ())) Source #
Bytes specific
newParserNode :: HasCallStack => Parser a -> IO (BIO Bytes a) Source #
Read buffer and parse with Parser
.
This function will continuously draw data from input before parsing finish. Unconsumed bytes will be returned to buffer.
Return Nothing
if reach EOF before parsing, throw OtherError
with name EPARSE
if parsing fail.
Make a chunk size divider.
A divider size divide each chunk's size to the nearest multiplier to granularity, last trailing chunk is directly returned.
newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text) Source #
Make a new UTF8 decoder, which decode bytes streams into text streams.
If there're invalid UTF8 bytes, an OtherError
with name EINVALIDUTF8
will be thrown.`
Note this node is supposed to be used with preprocess node such as compressor, decoder, etc. where bytes
boundary cannot be controlled, UTF8 decoder will concat trailing bytes from last block to next one.
Use this node directly with sourceFromBuffered
/ sourceFromInput
will not be as efficient as directly use
sourceTextFromBuffered
/ sourceTextFromInput
, because BufferedInput
provides push back capability,
trailing bytes can be pushde back to reading buffer and returned with next block input together.
newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes) Source #
Make a new stream splitter based on magic byte.
newLineSplitter :: IO (BIO Bytes Bytes) Source #
Make a new stream splitter based on linefeed(rn
or n
).
The result bytes doesn't contain linefeed.
Make a hex encoder node.
Hex encoder is stateless, it can be reused across chains.
Generic BIO
newCounterNode :: IO (Counter, BIO a a) Source #
Make a new BIO node which counts items flow throught it.
Returned Counter
is increased atomically, it's safe to read / reset the counter from other threads.
newSeqNumNode :: IO (Counter, BIO a (Int, a)) Source #
Make a new BIO node which counts items, and label item with a sequence number.
Returned Counter
is increased atomically, it's safe to read / reset the counter from other threads.
newGroupingNode :: Int -> IO (BIO a (SmallArray a)) Source #
Make a BIO node grouping items into fixed size arrays.