module Network.NSQ.Connection
( defaultConfig
, establish
) where
import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.List
import Data.Maybe
import Prelude hiding (log)
import Network.HostName
import qualified Data.Text as T
import qualified Data.ByteString as BS
import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Attoparsec as PA
import qualified Pipes.Prelude as PP
import Pipes
import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import System.Log.Logger (debugM, errorM, warningM, infoM)
import Network.NSQ.Types (NSQConnection(..), Message, Command, server, port, logName, LogName, identConf)
import qualified Network.NSQ.Types as NSQ
import qualified Network.NSQ.Identify as NSQ
import qualified Network.NSQ.Parser as NSQ
-- | Build a connection configuration for the given server host.
--
-- The local machine's host name is looked up with 'getHostName'; the client
-- id handed to 'NSQ.defaultIdentify' is that host name truncated at the first
-- @.@, and the full host name is passed as the second argument.
--
-- The resulting 'NSQConnection' is built from the supplied server host, the
-- literal @4150@, the literal @"NSQ.Connection."@ and the identify settings,
-- in that order.
defaultConfig :: String -> IO NSQConnection
defaultConfig serverHost = configFor . T.pack <$> getHostName
  where
    -- Pure construction once the local host name is known.
    configFor fullName =
        let shortName = T.takeWhile (/= '.') fullName
            ident = NSQ.defaultIdentify shortName fullName
        in NSQConnection serverHost 4150 "NSQ.Connection." ident
-- | Open a TCP connection to the configured server and run the NSQ session
-- on it.
--
-- After 'establishNSQ' has completed, two actions are raced with 'race_':
--
-- * 'handleNSQ', which reads from the socket and is given @topicQueue@;
-- * a pipeline that takes commands from @reply@, encodes them with
--   'showCommand' and writes them to the socket.
--
-- Both directions of socket traffic pass through 'log' (tagged @"recv"@ and
-- @"send"@) under the connection's log name. Reads are requested in chunks of
-- up to 8192 bytes.
establish :: NSQConnection -> TQueue Message -> TQueue Command -> IO ()
establish conn topicQueue reply =
    PNT.withSocketsDo $
        PNT.connect (server conn) (show (port conn)) $ \(sock, _) -> do
            let name = logName conn
                recv = PNT.fromSocket sock 8192 >-> log "recv" name
                send = log "send" name >-> PNT.toSocket sock

            establishNSQ conn recv send

            let consumeMessages = handleNSQ conn recv send topicQueue
                publishCommands = runEffect (handleReply reply >-> showCommand >-> send)
            race_ consumeMessages publishCommands
-- | Endless producer of the commands read from the given queue.
--
-- Each iteration blocks in 'readTQueue' until a command is available and then
-- yields it downstream.
handleReply :: (Monad m, MonadIO m) => TQueue Command -> Producer NSQ.Command m ()
handleReply queue = forever $ liftIO (atomically (readTQueue queue)) >>= yield
-- | Run the main receive pipeline.
--
-- Bytes from @recv@ are parsed by 'nsqParserErrorLogging', each parsed
-- message is handled by 'command' (which is given @topicQueue@), and any
-- commands that 'command' emits are encoded with 'showCommand' and written to
-- @send@.
handleNSQ :: (Monad m, MonadIO m) => NSQConnection -> Producer BS.ByteString m () -> Consumer BS.ByteString m () -> TQueue Message -> m ()
handleNSQ sc recv send topicQueue =
    runEffect $
        nsqParserErrorLogging name recv
            >-> command name topicQueue
            >-> showCommand
            >-> send
  where
    name = logName sc
-- | Perform the opening exchange on a fresh connection.
--
-- First sends 'NSQ.Protocol' followed by 'NSQ.Identify' (carrying the
-- connection's 'identConf') through 'showCommand' into @send@. Then parses
-- @recv@ and debug-logs the first message that comes out, prefixed with
-- @"IDENT: "@.
--
-- NOTE(review): the second pipeline stops after one message, so whatever the
-- parser had left over is not returned to the caller -- confirm that nothing
-- else can arrive in the same read as the identify reply.
establishNSQ :: (Monad m, MonadIO m) => NSQConnection -> Producer BS.ByteString m () -> Consumer BS.ByteString m () -> m ()
establishNSQ sc recv send = do
    runEffect $ handshake (identConf sc) >-> showCommand >-> send
    runEffect $ nsqParserErrorLogging (logName sc) recv >-> logIdentReply
  where
    -- The two commands that open a session, in order.
    handshake identification =
        yield NSQ.Protocol >> yield (NSQ.Identify identification)

    -- Take exactly one parsed message and write it to the debug log.
    logIdentReply =
        await >>= \response ->
            liftIO (debugM (logName sc) ("IDENT: " ++ show response))
-- | Turn a stream of raw bytes into a stream of parsed NSQ messages, logging
-- as it goes.
--
-- One 'NSQ.message' parse is attempted per iteration:
--
-- * if the parser reports that the input is exhausted, an error is logged and
--   the producer finishes;
-- * if the parse fails, the error is logged and parsing continues with the
--   remaining input;
-- * if the parse succeeds, the message is debug-logged, yielded downstream,
--   and parsing continues with the remaining input.
nsqParserErrorLogging :: MonadIO m => LogName -> Producer BS.ByteString m () -> Producer NSQ.Message m ()
nsqParserErrorLogging l producer = do
    (outcome, remaining) <- lift (runStateT (PA.parse NSQ.message) producer)
    case outcome of
        Nothing ->
            liftIO (errorM l "Pipe is exhausted for nsq parser\n")
        Just (Left parseFailure) -> do
            liftIO (errorM l (show parseFailure))
            nsqParserErrorLogging l remaining
        Just (Right parsed) -> do
            liftIO (debugM l ("msg: " ++ show parsed))
            yield parsed
            nsqParserErrorLogging l remaining
-- | Pipe that replaces every incoming command with the result of applying
-- 'NSQ.encode' to it.
showCommand :: Monad m => Pipe NSQ.Command BS.ByteString m ()
showCommand = forever (await >>= yield . NSQ.encode)
-- | Pass-through pipe that debug-logs every chunk it sees.
--
-- Each chunk is logged under the given log name as @\<tag\>: \<show chunk\>@
-- and then forwarded downstream unchanged. The pipe never terminates on its
-- own.
log :: MonadIO m => String -> LogName -> Pipe BS.ByteString BS.ByteString m r
log w l = for cat $ \chunk -> do
    liftIO (debugM l (w ++ ": " ++ show chunk))
    yield chunk
-- | React to each parsed server message.
--
-- * 'NSQ.OK' is ignored.
-- * 'NSQ.Heartbeat' is answered by yielding 'NSQ.NOP' downstream.
-- * 'NSQ.CloseWait' is logged at info level.
-- * 'NSQ.Error' is logged at error level.
-- * 'NSQ.Message' is written, unchanged, to the supplied queue.
-- * 'NSQ.CatchAllMessage' is logged at warning level with its frame and
--   payload.
--
-- Only the heartbeat case produces any output on the pipe.
command :: (Monad m, MonadIO m) => LogName -> TQueue Message -> Pipe NSQ.Message NSQ.Command m ()
command l topicQueue = forever (await >>= dispatch)
  where
    dispatch msg = case msg of
        NSQ.OK ->
            return ()
        NSQ.Heartbeat ->
            yield NSQ.NOP
        NSQ.CloseWait ->
            liftIO $ infoM l "Error: Server closed queue"
        NSQ.Error e ->
            liftIO . errorM l $ "Error: " ++ show e
        -- The fields are not inspected; the whole message is forwarded.
        NSQ.Message {} ->
            liftIO . atomically $ writeTQueue topicQueue msg
        NSQ.CatchAllMessage f m ->
            liftIO . warningM l $ "Error: Frame - " ++ show f ++ " - Msg - " ++ show m