{-# LANGUAGE OverloadedStrings #-}
{-|
Module      : Network.NSQ.Connection
Description : Protocol/parser layer for the NSQ client library.

This is the low level client connection to the nsqd. It is recommended to use the
higher level library when those come out.
-}
module Network.NSQ.Connection
    ( defaultConfig
    , establish

    ) where

import Control.Monad.Reader
import Control.Monad.State.Strict
import Data.List
import Data.Maybe
import Prelude hiding (log)
import Network.HostName

import qualified Data.Text as T
import qualified Data.ByteString as BS

import qualified Pipes.Network.TCP as PNT
import qualified Pipes.Attoparsec as PA
import qualified Pipes.Prelude as PP
import Pipes

import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM

import System.Log.Logger (debugM, errorM, warningM, infoM)

import Network.NSQ.Types (NSQConnection(..), Message, Command, server, port, logName, LogName, identConf)
import qualified Network.NSQ.Types as NSQ
import qualified Network.NSQ.Identify as NSQ
import qualified Network.NSQ.Parser as NSQ


-- TODO: standardize on some sort of logger hierchary (nsq server/topic?)
-- NSQ.[subsystem].[topic].[connection] - message
-- NSQ.[subsystem].[custom] ....
--
-- | Attempt to come up with an intelligent default 'NSQConnection' default
-- by discovering your client's hostname and reusing it for the client id
-- for the 'IdentifyMetadata'
defaultConfig :: String -> IO NSQConnection
defaultConfig serverHost = do
    localHost <- T.pack <$> getHostName
    let localClientId = T.takeWhile (/= '.') localHost
    let localIdent = NSQ.defaultIdentify localClientId localHost

    return $ NSQConnection serverHost 4150 "NSQ.Connection." localIdent

-- | Establish a session with the specified nsqd using the provided
-- 'TQueue' so that the actual data processing can be done in a decoupled
-- manner from the hsnsq stack.
--
-- This supports connecting to a specific nsqd, it is however recommended
-- in the future when the feature comes out to user a higher layer that
-- handles the load balancing between multiple nsqd.
establish :: NSQConnection -> TQueue Message -> TQueue Command -> IO ()
establish conn topicQueue reply = PNT.withSocketsDo $
    -- Establish stocket
    PNT.connect (server conn) (show $ port conn) (\(sock, _) -> do

        -- TODO: maybe consider PNT.fromSocketN so that we can adjust fetch size if needed downstream
        let send = (log "send" $ logName conn) >-> PNT.toSocket sock
        let recv = PNT.fromSocket sock 8192 >-> (log "recv" $ logName conn)

        -- Establish NSQ first then go into normal handle mode
        establishNSQ conn recv send

        race_
            (handleNSQ conn recv send topicQueue)
            (runEffect $ handleReply reply >-> showCommand >-> send) -- Handles user replies
    )

-- | Pump a 'Command' into the network from the 'TQueue'
handleReply :: (Monad m, MonadIO m) => TQueue Command -> Producer NSQ.Command m ()
handleReply queue = forever $ do
    cmd <- liftIO $ atomically $ readTQueue queue
    yield cmd

-- | Handles the parsing, error reporting, and logging of the NSQ
-- connection, then eventually pumping the 'Message' into the 'TQueue'
-- for the consumer to process.
handleNSQ :: (Monad m, MonadIO m) => NSQConnection -> Producer BS.ByteString m () -> Consumer BS.ByteString m () -> TQueue Message -> m ()
handleNSQ sc recv send topicQueue = do
    runEffect $ (nsqParserErrorLogging (logName sc) recv) >-> (command (logName sc) topicQueue) >-> showCommand >-> send
    return ()

-- | Establish the connection to the nsqd and run the initial handshake
-- upto completion then hand it off to the 'handleNSQ' for handling the
-- regular protocol/message.
establishNSQ :: (Monad m, MonadIO m) => NSQConnection -> Producer BS.ByteString m () -> Consumer BS.ByteString m () -> m ()
establishNSQ sc recv send = do
    -- Initial handshake to kick off the handshake
    runEffect $ (initialHandshake $ identConf sc) >-> showCommand >-> send

    -- Rest of the handshake process (parsing and dealing with identification)
    runEffect $ (nsqParserErrorLogging (logName sc) recv) >-> identReply

    return ()

    where
        -- Initial Handshake
        initialHandshake im = do
            yield $ NSQ.Protocol
            yield $ NSQ.Identify im
            return ()

        -- Process the ident reply
        identReply = do
            identR <- await

            -- TODO: do stuff with it
            liftIO $ debugM (logName sc) ("IDENT: " ++ show identR)

            return ()

-- | Parses incoming nsq messages and emits any errors to a log and keep going
nsqParserErrorLogging :: MonadIO m => LogName -> Producer BS.ByteString m () -> Producer NSQ.Message m ()
nsqParserErrorLogging l producer = do
    (result, rest) <- lift $ runStateT (PA.parse NSQ.message) producer

    case result of
        Nothing -> liftIO $ errorM l "Pipe is exhausted for nsq parser\n"
        Just y  -> do
            case y of
                Right x -> (liftIO $ debugM l ("msg: " ++ show x)) >> yield x
                Left x  -> liftIO $ errorM l (show x)
            nsqParserErrorLogging l rest

-- | Format outbound NSQ Commands
showCommand :: Monad m => Pipe NSQ.Command BS.ByteString m ()
showCommand = PP.map NSQ.encode

-- | Log anything that passes through this stream to a logfile
log :: MonadIO m => String -> LogName -> Pipe BS.ByteString BS.ByteString m r
log w l = forever $ do
    x <- await
    liftIO $ debugM l (w ++ ": " ++ show x) -- TODO: need a better way to log raw protocol messages
    yield x

-- | Generic processor for processing various messages automatically such
-- as 'Heartbeat' and just passing upstream (to 'TQueue') the actual
-- 'Message' that needs processing.
command :: (Monad m, MonadIO m) => LogName -> TQueue Message -> Pipe NSQ.Message NSQ.Command m ()
command l topicQueue = forever $ do
    msg <- await

    case msg of
        -- TODO: currently no-op
        NSQ.OK                  -> return ()
        NSQ.Heartbeat           -> yield $ NSQ.NOP
        -- TODO: Implement a way to close our connection gracefully
        NSQ.CloseWait           -> liftIO $ infoM l ("Error: Server closed queue")
        -- TODO: should pass it onto the client or have a callback
        NSQ.Error e             -> liftIO $ errorM l ("Error: " ++ show e)
        NSQ.Message _ _ _ _     -> liftIO $ atomically $ writeTQueue topicQueue msg
        -- TODO: should pass it onto the client or have a callback
        NSQ.CatchAllMessage f m -> liftIO $ warningM l ("Error: Frame - " ++ show f ++ " - Msg - " ++ show m)