Copyright | (c) Dong Han 2017-2020 |
---|---|
License | BSD |
Maintainer | winterland1989@gmail.com |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell2010 |
This module provides the BIO (block IO) type to facilitate writing streaming programs. A BIO node usually:
- Processes input in units of blocks (or items).
- Runs in constant space, which means its memory usage won't accumulate.
- Keeps some state in IO, which is sealed in the BIO closure.
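The three properties above can be illustrated with a minimal sketch that needs only base. It re-declares the BIO shape locally (a simplified stand-in, not Z-IO's implementation), and newRunningSum is a hypothetical node whose state is one IORef sealed in the returned closure, so memory use stays constant however many blocks flow through.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

-- Local copy of this module's BIO shape; Nothing plays the role of EOF.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Hypothetical node that emits the running total of the Ints seen so far.
-- The IORef is allocated once and captured by the returned closure, so the
-- state is sealed inside the node and never exposed to callers.
newRunningSum :: IO (BIO Int Int)
newRunningSum = do
  totalRef <- newIORef 0
  pure $ \k mx -> case mx of
    Just x -> do
      modifyIORef' totalRef (+ x)
      readIORef totalRef >>= k . Just
    Nothing -> k Nothing  -- forward EOF unchanged
```

Feeding the blocks 1, 2 and 3 followed by EOF pushes 1, 3 and 6 downstream.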
Some examples of such nodes are:
- Compressor / decompressor, e.g. zlib, etc.
- Codec, e.g. utf8 codec, base64 codec.
- Ciphers.
- Packet parsers.
We use the BIO inp out type to represent all the objects above, BIO Void out to represent an IO source, and BIO inp () to represent an IO sink. They can all be connected with function composition (.) to build a larger BIO node.
import Z.Data.CBytes (CBytes)
import Z.IO
import Z.IO.BIO
import Z.IO.BIO.Zlib

base64AndCompressFile :: HasCallStack => CBytes -> CBytes -> IO ()
base64AndCompressFile origin target = do
    base64Enc <- newBase64Encoder
    -- windowBits 31 (15 + 16) makes zlib emit the gzip format
    (_, zlibCompressor) <- newCompress defaultCompressConfig{compressWindowBits = 31}
    withResource (initSourceFromFile origin) $ \ src ->
        withResource (initSinkToFile target) $ \ sink ->
            run_ $ src . base64Enc . zlibCompressor . sink

> base64AndCompressFile "test" "test.gz"
-- running 'zcat "test.gz" | base64 -d' gives back the original file
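Because a BIO is just a function taking a downstream callback, plain (.) is enough to wire a source, a transformer and a sink together. The sketch below shows this with base only; the type synonyms are local copies, and listSource, double, collectInto and runChain are hypothetical stand-ins for the module's real nodes and for run_.

```haskell
import Data.IORef (IORef, newIORef, readIORef, modifyIORef')
import Data.Void (Void)

-- Local copies of this module's type synonyms.
type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source out = BIO Void out
type Sink inp = BIO inp ()

-- Hypothetical source: ignores its (EOF) input, pushes each list element
-- downstream, then signals EOF with Nothing.
listSource :: [a] -> Source a
listSource xs k _ = mapM_ (k . Just) xs >> k Nothing

-- Hypothetical stateless transformer: doubles every item, forwards EOF.
double :: BIO Int Int
double k = k . fmap (* 2)

-- Hypothetical sink: prepends each item to an IORef (newest first).
collectInto :: IORef [a] -> Sink a
collectInto ref _ (Just x) = modifyIORef' ref (x :)
collectInto _   k Nothing  = k Nothing

-- Roughly what run_ does for a closed chain: supply EOF directly and
-- ignore whatever reaches the end.
runChain :: BIO Void out -> IO ()
runChain bio = bio (\_ -> pure ()) Nothing

-- Source, transformer and sink, connected with plain (.).
pipeline :: IORef [Int] -> BIO Void ()
pipeline ref = listSource [1, 2, 3] . double . collectInto ref
```

Data flows left to right: in f . g, the node f receives input first and g receives f's output, which is why the source sits leftmost.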
This module is intended to be imported qualified:
import Z.IO.BIO (BIO, Source, Sink)
import qualified Z.IO.BIO as BIO
Synopsis
- type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
- pattern EOF :: Maybe a
- type Source x = BIO Void x
- type Sink x = BIO x ()
- appendSource :: HasCallStack => Source a -> Source a -> Source a
- concatSource :: HasCallStack => [Source a] -> Source a
- concatSource' :: HasCallStack => Source (Source a) -> Source a
- joinSink :: HasCallStack => Sink out -> Sink out -> Sink out
- fuseSink :: HasCallStack => [Sink out] -> Sink out
- discard :: a -> IO ()
- step :: HasCallStack => BIO inp out -> inp -> IO [out]
- step_ :: HasCallStack => BIO inp out -> inp -> IO ()
- run :: HasCallStack => BIO inp out -> IO [out]
- run_ :: HasCallStack => BIO inp out -> IO ()
- runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
- runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
- unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
- runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
- runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
- unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
- fromPure :: (a -> b) -> BIO a b
- fromIO :: HasCallStack => (a -> IO b) -> BIO a b
- filter :: (a -> Bool) -> BIO a a
- filterIO :: (a -> IO Bool) -> BIO a a
- fold' :: Fold a b -> Source a -> IO b
- foldIO' :: FoldM IO a b -> Source a -> IO b
- initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
- initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source Bytes)
- sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
- sourceFromList :: Foldable f => f a -> Source a
- sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
- sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
- sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
- sourceParserFromBuffered :: HasCallStack => Parser a -> BufferedInput -> Source a
- sourceParseChunkFromBuffered :: (HasCallStack, Print e) => (Bytes -> Result e a) -> BufferedInput -> Source a
- sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
- sinkToList :: IO (MVar [a], Sink a)
- initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
- sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
- sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
- newReChunk :: Int -> IO (BIO Bytes Bytes)
- newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
- newParser :: HasCallStack => Parser a -> IO (BIO Bytes a)
- newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
- newLineSplitter :: IO (BIO Bytes Bytes)
- newBase64Encoder :: IO (BIO Bytes Bytes)
- newBase64Decoder :: HasCallStack => IO (BIO Bytes Bytes)
- hexEncode :: Bool -> BIO Bytes Bytes
- newHexDecoder :: IO (BIO Bytes Bytes)
- counter :: Counter -> BIO a a
- seqNum :: Counter -> BIO a (Int, a)
- newGrouping :: Vec v a => Int -> IO (BIO a (v a))
- ungrouping :: BIO (Vector a) a
- consumed :: TVar Bool -> BIO a a
- zip :: BIO a b -> BIO a c -> BIO a (b, c)
- newTQueuePair :: Int -> IO (Sink a, Source a)
- newTBQueuePair :: Int -> Natural -> IO (Sink a, Source a)
- newBroadcastTChanPair :: Int -> IO (Sink a, IO (Source a))
- newCompress :: HasCallStack => CompressConfig -> IO (ZStream, BIO Bytes Bytes)
- compressReset :: ZStream -> IO ()
- data CompressConfig = CompressConfig {}
- defaultCompressConfig :: CompressConfig
- newDecompress :: DecompressConfig -> IO (ZStream, BIO Bytes Bytes)
- decompressReset :: ZStream -> IO ()
- data DecompressConfig = DecompressConfig {}
- defaultDecompressConfig :: DecompressConfig
- type MemLevel = CInt
- defaultMemLevel :: MemLevel
- data ZStream
The BIO type
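type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
The first argument is the downstream callback that receives each output, or EOF; the second argument is the input block, or EOF.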
A BIO (block IO) node.
A BIO node is a push-based stream transformer. It can be used to describe different kinds of IO devices:
- BIO inp out describes an IO state machine (e.g. z_stream in zlib), which takes some input in blocks, then outputs.
- type Source out = BIO Void out describes an IO source, which never takes input, but gives output until EOF by looping.
- type Sink inp = BIO inp () describes an IO sink, which takes input and performs some IO effects, such as writing to the terminal or files.
You can connect these BIO nodes with function composition (.): in f . g, the left node's output is connected to the right node's input, and the result is a new BIO node with the left node's input type and the right node's output type.
You can run a BIO node in different ways:
- step / step_ supply a single chunk of input and step the BIO node.
- run / run_ supply EOF directly, which effectively pulls all values from the source and pushes them to the sink until the source reaches EOF.
- runBlock / runBlock_ supply a single block of input as the whole input and run the BIO node.
- runBlocks / runBlocks_ supply a list of blocks as the whole input and run the BIO node.
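The difference between these entry points is mostly where EOF is supplied. The following base-only sketch models them on a locally declared BIO type; the *Sketch functions and newSumAtEOF are hypothetical and only approximate the module's step, runBlock, runBlocks and run.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Give the action a callback that records every output (EOF is dropped)
-- and return the outputs in order.
collect :: ((Maybe out -> IO ()) -> IO ()) -> IO [out]
collect act = do
  ref <- newIORef []
  act (maybe (pure ()) (\x -> modifyIORef' ref (x :)))
  reverse <$> readIORef ref

-- step: push one chunk, without EOF.
stepSketch :: BIO inp out -> inp -> IO [out]
stepSketch bio x = collect (\k -> bio k (Just x))

-- runBlocks: push every chunk, then EOF.
runBlocksSketch :: BIO inp out -> [inp] -> IO [out]
runBlocksSketch bio xs = collect (\k -> mapM_ (bio k . Just) xs >> bio k Nothing)

-- runBlock: a single chunk, then EOF.
runBlockSketch :: BIO inp out -> inp -> IO [out]
runBlockSketch bio x = runBlocksSketch bio [x]

-- run: supply EOF directly; a source then loops until it reaches EOF.
runSketch :: BIO inp out -> IO [out]
runSketch bio = collect (\k -> bio k Nothing)

-- Hypothetical stateful node: buffers a sum and emits it only at EOF.
newSumAtEOF :: IO (BIO Int Int)
newSumAtEOF = do
  ref <- newIORef 0
  pure $ \k mx -> case mx of
    Just x  -> modifyIORef' ref (+ x)
    Nothing -> readIORef ref >>= k . Just >> k Nothing
```

With a fresh newSumAtEOF node, stepSketch node 5 returns [] because nothing is flushed before EOF, while runBlockSketch node 7 returns [7].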
Note that a BIO usually contains some IO state; you can consider it as an opaque IORef:
- You shouldn't use a BIO node across multiple BIO chains unless the state can be reset.
- You shouldn't use a BIO node across multiple threads unless the documentation states otherwise.
BIO is simply a convenient way to construct single-threaded streaming computations; to use BIO in multiple threads, check the Z.IO.BIO.Concurrent module.
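To see why a stateful node should not be shared across chains without a reset, consider this base-only sketch. newSummer is hypothetical; it returns a reset action alongside the node, loosely mirroring the (ZStream, BIO) pair that newCompress returns, but it is not the module's code.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Hypothetical stateful node paired with a reset action. It emits the sum
-- of its inputs at EOF; the sum lives in an IORef shared by both.
newSummer :: IO (IO (), BIO Int Int)
newSummer = do
  ref <- newIORef 0
  let reset = writeIORef ref 0
      node k mx = case mx of
        Just x  -> modifyIORef' ref (+ x)
        Nothing -> readIORef ref >>= k . Just >> k Nothing
  pure (reset, node)

-- Feed a list of blocks plus EOF and return what the node emitted.
runBlocks' :: BIO inp out -> [inp] -> IO [out]
runBlocks' bio xs = do
  out <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' out (y :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef out
```

Running the same node on [1, 2] and then on [10] reports 3 and then 13, because the first chain's state leaks into the second; calling the reset action first makes the second run report 10.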
Basic combinators
appendSource :: HasCallStack => Source a -> Source a -> Source a
Connect two BIO sources: after the first reaches EOF, draw elements from the second.
concatSource :: HasCallStack => [Source a] -> Source a
Connect a list of BIO sources: after one reaches EOF, draw elements from the next.
concatSource' :: HasCallStack => Source (Source a) -> Source a
Connect the BIO sources produced by a source: after one reaches EOF, draw elements from the next.
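One way to realise "after the first reaches EOF, draw elements from the second" is to intercept the first source's EOF and start the second source in its place. The sketch below does this on a locally declared BIO type with base only; listSource, appendSourceSketch and concatSourceSketch are hypothetical and may differ from the module's implementation.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Void (Void)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Source a = BIO Void a

-- Hypothetical list-backed source: pushes each element, then EOF.
listSource :: [a] -> Source a
listSource xs k _ = mapM_ (k . Just) xs >> k Nothing

-- Run the first source; when it signals EOF, start the second one
-- instead of forwarding that EOF downstream.
appendSourceSketch :: Source a -> Source a -> Source a
appendSourceSketch s1 s2 k _ = s1 onItem Nothing
  where
    onItem (Just x) = k (Just x)
    onItem Nothing  = s2 k Nothing

-- Chain a list of sources; the empty list is a source that only emits EOF.
concatSourceSketch :: [Source a] -> Source a
concatSourceSketch = foldr appendSourceSketch (\k _ -> k Nothing)
```

Downstream sees exactly one EOF, after the last source is exhausted; empty sources in the list are skipped transparently.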
Run BIO chain
step :: HasCallStack => BIO inp out -> inp -> IO [out]
Supply a single chunk of input to a BIO and collect the result.
step_ :: HasCallStack => BIO inp out -> inp -> IO ()
Supply a single chunk of input to a BIO without collecting the result.
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
Run a BIO loop with a single chunk of input followed by EOF, and collect the result.
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
Run a BIO loop with a single chunk of input followed by EOF, without collecting the result.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
Wrap runBlocks into a pure interface.
Similar to unsafeRunBlock, but with a list of input blocks.
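Wrapping a node-creating action into a pure function is only reasonable when a fresh node is created on every call and the computation is deterministic. This base-only sketch shows that pattern with unsafePerformIO; unsafeRunBlocksSketch, newSumAtEOF and sumBlocks are hypothetical and are not the module's code.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')
import System.IO.Unsafe (unsafePerformIO)

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Push every block, then EOF, and collect all outputs in order.
runBlocksSketch :: BIO inp out -> [inp] -> IO [out]
runBlocksSketch bio xs = do
  ref <- newIORef []
  let k = maybe (pure ()) (\y -> modifyIORef' ref (y :))
  mapM_ (bio k . Just) xs
  bio k Nothing
  reverse <$> readIORef ref

-- The node is created inside unsafePerformIO, so each call gets fresh
-- state; this is only sound when the whole computation is deterministic.
unsafeRunBlocksSketch :: IO (BIO inp out) -> [inp] -> [out]
unsafeRunBlocksSketch newBio xs = unsafePerformIO $ do
  bio <- newBio
  runBlocksSketch bio xs

-- Hypothetical stateful node: emits the sum of all inputs at EOF.
newSumAtEOF :: IO (BIO Int Int)
newSumAtEOF = do
  ref <- newIORef 0
  pure $ \k mx -> case mx of
    Just x  -> modifyIORef' ref (+ x)
    Nothing -> readIORef ref >>= k . Just >> k Nothing

-- A pure function built on top of the stateful node.
sumBlocks :: [Int] -> Int
sumBlocks = sum . unsafeRunBlocksSketch newSumAtEOF
```

Calling sumBlocks [1, 2, 3] twice yields 6 both times, since no state survives between calls.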
Make new BIO
fromPure :: (a -> b) -> BIO a b
BIO node from a pure function.
BIO nodes made with this function are stateless, and thus can be reused across chains.
fromIO :: HasCallStack => (a -> IO b) -> BIO a b
BIO node from an IO function.
BIO nodes made with this function may not be stateless; it depends on whether the IO function uses IO state.
filter :: (a -> Bool) -> BIO a a
BIO node from a pure filter.
BIO nodes made with this function are stateless, and thus can be reused across chains.
filterIO :: (a -> IO Bool) -> BIO a a
BIO node from an impure filter.
BIO nodes made with this function may not be stateless; it depends on whether the IO function uses IO state.
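These four constructors are thin wrappers that apply a function to each item and always forward EOF. The following base-only sketch approximates them on a locally declared BIO type; the *Sketch names are hypothetical, not the module's definitions.

```haskell
import Data.IORef (newIORef, readIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Map each item with a pure function; EOF passes through untouched.
fromPureSketch :: (a -> b) -> BIO a b
fromPureSketch f k = k . fmap f

-- Map each item with an IO action.
fromIOSketch :: (a -> IO b) -> BIO a b
fromIOSketch f k mx = case mx of
  Just x  -> f x >>= k . Just
  Nothing -> k Nothing

-- Drop items failing the predicate; always forward EOF.
filterSketch :: (a -> Bool) -> BIO a a
filterSketch p k mx = case mx of
  Just x | not (p x) -> pure ()
  _                  -> k mx

-- Same, with an effectful predicate.
filterIOSketch :: (a -> IO Bool) -> BIO a a
filterIOSketch p k mx = case mx of
  Just x  -> p x >>= \keep -> if keep then k mx else pure ()
  Nothing -> k Nothing
```

None of these allocate state of their own, which is why nodes built from pure functions can be reused across chains; any state a fromIO or filterIO node has comes from the IO function it wraps.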
Use with fold
Source
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source Bytes)
Turn a file into a Bytes source.
initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source Bytes)
Turn a file into a Bytes source with the given chunk size.
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
sourceFromBuffered :: HasCallStack => BufferedInput -> Source Bytes
Turn a BufferedInput into a BIO source, mapping EOF to EOF.
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source Text
Turn a UTF8 encoded BufferedInput into a BIO source, mapping EOF to EOF.
sourceJSONFromBuffered :: forall a. (JSON a, HasCallStack) => BufferedInput -> Source a
Turn a JSON encoded BufferedInput into a BIO source, ignoring any whitespace between JSON objects. If EOF is reached, return EOF.
Throw OtherError with name EJSON if a JSON value cannot be parsed or converted.
sourceParserFromBuffered :: HasCallStack => Parser a -> BufferedInput -> Source a
Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.
sourceParseChunkFromBuffered :: (HasCallStack, Print e) => (Bytes -> Result e a) -> BufferedInput -> Source a
Turn a buffered input device into a packet source; throw OtherError with name EPARSE if parsing fails.
Sink
sinkToList :: IO (MVar [a], Sink a)
Sink to a list in memory.
The MVar will be empty during sinking, and will be filled after the sink receives an EOF.
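The "empty until EOF" behaviour can be obtained by accumulating in an IORef and filling the MVar only when EOF arrives. This base-only sketch, sinkToListSketch, is hypothetical; in particular, forwarding EOF downstream after filling the MVar is a choice of this sketch, not something the documentation states.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Data.IORef (newIORef, readIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()
type Sink a = BIO a ()

-- Accumulate items in an IORef (newest first); only at EOF is the
-- ordered list put into the MVar, so the MVar stays empty while
-- sinking is in progress and a reader blocks until then.
sinkToListSketch :: IO (MVar [a], Sink a)
sinkToListSketch = do
  acc <- newIORef []
  result <- newEmptyMVar
  let sink k mx = case mx of
        Just x  -> modifyIORef' acc (x :)
        Nothing -> do
          readIORef acc >>= putMVar result . reverse
          k Nothing
  pure (result, sink)
```

A consumer on another thread can simply call takeMVar on the returned MVar and will be woken once the sink has seen EOF.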
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink Bytes)
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink Bytes
Turn a BufferedOutput into a Bytes sink.
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (Builder a)
Turn a BufferedOutput into a Builder sink.
Bytes specific
newReChunk :: Int -> IO (BIO Bytes Bytes)
Make a chunk size divider.
A divider cuts each chunk's size to the nearest multiple of the granularity; the last trailing chunk is returned directly.
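A chunk size divider can be written by buffering the part of each chunk that does not fill a whole multiple of the granularity and prepending it to the next chunk. The sketch below assumes this rounding-down behaviour and uses lists in place of Bytes so it runs with base alone; newReChunkSketch is hypothetical, not newReChunk itself.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Lists stand in for Bytes. Each emitted chunk has a length that is a
-- multiple of n (n must be > 0); the remainder is carried into the next
-- input, and whatever is left at EOF is emitted as a final shorter chunk.
newReChunkSketch :: Int -> IO (BIO [a] [a])
newReChunkSketch n = do
  trailingRef <- newIORef []
  pure $ \k mbs -> case mbs of
    Just bs -> do
      trailing <- readIORef trailingRef
      let chunk = trailing ++ bs
          l = length chunk
          (out, rest) = splitAt (l - l `rem` n) chunk
      writeIORef trailingRef rest
      if null out then pure () else k (Just out)
    Nothing -> do
      trailing <- readIORef trailingRef
      writeIORef trailingRef []
      if null trailing then pure () else k (Just trailing)
      k Nothing
```

With a granularity of 4, the inputs "abcde", "fg" and "hijklm" produce "abcd", "efghijkl" and, at EOF, the trailing "m".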
newUTF8Decoder :: HasCallStack => IO (BIO Bytes Text)
Make a new UTF8 decoder, which decodes byte streams into text streams.
If there are invalid UTF8 bytes, an OtherError with name EINVALIDUTF8 will be thrown.
Note that this node is intended to be used after preprocessing nodes such as decompressors, parsers, etc., where byte boundaries cannot be controlled; the UTF8 decoder concatenates the trailing bytes of the last block with the next one.
Using this node directly with sourceFromBuffered is not as efficient as using sourceTextFromBuffered directly, because BufferedInput provides push-back capability: trailing bytes can be pushed back to the reading buffer and then returned together with the next block of input.
newParser :: HasCallStack => Parser a -> IO (BIO Bytes a)
Read buffer and parse with Parser.
This function turns a Parser into a BIO, throwing OtherError with name EPARSE if parsing fails.
newMagicSplitter :: Word8 -> IO (BIO Bytes Bytes)
Make a new stream splitter based on a magic byte.
newLineSplitter :: IO (BIO Bytes Bytes)
Make a new stream splitter based on linefeed (\r\n or \n).
The resulting bytes don't contain the linefeed.
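A line splitter has to cope with a line, or even a "\r\n" pair, being cut across two input blocks. This base-only sketch buffers the unterminated tail between blocks and strips "\n" or "\r\n" from complete lines; it uses String in place of Bytes, and newLineSplitterSketch, including its choice to emit a non-empty tail at EOF, is a hypothetical illustration rather than newLineSplitter itself.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Strings stand in for Bytes. Complete lines are emitted without their
-- "\n" or "\r\n" terminator; an unterminated tail is buffered until more
-- input arrives, and emitted as-is at EOF if non-empty.
newLineSplitterSketch :: IO (BIO String String)
newLineSplitterSketch = do
  bufRef <- newIORef ""
  let dropCR l = if not (null l) && last l == '\r' then init l else l
      -- Split into complete lines plus the unterminated remainder.
      splitLines s = case break (== '\n') s of
        (l, '\n' : rest) -> let (ls, r) = splitLines rest in (l : ls, r)
        (l, _)           -> ([], l)
  pure $ \k ms -> case ms of
    Just s -> do
      buf <- readIORef bufRef
      let (ls, rest) = splitLines (buf ++ s)
      writeIORef bufRef rest
      mapM_ (k . Just . dropCR) ls
    Nothing -> do
      buf <- readIORef bufRef
      writeIORef bufRef ""
      if null buf then pure () else k (Just buf)
      k Nothing
```

Feeding "ab\r\ncd", "e\nf" and "g" yields "ab" and "cde", then "fg" at EOF.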
newBase64Decoder :: HasCallStack => IO (BIO Bytes Bytes)
Make a new base64 decoder node.
hexEncode :: Bool -> BIO Bytes Bytes
Make a hex encoder node.
The hex encoder is stateless, so it can be reused across chains.
Generic BIO
counter :: Counter -> BIO a a
Make a new BIO node which counts the items flowing through it.
Counter is increased atomically, so it's safe to read / reset the counter from other threads.
seqNum :: Counter -> BIO a (Int, a)
Make a new BIO node which counts items and labels each item with a sequence number.
Counter is increased atomically, so it's safe to read / reset the counter from other threads.
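Counting and labelling nodes only need an atomic increment per item. In this base-only sketch an IORef Int stands in for the module's Counter type (an assumption), updated with atomicModifyIORef' so other threads can read it safely. counterSketch and seqNumSketch are hypothetical, and starting the sequence at the counter's current value is this sketch's choice.

```haskell
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef', modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Increment the counter once per item, then pass the item on unchanged.
counterSketch :: IORef Int -> BIO a a
counterSketch c k mx = case mx of
  Just _  -> atomicModifyIORef' c (\n -> (n + 1, ())) >> k mx
  Nothing -> k mx

-- Label each item with the counter value read before incrementing.
seqNumSketch :: IORef Int -> BIO a (Int, a)
seqNumSketch c k mx = case mx of
  Just x -> do
    i <- atomicModifyIORef' c (\n -> (n + 1, n))
    k (Just (i, x))
  Nothing -> k Nothing
```

Chaining counterSketch and seqNumSketch (each with its own fresh counter) over "xyz" leaves the first counter at 3 and emits (0, 'x'), (1, 'y'), (2, 'z').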
newGrouping :: Vec v a => Int -> IO (BIO a (v a))
Make a BIO node grouping items into fixed-size arrays.
Trailing items are directly returned.
ungrouping :: BIO (Vector a) a
A BIO node that flattens grouped items.
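Grouping buffers items until a group is full and flushes a short trailing group at EOF, while ungrouping pushes each element of a group downstream. This base-only sketch uses lists in place of the vector types; newGroupingSketch and ungroupingSketch are hypothetical approximations of newGrouping and ungrouping.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef, modifyIORef')

type BIO inp out = (Maybe out -> IO ()) -> Maybe inp -> IO ()

-- Lists stand in for the vector types. Items are buffered until n (> 0)
-- have arrived; a short trailing group is emitted at EOF.
newGroupingSketch :: Int -> IO (BIO a [a])
newGroupingSketch n = do
  bufRef <- newIORef []  -- items in reverse arrival order
  pure $ \k mx -> case mx of
    Just x -> do
      buf <- readIORef bufRef
      let buf' = x : buf
      if length buf' >= n
        then writeIORef bufRef [] >> k (Just (reverse buf'))
        else writeIORef bufRef buf'
    Nothing -> do
      buf <- readIORef bufRef
      writeIORef bufRef []
      if null buf then pure () else k (Just (reverse buf))
      k Nothing

-- Flatten groups back into individual items.
ungroupingSketch :: BIO [a] a
ungroupingSketch k mxs = case mxs of
  Just xs -> mapM_ (k . Just) xs
  Nothing -> k Nothing
```

Grouping "abcde" in twos emits "ab", "cd" and, at EOF, "e"; composing the grouping node with ungroupingSketch gives back the original items in order.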
Concurrent helpers
zip :: BIO a b -> BIO a c -> BIO a (b, c)
Zip two BIO nodes by running them concurrently.
This implementation uses MVar to synchronize the two BIOs' output, which has some implications:
- The two nodes should output the same number of results.
- If the number differs, one node maybe
newTQueuePair :: Int -> IO (Sink a, Source a)
Make an unbounded queue and a pair of sink and source connected to it.
newTBQueuePair :: Int -> Natural -> IO (Sink a, Source a)
Make a bounded queue and a pair of sink and source connected to it.
newBroadcastTChanPair :: Int -> IO (Sink a, IO (Source a))
Make a broadcast chan, a sink connected to it, and a function returning sources that receive broadcast messages.
Zlib BIO
newCompress :: HasCallStack => CompressConfig -> IO (ZStream, BIO Bytes Bytes)
Make a new compress node.
The returned BIO node can be reused only if you call compressReset on the ZStream.
compressReset :: ZStream -> IO ()
Reset the compressor's state so that the related BIO can be reused.
data CompressConfig
newDecompress :: DecompressConfig -> IO (ZStream, BIO Bytes Bytes)
Make a new decompress node.
The returned BIO node can be reused only if you call decompressReset on the ZStream.
decompressReset :: ZStream -> IO ()
Reset the decompressor's state so that the related BIO can be reused.
data DecompressConfig
The MemLevel
specifies how much memory should be allocated for the internal compression state. 1 uses minimum memory but is slow and reduces compression ratio; 9 uses maximum memory for optimal speed. The default value is 8.