module Data.Store.Streaming
(
Message (..)
, encodeMessage
, PeekMessage
, FillByteBuffer
, peekMessage
, decodeMessage
, peekMessageBS
, decodeMessageBS
#ifndef mingw32_HOST_OS
, ReadMoreData(..)
, peekMessageFd
, decodeMessageFd
#endif
, conduitEncode
, conduitDecode
) where
import Control.Exception (throwIO)
import Control.Monad (unless)
import Control.Monad.IO.Class
import Control.Monad.Trans.Resource (MonadResource)
import Data.ByteString (ByteString)
import qualified Data.Conduit as C
import qualified Data.Conduit.List as C
import Data.Store
import Data.Store.Impl (getSize)
import Data.Store.Core (decodeIOWithFromPtr, unsafeEncodeWith)
import qualified Data.Text as T
import Data.Word
import Foreign.Ptr
import Prelude
import System.IO.ByteBuffer (ByteBuffer)
import qualified System.IO.ByteBuffer as BB
import Control.Monad.Trans.Free.Church (FT, iterTM, wrap)
import Control.Monad.Trans.Maybe (MaybeT(MaybeT), runMaybeT)
import Control.Monad.Trans.Class (lift)
import System.Posix.Types (Fd(..))
import GHC.Conc (threadWaitRead)
import Data.Store.Streaming.Internal
newtype Message a = Message { fromMessage :: a } deriving (Eq, Show)
encodeMessage :: Store a => Message a -> ByteString
encodeMessage (Message x) =
unsafeEncodeWith pokeFunc totalLength
where
bodyLength = getSize x
totalLength = headerLength + bodyLength
pokeFunc = do
poke messageMagic
poke bodyLength
poke x
type PeekMessage i m a = FT ((->) i) m a
needMoreInput :: Monad m => PeekMessage i m i
needMoreInput = wrap return
type FillByteBuffer i m = ByteBuffer -> Int -> i -> m ()
decodeFromPtr :: (MonadIO m, Store a) => Ptr Word8 -> Int -> m a
decodeFromPtr ptr n = liftIO $ decodeIOWithFromPtr peek ptr n
peekSized :: (MonadIO m, Store a) => FillByteBuffer i m -> ByteBuffer -> Int -> PeekMessage i m a
peekSized fill bb n = go
where
go = do
mbPtr <- BB.unsafeConsume bb n
case mbPtr of
Left needed -> do
inp <- needMoreInput
lift (fill bb needed inp)
go
Right ptr -> decodeFromPtr ptr n
peekMessageHeader :: MonadIO m => FillByteBuffer i m -> ByteBuffer -> PeekMessage i m SizeTag
peekMessageHeader fill bb = go
where
go = do
messageMagic' <- peekSized fill bb magicLength
unless (messageMagic == messageMagic') $
liftIO . throwIO $ PeekException 0 . T.pack $ "Wrong message magic, " ++ show messageMagic'
peekSized fill bb sizeTagLength
peekMessage :: (MonadIO m, Store a) => FillByteBuffer i m -> ByteBuffer -> PeekMessage i m (Message a)
peekMessage fill bb =
fmap Message (peekSized fill bb =<< peekMessageHeader fill bb)
decodeMessage :: (Store a, MonadIO m) => FillByteBuffer i m -> ByteBuffer -> m (Maybe i) -> m (Maybe (Message a))
decodeMessage fill bb getInp = do
mbRes <- runMaybeT (iterTM (\consumeInp -> consumeInp =<< MaybeT getInp) (peekMessage fill bb))
case mbRes of
Just x -> return (Just x)
Nothing -> do
available <- BB.availableBytes bb
unless (available == 0) $ liftIO $ throwIO $ PeekException available $ T.pack $
"Data.Store.Streaming.decodeMessage: could not get enough bytes to decode message"
return Nothing
peekMessageBS :: (MonadIO m, Store a) => ByteBuffer -> PeekMessage ByteString m (Message a)
peekMessageBS = peekMessage (\bb _ bs -> BB.copyByteString bb bs)
decodeMessageBS :: (MonadIO m, Store a)
=> ByteBuffer -> m (Maybe ByteString) -> m (Maybe (Message a))
decodeMessageBS = decodeMessage (\bb _ bs -> BB.copyByteString bb bs)
#ifndef mingw32_HOST_OS
data ReadMoreData = ReadMoreData
deriving (Eq, Show)
peekMessageFd :: (MonadIO m, Store a) => ByteBuffer -> Fd -> PeekMessage ReadMoreData m (Message a)
peekMessageFd bb fd =
peekMessage (\bb_ needed ReadMoreData -> do _ <- BB.fillFromFd bb_ fd needed; return ()) bb
decodeMessageFd :: (MonadIO m, Store a) => ByteBuffer -> Fd -> m (Message a)
decodeMessageFd bb fd = do
mbMsg <- decodeMessage
(\bb_ needed ReadMoreData -> do _ <- BB.fillFromFd bb_ fd needed; return ()) bb
(liftIO (threadWaitRead fd) >> return (Just ReadMoreData))
case mbMsg of
Just msg -> return msg
Nothing -> liftIO (fail "decodeMessageFd: impossible: got Nothing")
#endif
conduitEncode :: (Monad m, Store a) => C.Conduit (Message a) m ByteString
conduitEncode = C.map encodeMessage
conduitDecode :: (MonadIO m, MonadResource m, Store a)
=> Maybe Int
-> C.Conduit ByteString m (Message a)
conduitDecode bufSize =
C.bracketP
(BB.new bufSize)
BB.free
go
where
go buffer = do
mmessage <- decodeMessageBS buffer C.await
case mmessage of
Nothing -> return ()
Just message -> C.yield message >> go buffer