module Data.Store.Streaming
(
Message (..)
, encodeMessage
, PeekMessage
, FillByteBuffer
, peekMessage
, decodeMessage
, peekMessageBS
, decodeMessageBS
#ifndef mingw32_HOST_OS
, ReadMoreData(..)
, peekMessageFd
, decodeMessageFd
#endif
, conduitEncode
, conduitDecode
) where
import Control.Exception (throwIO)
import Control.Monad (unless)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import Data.Store
import Data.Store.Impl (getSize)
import Data.Store.Core (decodeIOWithFromPtr, unsafeEncodeWith)
import qualified Data.Text as T
import Data.Word
import Foreign.Ptr
import Prelude
import System.IO.ByteBuffer (ByteBuffer)
import qualified System.IO.ByteBuffer as BB
import Control.Monad.Trans.Free.Church (FT, iterTM, wrap)
import Control.Monad.Trans.Maybe (MaybeT(MaybeT), runMaybeT)
import Control.Monad.Trans.Class (lift)
import System.Posix.Types (Fd(..))
import GHC.Conc (threadWaitRead)
import Data.Store.Streaming.Internal
newtype Message a = Message { fromMessage :: a } deriving (Eq, Show)
encodeMessage :: Store a => Message a -> ByteString
encodeMessage (Message x) =
unsafeEncodeWith pokeFunc totalLength
where
bodyLength = getSize x
totalLength = headerLength + bodyLength
pokeFunc = do
poke messageMagic
poke bodyLength
poke x
type PeekMessage i m a = FT ((->) i) m a
needMoreInput :: PeekMessage i m i
needMoreInput = wrap return
type FillByteBuffer i m = ByteBuffer -> Int -> i -> m ()
decodeFromPtr :: (MonadIO m, Store a) => Ptr Word8 -> Int -> m a
decodeFromPtr ptr n = liftIO $ decodeIOWithFromPtr peek ptr n
peekSized :: (MonadIO m, Store a) => FillByteBuffer i m -> ByteBuffer -> Int -> PeekMessage i m a
peekSized fill bb n = go
where
go = do
mbPtr <- BB.unsafeConsume bb n
case mbPtr of
Left needed -> do
inp <- needMoreInput
lift (fill bb needed inp)
go
Right ptr -> decodeFromPtr ptr n
peekMessageMagic :: MonadIO m => FillByteBuffer i m -> ByteBuffer -> PeekMessage i m ()
peekMessageMagic fill bb =
peekSized fill bb magicLength >>= \case
mm | mm == messageMagic -> return ()
mm -> liftIO . throwIO $ PeekException 0 . T.pack $
"Wrong message magic, " ++ show mm
peekMessageSizeTag :: MonadIO m => FillByteBuffer i m -> ByteBuffer -> PeekMessage i m SizeTag
peekMessageSizeTag fill bb = peekSized fill bb sizeTagLength
peekMessage :: (MonadIO m, Store a) => FillByteBuffer i m -> ByteBuffer -> PeekMessage i m (Message a)
peekMessage fill bb =
fmap Message $ do
peekMessageMagic fill bb
peekMessageSizeTag fill bb >>= peekSized fill bb
decodeMessage :: (Store a, MonadIO m) => FillByteBuffer i m -> ByteBuffer -> m (Maybe i) -> m (Maybe (Message a))
decodeMessage fill bb getInp =
maybeDecode (peekMessageMagic fill bb) >>= \case
Just () -> maybeDecode (peekMessageSizeTag fill bb >>= peekSized fill bb) >>= \case
Just x -> return (Just (Message x))
Nothing -> do
available <- BB.availableBytes bb
liftIO $ throwIO $ PeekException available $ T.pack
"Data.Store.Streaming.decodeMessage: could not get enough bytes to decode message"
Nothing -> do
available <- BB.availableBytes bb
unless (available == 0) $ liftIO $ throwIO $ PeekException available $ T.pack
"Data.Store.Streaming.decodeMessage: could not get enough bytes to decode message"
return Nothing
where
maybeDecode m = runMaybeT (iterTM (\consumeInp -> consumeInp =<< MaybeT getInp) m)
peekMessageBS :: (MonadIO m, Store a) => ByteBuffer -> PeekMessage ByteString m (Message a)
peekMessageBS = peekMessage (\bb _ bs -> BB.copyByteString bb bs)
decodeMessageBS :: (MonadIO m, Store a)
=> ByteBuffer -> m (Maybe ByteString) -> m (Maybe (Message a))
decodeMessageBS = decodeMessage (\bb _ bs -> BB.copyByteString bb bs)
#ifndef mingw32_HOST_OS
data ReadMoreData = ReadMoreData
deriving (Eq, Show)
peekMessageFd :: (MonadIO m, Store a) => ByteBuffer -> Fd -> PeekMessage ReadMoreData m (Message a)
peekMessageFd bb fd =
peekMessage (\bb_ needed ReadMoreData -> do _ <- BB.fillFromFd bb_ fd needed; return ()) bb
decodeMessageFd :: (MonadIO m, Store a) => ByteBuffer -> Fd -> m (Message a)
decodeMessageFd bb fd = do
mbMsg <- decodeMessage
(\bb_ needed ReadMoreData -> do _ <- BB.fillFromFd bb_ fd needed; return ()) bb
(liftIO (threadWaitRead fd) >> return (Just ReadMoreData))
case mbMsg of
Just msg -> return msg
Nothing -> liftIO (fail "decodeMessageFd: impossible: got Nothing")
#endif
conduitEncode :: (Monad m, Store a) => C.Conduit (Message a) m ByteString
conduitEncode = C.map encodeMessage
conduitDecode :: (MonadResource m, Store a)
=> Maybe Int
-> C.Conduit ByteString m (Message a)
conduitDecode bufSize =
C.bracketP
(BB.new bufSize)
BB.free
go
where
go buffer = do
mmessage <- decodeMessageBS buffer C.await
case mmessage of
Nothing -> return ()
Just message -> C.yield message >> go buffer