module Network.Mom.Stompl.Client.Stream
where

  import qualified Data.Conduit as C
  import           Data.Conduit ((.|))

  import           Data.Conduit.Network (AppData)  
  import           Data.Conduit.Network  as N

  import qualified Data.ByteString.Char8 as B
  import qualified Data.ByteString.UTF8  as U

  import           Network.Mom.Stompl.Parser (stompParser)
  import qualified Network.Mom.Stompl.Frame as F
  import           Network.Mom.Stompl.Client.Exception

  import           Control.Monad (forever)
  import           Control.Monad.Trans (liftIO)
  import           Control.Monad.IO.Class (MonadIO)
  import           Control.Concurrent

  import qualified Data.Attoparsec.ByteString as A 

  ------------------------------------------------------------------------
  -- Error Handler
  ------------------------------------------------------------------------
  -- Callback that receives a 'StomplException'. In this module, 'parseC'
  -- calls it with a 'ProtocolException' when parsing fails or when a
  -- message exceeds the fragment limit ('maxStep').
  type EH = StomplException -> IO ()

  ------------------------------------------------------------------------
  -- A TCP/IP fragment read by the Conduit Client has 4096 bytes.
  -- We allow 1024 fragments = 1024 * 4096 Bytes = 4MB
  ------------------------------------------------------------------------
  -- | Maximum number of consecutive input chunks 'parseC' accepts without
  --   completing a frame before it reports "Message too long!".
  maxStep :: Int
  maxStep = 1024

  ------------------------------------------------------------------------
  -- Sender thread: get a Frame from a pipe, convert it into a ByteString
  --                and send it through a socket 
  ------------------------------------------------------------------------
  -- | Run the outgoing pipeline: take frames from the channel @ip@,
  --   serialise each one ('stream') and write the bytes to the
  --   connection @ad@.
  sender :: AppData -> Chan F.Frame -> IO ()
  sender ad ip = C.runConduitRes (pipeSource ip .| stream .| N.appSink ad)

  ------------------------------------------------------------------------
  -- Receiver thread: get a ByteStream through a socket,
  --                  parse it to a Frame and send it through a pipe
  ------------------------------------------------------------------------
  -- | Run the incoming pipeline: read bytes from the connection @ad@,
  --   parse them into frames ('parseC', which reports problems to the
  --   error handler @eh@) and write every frame to the channel @ip@.
  receiver :: AppData -> Chan F.Frame -> EH -> IO ()
  receiver ad ip eh = C.runConduitRes (appSource ad .| parseC eh .| pipeSink ip)

  ------------------------------------------------------------------------
  -- Put a frame into a pipe (a channel)
  ------------------------------------------------------------------------
  -- | Sink that writes every frame it receives from upstream to the
  --   channel @ch@.
  pipeSink :: MonadIO m => Chan F.Frame -> C.ConduitT F.Frame C.Void m ()
  pipeSink ch = C.awaitForever (liftIO . writeChan ch)

  ------------------------------------------------------------------------
  -- Read a frame from a pipe (a channel)
  ------------------------------------------------------------------------
  -- | Source that repeatedly reads a frame from the channel @ch@ and
  --   yields it downstream. It loops with 'forever', so it never finishes
  --   by itself.
  pipeSource :: MonadIO m => Chan F.Frame -> C.ConduitT () F.Frame m ()
  pipeSource ch = forever (liftIO (readChan ch) >>= C.yield)

  ------------------------------------------------------------------------
  -- Convert a frame to a ByteString
  ------------------------------------------------------------------------
  -- | Turn every incoming frame into its 'B.ByteString' representation
  --   (via 'F.putFrame') and yield it downstream.
  stream :: MonadIO m => C.ConduitT F.Frame B.ByteString m ()
  stream = C.awaitForever (C.yield . F.putFrame)

  ------------------------------------------------------------------------
  -- Parse a Frame from a ByteString
  ------------------------------------------------------------------------
  -- | Incrementally parse the incoming byte chunks into frames.
  --
  --   * Every completed frame is yielded downstream.
  --   * On a parse error the handler @eh@ is called with a
  --     'ProtocolException' and parsing restarts with a clean parser.
  --   * If more than 'maxStep' consecutive chunks arrive without
  --     completing a frame, @eh@ is called with
  --     @ProtocolException "Message too long!"@ and the conduit ends.
  --   * The conduit also ends when upstream has no more input.
  parseC :: MonadIO m => EH -> C.ConduitT B.ByteString F.Frame m ()
  parseC eh = goOn
    where
      -- start with a clean parser and a zeroed fragment counter
      goOn = go (A.parse stompParser) 0
      -- prs:  parser to feed the next chunk to
      -- step: consecutive chunks consumed without completing a frame
      go prs step = do
        mbNew <- C.await
        case mbNew of
          Nothing -> return () -- socket was closed
          Just s  -> case parseAll prs s of
            -- parse error: call the error handler, then start afresh
            Left e -> liftIO (eh $ ProtocolException e) >> goOn
            -- we got a result
            Right (prs', fs) -> do
              -- Do we have (at least) 1 frame to send?
              step' <- if null fs
                         then return (step + 1)
                         else mapM_ C.yield fs >> return 0
              if step' > maxStep
                -- Too many fragments: report and stop
                then liftIO (eh $ ProtocolException "Message too long!")
                -- Continue with the current parser
                else go prs' step'

  ------------------------------------------------------------------------
  -- A parser is something that converts a ByteString into a Frame
  ------------------------------------------------------------------------
  -- Takes the next chunk of input and returns the attoparsec result for a
  -- frame; 'parseAll' handles its Fail, Partial and Done cases.
  type Parser = B.ByteString -> A.Result F.Frame

  ------------------------------------------------------------------------
  -- Continue parsing until we have a complete frame
  ------------------------------------------------------------------------
  -- | Feed the chunk @s@ to the parser @prs@ and collect every frame that
  --   can be completed from it.
  --
  --   Returns @Left msg@ on a parse failure, where @msg@ is the chunk
  --   followed by @": "@ and the parser's error text. Otherwise returns
  --   @Right (next, frames)@: @next@ is the parser to use for the following
  --   chunk and @frames@ are the frames completed so far, in input order
  --   (empty if the frame is still incomplete).
  parseAll :: Parser -> B.ByteString ->
              Either String (Parser, [F.Frame])
  parseAll prs s = case prs s of
    -- We failed
    A.Fail _ _ e -> Left $ U.toString s ++ ": " ++ e
    -- We have a partial result: the next chunk is fed into it
    r@(A.Partial _) -> Right (A.feed r, [])
    -- We are done with one frame
    A.Done s' f
      -- nothing left over: continue with a clean parser
      | B.null s' -> Right (A.parse stompParser, [f])
      -- there is a leftover: parse it with a clean parser as well
      | otherwise -> case parseAll (A.parse stompParser) s' of
          Left e           -> Left e
          Right (prs', fs) -> Right (prs', f : fs)