module Data.Store.Streaming
(
Message (..)
, encodeMessage
, PeekMessage (..)
, peekMessage
, decodeMessage
, conduitEncode
, conduitDecode
) where
import Control.Exception (assert, throwIO)
import Control.Monad (liftM)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Internal as BS
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import Data.Store
import Data.Store.Impl (getSize)
import Data.Store.Core (Poke(..), tooManyBytes, decodeIOWithFromPtr)
import qualified Data.Text as T
import Data.Word
import Foreign.Ptr
import qualified Foreign.Storable as Storable
import Prelude
import System.IO.ByteBuffer (ByteBuffer)
import qualified System.IO.ByteBuffer as BB
-- | Wrapper marking a value of type @a@ as one unit of a message stream.
--
-- 'encodeMessage' serialises the wrapped value, and the decoding functions
-- in this module hand decoded values back inside this wrapper.
newtype Message a = Message
    { fromMessage :: a
      -- ^ The wrapped payload.
    }
    deriving (Eq, Show)
-- | Type of the length field of a message header. 'encodeMessage' writes the
-- payload size as a value of this type right after the magic number, and
-- 'peekMessageHeader' reads it back.
type SizeTag = Int
-- | Fixed marker value that 'encodeMessage' writes at the very start of every
-- encoded message. 'peekMessageHeader' compares the first header field
-- against it and throws if they differ.
messageMagic :: Word64
messageMagic = 18205256374652458875
-- | Size in bytes of 'messageMagic', as reported by its 'Storable.Storable'
-- instance. Used as the number of bytes to read when decoding the magic
-- number, so this presumes the 'Store' encoding of 'Word64' has the same
-- width as its 'Storable.Storable' representation.
magicLength :: Int
magicLength = Storable.sizeOf messageMagic
-- | Size in bytes of a 'SizeTag', as reported by its 'Storable.Storable'
-- instance. The argument is only a type witness; its value is irrelevant
-- to the result.
sizeTagLength :: Int
sizeTagLength = Storable.sizeOf (0 :: SizeTag)
-- | Total size in bytes of a message header: the magic number followed by
-- the size tag (the order in which 'encodeMessage' writes them).
headerLength :: Int
headerLength = sum [magicLength, sizeTagLength]
-- | Serialise a 'Message' into a single strict 'ByteString'.
--
-- The output consists of, in order: 'messageMagic', the payload size (as
-- computed by 'getSize'), and the payload itself. The buffer is allocated
-- with exactly 'headerLength' plus the payload size bytes.
encodeMessage :: Store a => Message a -> ByteString
encodeMessage (Message payload) = BS.unsafeCreate frameSize fill
  where
    payloadSize = getSize payload
    frameSize = headerLength + payloadSize

    -- The three fields of a frame, in wire order.
    writeFrame = poke messageMagic >> poke payloadSize >> poke payload

    fill dest = do
        (written, ()) <- runPoke writeFrame dest 0
        -- Sanity check: the pokes must end exactly at the end of the
        -- allocated buffer.
        assert (written == frameSize) (return ())
-- | Outcome of trying to read a value from the buffered input.
data PeekMessage m a
    = Done (Message a)
      -- ^ A value was decoded successfully.
    | NeedMoreInput (ByteString -> m (PeekMessage m a))
      -- ^ Not enough input was buffered. Supply the next chunk to the
      -- continuation to try again.
-- | Try to decode a value that occupies exactly @n@ bytes at the front of
-- the 'ByteBuffer'.
--
-- Returns 'Done' with the decoded value when the buffer can supply @n@
-- bytes. Otherwise returns 'NeedMoreInput'; its continuation appends the
-- given chunk to the buffer and retries with the same @n@.
--
-- Throws a 'PeekException' when @n@ is negative. On the 'peekMessage' and
-- 'decodeMessage' paths @n@ is a size tag read from the input, so a corrupt
-- stream can yield a negative value.
peekSized :: (MonadIO m, Store a) => ByteBuffer -> Int -> m (PeekMessage m a)
peekSized bb n
    | n < 0 =
        -- Reject up front rather than asking the buffer for a negative
        -- number of bytes.
        liftIO . throwIO $ PeekException 0 . T.pack $
            "Negative message size, " ++ show n
    | otherwise =
        BB.unsafeConsume bb n >>= \case
            Right ptr -> liftM (Done . Message) $ decodeFromPtr ptr n
            -- Too few bytes buffered: buffer the next chunk, then retry.
            Left _ -> return $ NeedMoreInput (\bs -> BB.copyByteString bb bs
                                                     >> peekSized bb n)
-- | Read a message header (magic number, then size tag) from the
-- 'ByteBuffer' and return the size tag.
--
-- The header fields are only read once the buffer reports at least
-- 'headerLength' bytes available. Until then the result is
-- 'NeedMoreInput', whose continuation appends the chunk to the buffer and
-- tries again.
--
-- Throws a 'PeekException' if the magic number read is not 'messageMagic'.
peekMessageHeader :: MonadIO m => ByteBuffer -> m (PeekMessage m SizeTag)
peekMessageHeader bb = do
    available <- BB.availableBytes bb
    if available < headerLength
        then return $ NeedMoreInput (\bs -> BB.copyByteString bb bs
                                             >> peekMessageHeader bb)
        else peekSized bb magicLength >>= \case
            Done (Message x)
                | x == messageMagic -> peekSized bb sizeTagLength
                | otherwise -> liftIO . throwIO $ PeekException 0 . T.pack $
                    "Wrong message magic, " ++ show x
            -- Not expected: the availability check above already covers
            -- the whole header. Raised as an IO 'userError' (what 'fail'
            -- does in 'IO') so that only 'MonadIO' is required of @m@,
            -- not 'MonadFail'.
            NeedMoreInput _ -> liftIO . throwIO . userError $
                "Internal error in peekMessageHeader."
-- | Try to read one complete 'Message' from the 'ByteBuffer'.
--
-- First reads the header via 'peekMessageHeader', then decodes a payload of
-- the size given by the header. If the header is not yet complete, returns
-- 'NeedMoreInput' with a continuation that appends the chunk to the buffer
-- and starts over; if only the payload is incomplete, the 'NeedMoreInput'
-- returned by 'peekSized' is passed through.
peekMessage :: (MonadIO m, Store a) => ByteBuffer -> m (PeekMessage m a)
peekMessage bb = do
    header <- peekMessageHeader bb
    case header of
        NeedMoreInput _ -> return (NeedMoreInput feedAndRetry)
        Done (Message payloadSize) -> peekSized bb payloadSize
  where
    -- Buffer the new chunk and attempt the whole message again.
    feedAndRetry chunk = do
        BB.copyByteString bb chunk
        peekMessage bb
-- | Decode one 'Message', pulling further input from the supplied action
-- whenever the 'ByteBuffer' runs short.
--
-- The action should return 'Nothing' once the input is exhausted. The
-- result is 'Nothing' when 'decodeHeader' finds no further message;
-- otherwise the payload is decoded with 'decodeSized'.
decodeMessage :: (MonadIO m, Store a)
              => ByteBuffer -> m (Maybe ByteString) -> m (Maybe (Message a))
decodeMessage bb getBs = do
    mPayloadSize <- decodeHeader bb getBs
    maybe (return Nothing) (decodeSized bb getBs) mPayloadSize
-- | Read a message header and return its size tag, fetching more input
-- from the supplied action as needed.
--
-- When the input ends ('Nothing' from the action):
--
-- * with an empty buffer, the result is 'Nothing' (no further message);
-- * with leftover bytes in the buffer, the error produced by
--   'tooManyBytes' is raised, since the header is incomplete.
decodeHeader :: MonadIO m
             => ByteBuffer
             -> m (Maybe ByteString)
             -> m (Maybe SizeTag)
decodeHeader bb getBs = do
    header <- peekMessageHeader bb
    case header of
        Done (Message size) -> return (Just size)
        NeedMoreInput _ -> do
            mChunk <- getBs
            case mChunk of
                Just chunk -> do
                    BB.copyByteString bb chunk
                    decodeHeader bb getBs
                Nothing -> do
                    leftover <- BB.availableBytes bb
                    if leftover == 0
                        then return Nothing
                        else liftIO $ tooManyBytes headerLength leftover "Data.Store.Message.SizeTag"
-- | Decode a payload of exactly @n@ bytes, fetching more input from the
-- supplied action until the 'ByteBuffer' holds enough.
--
-- If the input ends ('Nothing' from the action) before @n@ bytes are
-- buffered, the error produced by 'tooManyBytes' is raised.
decodeSized :: (MonadIO m, Store a)
            => ByteBuffer
            -> m (Maybe ByteString)
            -> Int
            -> m (Maybe (Message a))
decodeSized bb getBs n = do
    result <- peekSized bb n
    case result of
        Done message -> return (Just message)
        NeedMoreInput _ -> do
            mChunk <- getBs
            case mChunk of
                Just chunk -> do
                    BB.copyByteString bb chunk
                    decodeSized bb getBs n
                Nothing -> do
                    available <- BB.availableBytes bb
                    liftIO $ tooManyBytes n available "Data.Store.Message.Message"
-- | Decode a value from the @n@ bytes starting at the given pointer, using
-- the value's 'Store' 'peek', lifted into any 'MonadIO'.
decodeFromPtr :: (MonadIO m, Store a) => Ptr Word8 -> Int -> m a
decodeFromPtr ptr = liftIO . decodeIOWithFromPtr peek ptr
-- | Conduit that turns each incoming 'Message' into its encoded
-- 'ByteString' (one output chunk per message), using 'encodeMessage'.
conduitEncode :: (Monad m, Store a) => C.Conduit (Message a) m ByteString
conduitEncode = C.awaitForever (C.yield . encodeMessage)
-- | Conduit that decodes a stream of 'ByteString' chunks into 'Message's.
--
-- A 'ByteBuffer' is created with 'BB.new' and released with 'BB.free'
-- through 'C.bracketP'. The argument is passed on to 'BB.new' unchanged
-- (presumably the initial buffer size -- see the ByteBuffer documentation).
--
-- Messages are decoded and yielded one at a time until 'decodeMessage'
-- returns 'Nothing'.
conduitDecode :: (MonadIO m, MonadResource m, Store a)
              => Maybe Int
              -> C.Conduit ByteString m (Message a)
conduitDecode bufSize = C.bracketP (BB.new bufSize) BB.free decodeAll
  where
    -- Decode and emit messages until the upstream is exhausted.
    decodeAll buffer =
        decodeMessage buffer C.await
            >>= maybe (return ()) (\message -> C.yield message >> decodeAll buffer)