-- | Streaming encoders and decoders that connect "Data.Binary" to Streamly
-- 'SerialT' streams of strict 'BS.ByteString' chunks.
--
-- Decoding runs a 'Get' parser incrementally over the incoming chunks and
-- emits one value per completed parse; encoding runs a 'Put' action per
-- element and emits the resulting chunks.
module Streamly.Binary
( decodeStream,
decodeStreamGet,
encodeStream,
encodeStreamPut,
)
where
import Control.Exception (Exception)
import Control.Monad.Fail (MonadFail)
import Data.Binary
import Data.Binary.Get
import Data.Binary.Put
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Streamly (SerialT)
import Streamly.Internal.Data.Pipe.Types
import Streamly.Internal.Prelude (transform)
import qualified Streamly.Prelude as S
-- | Decode a stream of strict 'BS.ByteString' chunks into a stream of values,
-- using the type's 'Binary' instance ('get') as the parser.
--
-- This is 'decodeStreamGet' specialised to 'get'.
decodeStream :: (Binary a, MonadFail m) => SerialT m BS.ByteString -> SerialT m a
decodeStream chunks = decodeStreamGet get chunks
-- | Decode a stream of strict 'BS.ByteString' chunks with an explicit 'Get'
-- parser.
--
-- The parser is run incrementally: the pipe starts from a fresh
-- 'runGetIncremental' decoder, 'consume' feeds it incoming chunks, and
-- 'produce' emits a value whenever the decoder finishes, restarting the
-- parser for the next value. A parse failure is reported through 'fail' in
-- the underlying monad.
decodeStreamGet :: MonadFail m => Get a -> SerialT m BS.ByteString -> SerialT m a
decodeStreamGet g = transform (Pipe consume (produce g) initialDecoder)
  where
    -- Decoder state the pipe starts in, before any chunk has been seen.
    initialDecoder = runGetIncremental g
-- | Encode a stream of values into a stream of strict 'BS.ByteString' chunks,
-- using the type's 'Binary' instance ('put') as the serialiser.
--
-- This is 'encodeStreamPut' specialised to 'put'.
encodeStream :: (Binary a, MonadFail m) => SerialT m a -> SerialT m BS.ByteString
encodeStream values = encodeStreamPut put values
-- | Encode a stream of values with an explicit serialiser.
--
-- Each element is run through the given 'Put' action with 'runPut'; the
-- resulting lazy ByteString is split into its strict chunks, which are
-- emitted in order before moving on to the next element.
encodeStreamPut :: (MonadFail m) => (a -> Put) -> SerialT m a -> SerialT m BS.ByteString
encodeStreamPut p = S.concatMap encodeOne
  where
    -- Serialise a single element and stream out its strict chunks.
    encodeOne = S.fromList . BL.toChunks . runPut . p
-- | Consume-side step of the decoding pipe: feed one incoming chunk to the
-- current decoder.
--
-- * 'Done': the chunk is appended to the decoder's unused input with
--   'pushChunk', and control passes to the produce side so the finished
--   value can be emitted.
-- * 'Partial': a non-empty chunk is handed to the continuation and control
--   passes to the produce side to inspect the result. An empty chunk is
--   ignored and the decoder keeps waiting for input.
-- * 'Fail': the decoder's error message is raised with 'fail'.
--
-- This step never yields a value itself; it only switches state.
consume :: MonadFail m => Decoder a -> BS.ByteString -> m (Step (PipeState (Decoder a) (Decoder a)) a)
consume d@Done {} input = return $ Continue (Produce $ pushChunk d input)
consume d@(Partial f) input
  -- An empty chunk carries no data. It must not be turned into @f Nothing@:
  -- 'Nothing' tells the decoder that the input has ended, which would
  -- finalise (and typically fail) a parse that is merely waiting for the
  -- next non-empty chunk.
  | BS.null input = return $ Continue (Consume d)
  | otherwise = return $ Continue (Produce (f (Just input)))
consume (Fail _ _ msg) _ = fail msg
-- | Produce-side step of the decoding pipe: inspect the decoder after it has
-- been fed and decide what to do next.
--
-- * 'Done': yield the decoded value and restart the parser. If the finished
--   decoder left unused bytes, they are pushed into the fresh decoder and the
--   pipe stays on the produce side; otherwise it goes back to consuming.
-- * 'Partial': more input is needed, so switch back to the consume side with
--   the same decoder.
-- * 'Fail': the decoder's error message is raised with 'fail'.
produce :: MonadFail m => Get a -> Decoder a -> m (Step (PipeState (Decoder a) (Decoder a)) a)
produce g decoder = case decoder of
  Done leftover _ value
    | BS.null leftover -> pure (Yield value (Consume fresh))
    | otherwise -> pure (Yield value (Produce (pushChunk fresh leftover)))
  Partial _ -> pure (Continue (Consume decoder))
  Fail _ _ msg -> fail msg
  where
    -- A brand-new decoder for the next value in the stream.
    fresh = runGetIncremental g