module Network.Mom.Stompl.Client.Stream
where
import qualified Data.Conduit as C
import Data.Conduit ((.|))
import Data.Conduit.Network (AppData)
import Data.Conduit.Network as N
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.UTF8 as U
import Network.Mom.Stompl.Parser (stompParser)
import qualified Network.Mom.Stompl.Frame as F
import Network.Mom.Stompl.Client.Exception
import Control.Monad (forever)
import Control.Monad.Trans (liftIO)
import Control.Monad.IO.Class (MonadIO)
import Control.Concurrent
import qualified Data.Attoparsec.ByteString as A
-- | Error handler: an 'IO' action invoked with the 'StomplException'
-- that was detected (see its use in 'parseC').
type EH = StomplException -> IO ()
-- | Maximum number of consecutive input chunks 'parseC' may consume
-- without completing a single frame.  Once the count exceeds this value
-- 'parseC' reports a "Message too long!" protocol error.
maxStep :: Int
maxStep = 1024
-- | Outgoing half of a connection.
--
-- Reads frames from the channel ('pipeSource'), serialises each one to a
-- 'B.ByteString' ('stream') and writes the bytes to the connection
-- described by the 'AppData' ('N.appSink').
--
-- 'pipeSource' loops forever on the channel, so this action does not
-- finish because the channel ran dry; it blocks until more frames arrive.
sender :: AppData -> Chan F.Frame -> IO ()
sender ad ip =
  C.runConduitRes (pipeSource ip .| stream .| N.appSink ad)
-- | Incoming half of a connection.
--
-- Reads bytes from the connection described by the 'AppData'
-- ('appSource'), parses them into frames ('parseC') and writes every
-- parsed frame to the channel ('pipeSink').
--
-- Parse problems are not thrown; they are passed to the error handler
-- @eh@ by 'parseC'.
receiver :: AppData -> Chan F.Frame -> EH -> IO ()
receiver ad ip eh =
  C.runConduitRes (appSource ad .| parseC eh .| pipeSink ip)
-- | Sink that writes every frame it receives from upstream to the given
-- channel, until upstream is exhausted.
pipeSink :: MonadIO m => Chan F.Frame -> C.ConduitT F.Frame C.Void m ()
pipeSink ch = C.awaitForever (liftIO . writeChan ch)
-- | Source that repeatedly reads a frame from the given channel and
-- yields it downstream.  The loop is unconditional ('forever'); each
-- iteration blocks in 'readChan' until a frame is available.
pipeSource :: MonadIO m => Chan F.Frame -> C.ConduitT () F.Frame m ()
pipeSource ch = forever (liftIO (readChan ch) >>= C.yield)
-- | Conduit that serialises each incoming frame with 'F.putFrame' and
-- yields the resulting 'B.ByteString' downstream.
stream :: MonadIO m => C.ConduitT F.Frame B.ByteString m ()
stream = C.awaitForever (C.yield . F.putFrame)
-- | Conduit that turns a stream of byte chunks into a stream of frames.
--
-- Each chunk is handed to 'parseAll' together with the current parser
-- state; every frame completed by that chunk is yielded downstream.
--
-- Errors are reported through the handler @eh@ rather than thrown:
--
-- * A parse failure is reported as a 'ProtocolException' and parsing
--   restarts with a fresh parser and a zeroed chunk counter.
--
-- * If more than 'maxStep' consecutive chunks are consumed without
--   completing a frame, a 'ProtocolException' "Message too long!" is
--   reported and the conduit finishes.
--
-- The conduit also finishes when upstream is exhausted.
parseC :: MonadIO m => EH -> C.ConduitT B.ByteString F.Frame m ()
parseC eh = goOn
  where
    -- (Re)start with a fresh parser and no pending chunks.
    goOn = go (A.parse stompParser) 0

    -- @prs@ is the current (possibly partially fed) parser; @step@ counts
    -- the consecutive chunks consumed so far without completing a frame.
    go prs step = do
      mbNew <- C.await
      case mbNew of
        Nothing -> return ()
        Just s ->
          case parseAll prs s of
            Left e -> liftIO (eh $ ProtocolException e) >> goOn
            Right (prs', fs) -> do
              -- No frame completed: one more pending chunk.
              -- Otherwise emit the frames and reset the counter.
              step' <- if null fs
                         then return (step + 1)
                         else mapM_ C.yield fs >> return 0
              if step' > maxStep
                then liftIO (eh $ ProtocolException "Message too long!")
                else go prs' step'
-- | A parsing step as used by 'parseAll': a function from the next chunk
-- of input to an attoparsec result for a frame.
type Parser = B.ByteString -> A.Result F.Frame
-- | Run the parser @prs@ on the chunk @s@ and collect every frame that
-- can be completed from it.
--
-- Returns either an error message or a pair of
--
-- * the parser to use for the next chunk, and
--
-- * the frames completed by this chunk, in input order (possibly empty).
--
-- On a parse failure the message is the whole chunk passed to the failing
-- call, followed by @": "@ and the parser's error text.  A failure in the
-- remainder of a chunk is propagated as-is, so frames already completed
-- from that chunk are not returned.
parseAll :: Parser -> B.ByteString ->
            Either String (Parser, [F.Frame])
parseAll prs s =
  case prs s of
    A.Fail _ _ e -> Left $ U.toString s ++ ": " ++ e
    -- Frame still incomplete: continue this parse with the next chunk.
    r@(A.Partial _) -> Right (A.feed r, [])
    A.Done s' f
      -- Chunk fully consumed: next chunk starts a fresh frame.
      | B.null s' -> Right (A.parse stompParser, [f])
      -- Leftover input: parse it as the beginning of further frames.
      | otherwise ->
          case parseAll (A.parse stompParser) s' of
            Left e -> Left e
            Right (prs', fs) -> Right (prs', f : fs)