{-# LANGUAGE TemplateHaskell,OverloadedStrings,RecordWildCards,GeneralizedNewtypeDeriving,PatternGuards,DeriveDataTypeable, ScopedTypeVariables #-}

module Network.Nats (
    -- * How to use this module
    -- |
    -- @
    --
    -- {-\# LANGUAGE OverloadedStrings \#-}
    --
    -- import qualified Data.ByteString.Lazy as BL
    --
    -- nats <- 'connect' \"nats:\/\/user:password\@localhost:4222\"
    --
    -- sid \<- 'subscribe' nats \"news\" Nothing $ \\_ _ msg _ -\> putStrLn $ show msg
    --
    -- 'publish' nats \"news\" \"I got news for you\"
    --
    -- 'unsubscribe' nats sid
    -- 
    -- 'subscribe' nats \"gift\" Nothing $ \\_ _ msg mreply -> do
    --     putStrLn $ show msg
    --     case mreply of
    --        Nothing -> return ()
    --        Just reply -> 'publish' nats reply \"I've got a gift for you.\"
    --  
    -- reply <- 'request' nats \"gift\" \"Do you have anything for me?\"
    --
    -- putStrLn $ show reply
    -- @
    --
    -- The 'connect' call connects to the NATS server and creates a receiver thread. The
    -- callbacks are run synchronously on this thread when a server messages comes.
    -- Client commands are generally acknowledged by the server with an +OK message,
    -- the library waits for acknowledgment only for the 'subscribe' command. The NATS
    -- server usually closes the connection when there is an error.
    
    -- * Comparison to API in other languages
    -- |Compared to API in other languages, the Haskell binding does
    -- not implement timeouts and automatic unsubscribing, the 'request' call is implemented
    -- as a synchronous call. 
    --
    -- The timeouts can be easily implemented using 'System.Timeout' module, automatic unsubscribing
    -- can be done in the callback function.
    
    -- * Error behaviour
    -- |The 'connect' function tries to connect to the NATS server. In case of failure it immediately fails.
    -- If there is an error during operations, the NATS module tries to reconnect to the server.
    -- When there are more servers, the client immediately tries to connect to the next server. If
    -- that fails, it waits 1s before trying the next server in the NatsSettings list.
    --
    -- During the reconnection, the calls 'subscribe' and 'request' will block. The calls
    -- 'publish' and 'unsubscribe' silently fail (unsubscribe is handled locally, NATS is a messaging
    -- system without guarantees, 'publish' is not guaranteed to succeed anyway).
    -- After reconnecting to the server, the module automatically resubscribes to previously subscribed channels.
    --
    -- If there is a network failure, the nats commands 'subscribe' and 'request' 
    -- may fail on an IOexception or NatsException. The 'subscribe'
    -- command is synchronous, it waits until the server responds with +OK. The commands 'publish'
    -- and 'unsubscribe' are asynchronous, no confirmation from server is required and they 
    -- should not raise an exception.
    --
    Nats
    , NatsSID
    , connect
    , connectSettings
    , NatsHost(..)
    , NatsSettings(..)
    , defaultSettings
    -- * Exceptions
    , NatsException
    -- * Access
    , MsgCallback
    , subscribe
    , unsubscribe
    , publish
    , request
    , requestMany
    -- * Termination
    , disconnect
) where


import System.IO
import Control.Concurrent.MVar
import Control.Concurrent
import qualified Network.Socket as S
import Network.Socket (SocketOption(KeepAlive, NoDelay), setSocketOption, getAddrInfo, SockAddr(..))
import Control.Monad (forever, replicateM)
import Data.Dequeue as D
import Control.Applicative ((<$>))
import Data.Typeable
import qualified Data.Foldable as FOLD
import Control.Exception (bracket, bracketOnError, throwIO, catch, IOException, AsyncException, Exception, SomeException,
                          catches, Handler(..))
import System.Random (randomRIO)
import Data.IORef
import System.Timeout
import Control.Concurrent.Async (concurrently)

import qualified Data.Map.Strict as Map
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.ByteString.Char8 as BS
import Data.Char (toLower, isUpper)
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8)

import qualified Data.Aeson as AE
import Data.Aeson.TH (deriveJSON, defaultOptions, fieldLabelModifier)

import qualified Network.URI as URI

-- | How often should we ping the server
pingInterval :: Int
pingInterval = 3000000

-- | Timeout interval for connect operations
timeoutInterval :: Int
timeoutInterval = 1000000

-- | NATS communication error
data NatsException = NatsException String
    deriving (Show, Typeable)
instance Exception NatsException

data NatsConnectionOptions = NatsConnectionOptions {
        natsConnUser :: String
        , natsConnPass :: String
        , natsConnVerbose :: Bool
        , natsConnPedantic :: Bool
        , natsConnSslRequired :: Bool
    } deriving (Show)

defaultConnectionOptions :: NatsConnectionOptions
defaultConnectionOptions = NatsConnectionOptions{natsConnUser="nats",natsConnPass="nats", natsConnVerbose=True,
                                                natsConnPedantic=True, natsConnSslRequired=False}
    
$(deriveJSON defaultOptions{fieldLabelModifier =(
            let insertUnderscore acc chr
                    | isUpper chr = chr : '_' : acc
                    | otherwise   = chr : acc
            in 
                map toLower . drop 1 . reverse . foldl insertUnderscore [] . drop 8
        )} ''NatsConnectionOptions)
    
-- | Server information sent usually upon opening a connection to NATS server
data NatsServerInfo = NatsServerInfo {
--     natsSvrServerId :: T.Text
--     , natsSvrVersion :: T.Text
--     , natsSvrMaxPayload :: Int
      natsSvrAuthRequired :: Bool
    } deriving (Show)
    
$(deriveJSON defaultOptions{fieldLabelModifier =(
            let insertUnderscore acc chr
                    | isUpper chr = chr : '_' : acc
                    | otherwise   = chr : acc
            in 
                map toLower . drop 1 . reverse . foldl insertUnderscore [] . drop 7
        )} ''NatsServerInfo)


newtype NatsSID = NatsSID Int deriving (Num, Ord, Eq)
instance Show NatsSID where
    show (NatsSID num) = show num
instance Read NatsSID where
    readsPrec x1 x2 = map (\(a,rest) -> (NatsSID a, rest)) $ readsPrec x1 x2


type MsgCallback = NatsSID -- ^ SID of subscription
        -> String -- ^ Subject
        -> BL.ByteString -- ^ Message
        -> Maybe String -- ^ Reply subject
        -> IO ()
    
data NatsSubscription = NatsSubscription {
        subSubject :: Subject
      , subQueue :: Maybe Subject
      , subCallback :: MsgCallback
      , subSid :: NatsSID
    }
    
type FifoQueue = D.BankersDequeue (Maybe T.Text -> IO ())
    
-- | Control structure representing a connection to NATS server
data Nats = Nats {
          natsSettings :: NatsSettings
        , natsRuntime :: MVar (Handle, -- Network socket
                               FifoQueue, -- FIFO of sent commands waiting for ack
                               Bool, -- False if we are disconnected
                               MVar () -- Empty mvar that gets full in the moment we connect
                               )
        , natsThreadId :: MVar ThreadId
        , natsNextSid :: IORef NatsSID
        , natsSubMap :: IORef (Map.Map NatsSID NatsSubscription)
    }

-- | Host settings; may have different username/password for each host
data NatsHost = NatsHost {
        natsHHost :: String
      , natsHPort :: Int
      , natsHUser :: String      -- ^ Username for authentication
      , natsHPass :: String  -- ^ Password for authentication
    }
    
-- | Advanced settings for connecting to NATS server
data NatsSettings = NatsSettings {
        natsHosts :: [NatsHost]
      , natsOnConnect :: Nats -> (String, Int) -> IO ()  
        -- ^ Called when a client has successfully connected. This callback is called synchronously
        --   before the processing of incoming messages begins.
      , natsOnDisconnect :: Nats -> String -> IO () 
        -- ^ Called when a client is disconnected.
    }

defaultSettings :: NatsSettings
defaultSettings = (NatsSettings {
        natsHosts = [(NatsHost "localhost" 4222 "nats" "nats")]
      , natsOnConnect = \_ _ -> (return ())
      , natsOnDisconnect = \_ _ -> (return ())
    })
    
-- | Message received by the client from the server
data NatsSvrMessage =
    NatsSvrMsg { msgSubject::String, msgSid::NatsSID, msgText::BS.ByteString, msgReply::Maybe String}
    | NatsSvrOK
    | NatsSvrError T.Text
    | NatsSvrPing
    | NatsSvrPong
    | NatsSvrInfo NatsServerInfo
    deriving (Show)    
    
newtype Subject = Subject String deriving (Show)

subjectToStr :: Subject -> String
subjectToStr (Subject str) = str

makeSubject :: String -> Subject
makeSubject ""        = error "Empty subject"
makeSubject str
    | any (<=' ') str = error $ "Subject contains incorrect characters: " ++ str
    | otherwise       = Subject str
    
-- | Message sent from the client to server
data NatsClntMessage =
    NatsClntPing
    | NatsClntPong
    | NatsClntSubscribe Subject NatsSID (Maybe Subject)
    | NatsClntUnsubscribe NatsSID
    | NatsClntPublish Subject (Maybe Subject) BL.ByteString
    | NatsClntConnect NatsConnectionOptions
    
-- | Encode NATS client message
makeClntMsg :: NatsClntMessage -> BL.ByteString
makeClntMsg = BL.fromChunks . _makeClntMsg
    where
        _makeClntMsg :: NatsClntMessage -> [BS.ByteString]
        _makeClntMsg NatsClntPing = ["PING"]
        _makeClntMsg NatsClntPong = ["PONG"]
        _makeClntMsg (NatsClntSubscribe subject sid (Just queue)) = [BS.pack $ "SUB " ++ (subjectToStr subject) ++ " " ++ (subjectToStr queue) ++ " " ++ (show sid)]
        _makeClntMsg (NatsClntSubscribe subject sid Nothing) = [BS.pack $ "SUB " ++ (subjectToStr subject) ++ " " ++ (show sid)]
        _makeClntMsg (NatsClntUnsubscribe sid) = [ BS.pack $ "UNSUB " ++ (show sid) ]
        _makeClntMsg (NatsClntPublish subj Nothing msg) = 
            (BS.pack $ "PUB " ++ (subjectToStr subj) ++ " " ++ (show $ BL.length msg) ++ "\r\n") : BL.toChunks msg
        _makeClntMsg (NatsClntPublish subj (Just reply) msg) = 
            (BS.pack $ "PUB " ++ (subjectToStr subj) ++ " " ++ (subjectToStr reply) ++ " " ++ (show $ BL.length msg) ++ "\r\n") : BL.toChunks msg
        _makeClntMsg (NatsClntConnect info) = "CONNECT " : (BL.toChunks $ AE.encode info)
    
-- | Decode NATS server message; result is message + payload (payload is 'undefined' in NatsSvrMsg)
decodeMessage :: BS.ByteString -> Maybe (NatsSvrMessage, Maybe Int)
decodeMessage line = decodeMessage_ mid (BS.drop 1 mrest)
    where 
        (mid, mrest) = BS.span (\x -> x/=' ' && x/='\r') line
        decodeMessage_ :: BS.ByteString -> BS.ByteString -> Maybe (NatsSvrMessage, Maybe Int)
        decodeMessage_ "PING" _ = Just (NatsSvrPing, Nothing)
        decodeMessage_ "PONG" _ = Just (NatsSvrPong, Nothing)
        decodeMessage_ "+OK" _ = Just (NatsSvrOK, Nothing)
        decodeMessage_ "-ERR" msg = Just (NatsSvrError (decodeUtf8 msg), Nothing)
        decodeMessage_ "INFO" msg = do
            info <- AE.decode $ BL.fromChunks [msg]
            return $ (NatsSvrInfo info, Nothing)
        decodeMessage_ "MSG" msg = do
            case (map BS.unpack (BS.words msg)) of
                 [subj, sid, len] -> return (NatsSvrMsg subj (read sid) undefined Nothing, Just $ read len)
                 [subj, sid, reply, len] -> return (NatsSvrMsg subj (read sid) undefined (Just $ reply), Just $ read len)
                 _ -> fail ""
        decodeMessage_ _ _ = Nothing

    
-- | Returns next sid and updates MVar
newNatsSid :: Nats -> IO NatsSID
newNatsSid nats = atomicModifyIORef' (natsNextSid nats) $ \sid -> (sid + 1, sid)

-- | Generates a new INBOX name for request/response communication
newInbox :: IO String
newInbox = do
    rnd <- replicateM 13 (randomRIO ('a', 'z'))
    return $ "_INBOX." ++ rnd
    
-- | Create a TCP connection to the server
connectToServer :: String -> Int -> IO Handle
connectToServer hostname port = do
    addrinfos <- getAddrInfo Nothing (Just hostname) Nothing
    let serveraddr = (head addrinfos)
    -- Create a socket
    bracketOnError
        (S.socket (S.addrFamily serveraddr) S.Stream S.defaultProtocol)
        (S.sClose)
        (\sock -> do
            
            setSocketOption sock KeepAlive 1
            setSocketOption sock NoDelay 1
            let connaddr = case (S.addrAddress serveraddr) of
                    SockAddrInet _ haddr -> SockAddrInet (fromInteger $ toInteger port) haddr
                    SockAddrInet6 _ finfo haddr scopeid -> SockAddrInet6 (fromInteger $ toInteger port) finfo haddr scopeid
                    other -> other
            S.connect sock connaddr
            h <- S.socketToHandle sock ReadWriteMode
            hSetBuffering h NoBuffering
            return h
        )
        
        
ensureConnection :: Nats 
    -> Bool -- ^ If true, wait for the connection to become available
    -> ((Handle, FifoQueue) -> IO FifoQueue) -- ^ Action to do when the connection is available
    -> IO ()
-- Block if we are disconnected
ensureConnection nats True f = do
    bracketOnError
        (takeMVar $ natsRuntime nats)
        (putMVar $ natsRuntime nats)
        (\r@(handle, _, x1, x2) -> do
            result <- runAction r
            case result of 
                 Just nqueue -> putMVar (natsRuntime nats) (handle, nqueue, x1, x2)
                 Nothing -> return ()
        )
    where 
        -- Connected
        runAction (handle, queue, True, _) = do
            nqueue <- f (handle, queue)
            return $ Just nqueue
        -- Disconnected, we will wait
        runAction r@(_, _, False, csig) = do
            putMVar (natsRuntime nats) r
            readMVar csig -- Wait for connection to become available
            ensureConnection nats True f
            return Nothing
-- Do not block if we are disconnected
ensureConnection nats False f = modifyMVarMasked_ (natsRuntime nats) runAction
    where
        -- Connected, try to send
        runAction (handle, queue, True, csig) = do
            nqueue <- f (handle, queue)
            return (handle, nqueue, True, csig)
        -- Disconnected, ignore
        runAction (handle, queue, False, csig) =
            return (handle, queue, False, csig)
        
-- | Send a message and register callback if possible
sendMessage :: Nats -> Bool -> NatsClntMessage -> Maybe (Maybe T.Text -> IO ()) -> IO ()
sendMessage nats blockIfDisconnected msg mcb
    | Just cb <- mcb, supportsCallback msg = 
        ensureConnection nats blockIfDisconnected $ \(handle, queue) -> do
            _sendMessage handle msg
            return $ D.pushBack queue cb -- Append callback on the callback queue
    | supportsCallback msg = sendMessage nats blockIfDisconnected msg (Just $ \_ -> return ())
    | Just _ <- mcb, not (supportsCallback msg) = error "Callback not supported"
    | True = ensureConnection nats blockIfDisconnected $ \(handle, queue) -> do
        _sendMessage handle msg
        return queue
    where
        supportsCallback (NatsClntConnect {}) = True
        supportsCallback (NatsClntPublish {}) = True
        supportsCallback (NatsClntSubscribe {}) = True
        supportsCallback (NatsClntUnsubscribe {}) = True
        supportsCallback _ = False

-- Throw exception if an action does not end in specified time
timeoutThrow :: Int -> IO a -> IO a
timeoutThrow t f = do
    res <- timeout t f
    case res of
         Just x -> return x
         Nothing -> throwIO $ NatsException "Reached timeout"
        
_sendMessage :: Handle -> NatsClntMessage -> IO ()
_sendMessage handle cmsg = timeoutThrow timeoutInterval $ do
    let msg = makeClntMsg cmsg
    case () of
       _| BL.length msg < 1024 ->
                BS.hPut handle $ BS.concat $ (BL.toChunks msg) ++ ["\r\n"]
        | True -> do
                BL.hPut handle msg
                BS.hPut handle "\r\n"

-- | Do the authentication handshake if necessary
authenticate :: Handle -> String -> String -> IO ()
authenticate handle user password = do
    info <- BS.hGetLine handle
    case (decodeMessage info) of
        Just (NatsSvrInfo (NatsServerInfo {natsSvrAuthRequired=True}), Nothing) -> do
            let coptions = defaultConnectionOptions{natsConnUser=user, natsConnPass=password}
            BL.hPut handle $ makeClntMsg (NatsClntConnect coptions)
            BS.hPut handle "\r\n"
            response <- BS.hGetLine handle
            case (decodeMessage response) of
                 Just (NatsSvrOK, Nothing) -> return ()
                 Just (NatsSvrError err, Nothing)-> throwIO $ NatsException $ "Authentication error: " ++ (show err)
                 _ -> throwIO $ NatsException $ "Incorrect server response"
        Just (NatsSvrInfo _, Nothing) -> return ()
        _ -> throwIO $ NatsException "Incorrect input from server"
            
-- | Open and authenticate a connection
prepareConnection :: Nats -> NatsHost -> IO ()
prepareConnection nats nhost = timeoutThrow timeoutInterval $
    bracketOnError
        (connectToServer (natsHHost nhost) (natsHPort nhost))
        (hClose)
        (\handle -> do
            authenticate handle (natsHUser nhost) (natsHPass nhost)
            csig <- modifyMVar (natsRuntime nats) $ \(_,_,_, csig) ->
                return $ ((handle, D.empty, True, undefined), csig)
            putMVar csig ()
        )

-- | Main thread that reads events from NATS server and reconnects if necessary
connectionThread :: Nats
                    -> [NatsHost] -- ^ inifinite list of connections to try
                    -> IO ()
connectionThread _ [] = error "Empty list of connections"
connectionThread nats (thisconn:nextconn) = do
    mnewconnlist <- 
        (connectionHandler nats thisconn >> return Nothing) -- connectionHandler never returns...
            `catches` [Handler (\(e :: IOException) -> Just <$> errorHandler e),
                       Handler (\(e :: NatsException) -> Just <$> errorHandler e),
                       Handler (\e -> finalHandler e >> return Nothing)]
    case mnewconnlist of
         Nothing -> return () -- Never happens
         Just newconnlist -> connectionThread nats newconnlist
    where
        finalize :: (Show e) => e -> IO ()
        finalize e = do
            -- Hide existing connection
            (handle, queue, _, _) <- takeMVar (natsRuntime nats)
            finsignal <- newEmptyMVar
            putMVar (natsRuntime nats) (undefined, undefined, False, finsignal)
            -- Close network socket
            hClose handle
            -- Call appropriate callbacks on unfinished calls
            FOLD.mapM_ (\f -> f $ Just (T.pack $ show e)) queue
            -- Call user supplied disconnect
            (natsOnDisconnect $ natsSettings nats) nats (show e)
            
        errorHandler :: (Show e) => e -> IO [NatsHost]
        errorHandler e = do
            finalize e
            tryToConnect nextconn
            where
                tryToConnect connlist@(conn:rest) = do
                    res <- ((prepareConnection nats conn) >> (return $ Just connlist))
                        `catches` [ Handler (\(_ :: IOException) -> return Nothing),
                                    Handler (\(_ :: NatsException) -> return Nothing) ]
                    case res of
                         Just restlist -> return restlist
                         Nothing       -> threadDelay timeoutInterval >> tryToConnect rest
                tryToConnect [] = error "Empty list of connections"
                        
        -- Handler for exiting the thread
        finalHandler :: AsyncException -> IO ()
        finalHandler e = do
            finalize e

pingerThread :: Nats -> IORef (Int, Int) -> IO ()
pingerThread nats pingStatus = forever $ do
    threadDelay pingInterval
    -- todo - kontrola pingu
    ok <- atomicModifyIORef' pingStatus $ \(pings, pongs) -> ((pings+1, pongs), pings - pongs < 2)
    if ok == False
        then throwIO (NatsException "Ping timeouted") 
        else return ()
    sendMessage nats True NatsClntPing Nothing
        
-- | Forever read input from a connection and process it
connectionHandler :: Nats -> NatsHost -> IO ()
connectionHandler nats (NatsHost host port _ _) = do
    (handle, _, _, _) <- readMVar (natsRuntime nats)
    -- Subscribe channels that are supposed to be subscribed
    subscriptions <- readIORef (natsSubMap nats)
    FOLD.forM_ subscriptions $ \(NatsSubscription subject queue _ sid) ->
        sendMessage nats True (NatsClntSubscribe subject sid queue) Nothing
        
    -- Call user function that we are successfully connected
    (natsOnConnect $ natsSettings nats) nats (host, port)
    -- Allocate structures for PING, IORef is probably easiest to manage
    pingStatus <- newIORef (0, 0)
    
    -- Perform the job
    _ <- concurrently
            (pingerThread nats pingStatus)
            (connectionHandler' handle nats pingStatus)
    return ()

connectionHandler' :: Handle -> Nats -> IORef (Int, Int) -> IO ()
connectionHandler' handle nats pingStatus = forever $ do
    line <- BS.hGetLine handle
    case decodeMessage line of
        Just (msg, Nothing) -> 
            handleMessage msg
        Just (msg@(NatsSvrMsg {}), Just paylen) -> do
            payload <- BS.hGet handle (paylen + 2) -- +2 = CRLF
            handleMessage msg{msgText=BS.take paylen payload}
        _ -> 
            putStrLn $ "Incorrect message: " ++ (show line)
            
    where
        -- | Pull callback for OK/ERR status from FIFO queue
        popCb (h, queue, x1, x2) = return ((h, newq, x1, x2), item)
            where
                (item, newq) = D.popFront queue
        handleMessage NatsSvrPing = sendMessage nats True NatsClntPong Nothing
        handleMessage NatsSvrPong = 
            atomicModifyIORef' pingStatus $ 
                \(pings, pongs) -> ((pings, pongs + 1), ())
            
        handleMessage NatsSvrOK = do
            cb <- modifyMVar (natsRuntime nats) $ popCb
            case cb of
                Just f -> f Nothing
                Nothing -> return () -- This should not happen, spurious OK
        handleMessage (NatsSvrError txt) = do
            cb <- modifyMVar (natsRuntime nats) $ popCb
            case cb of
                Just f -> f $ Just txt
                Nothing -> putStrLn $ show txt
        handleMessage (NatsSvrInfo _) = return ()
        handleMessage (NatsSvrMsg {..}) = do
            msubscription <- Map.lookup msgSid <$> readIORef (natsSubMap nats)
            case msubscription of
                Just subscription -> 
                    (subCallback subscription) msgSid msgSubject (BL.fromChunks [msgText]) msgReply
                    `catch`
                        (\(e :: SomeException) -> putStrLn $ (show e))
                -- SID not found in map, force unsubscribe
                Nothing -> sendMessage nats True (NatsClntUnsubscribe msgSid) Nothing 
                        
-- | Connect to a NATS server    
connect :: String -- ^ URI with format: nats:\/\/user:password\@localhost:4222
    -> IO Nats
connect uri = do
    let parsedUri = case (URI.parseURI uri) of 
            Just x -> x
            Nothing -> error ("Error parsing NATS url: " ++ uri)
    if URI.uriScheme parsedUri /= "nats:"
        then error "Incorrect URL scheme"
        else return ()
    
    let (host, port, user, password) = case (URI.uriAuthority parsedUri) of
                Just (URI.URIAuth {..}) -> (uriRegName, 
                                          read $ drop 1 uriPort,
                                          takeWhile (\x -> x /= ':') uriUserInfo,
                                          takeWhile (\x -> x /= '@') $ drop 1 $ dropWhile (\x -> x /= ':') uriUserInfo
                                          )
                Nothing -> error "Missing hostname section"
    connectSettings defaultSettings{
            natsHosts=[(NatsHost host port user password)]
        }

-- | Connect to NATS server using custom settings
connectSettings :: NatsSettings -> IO Nats                
connectSettings settings = do
    csig <- newEmptyMVar
    mruntime <- newMVar (undefined, undefined, False, csig)
    mthreadid <- newEmptyMVar 
    nextsid <- newIORef 1
    submap <- newIORef Map.empty
    let nats = Nats{
              natsSettings=settings
            , natsRuntime=mruntime 
            , natsThreadId=mthreadid 
            , natsNextSid=nextsid 
            , natsSubMap=submap
        }
        hosts = (natsHosts settings)
    
    -- Try to connect to all until one succeeds
    connhost <- tryUntilSuccess hosts $ prepareConnection nats
    threadid <- forkIO $ connectionThread nats (connhost:(cycle hosts))
    putMVar mthreadid threadid
    return nats
    where
        tryUntilSuccess [a] f = f a >> return a
        tryUntilSuccess (a:rest) f = (f a >> return a) `catch` (\(_ :: SomeException) -> tryUntilSuccess rest f)
        tryUntilSuccess [] _ = error "Empty list"
    
-- | Subscribe to a channel, optionally specifying queue group 
subscribe :: Nats 
    -> String -- ^ Subject
    -> (Maybe String) -- ^ Queue
    -> MsgCallback -- ^ Callback
    -> IO NatsSID -- ^ SID of subscription
subscribe nats subject queue cb = 
    let
        ssubject = makeSubject subject
        squeue = makeSubject `fmap` queue
        addToSubTable sid = atomicModifyIORef' (natsSubMap nats) $ \submap ->
                (Map.insert sid (NatsSubscription{subSubject=ssubject, subQueue=squeue, subCallback=cb, subSid=sid}) submap, ()) 
    in do
        mvar <- newEmptyMVar :: IO (MVar (Maybe T.Text))
        sid <- newNatsSid nats
        sendMessage nats True (NatsClntSubscribe ssubject sid squeue) $ Just $ \err -> do
            case err of
                Nothing -> addToSubTable sid
                Just _ -> return ()
            putMVar mvar err

        merr <- takeMVar mvar
        case merr of
            Just err -> throwIO $ NatsException $ T.unpack err
            Nothing -> return $ sid

-- | Unsubscribe from a channel
unsubscribe :: Nats 
    -> NatsSID 
    -> IO ()
unsubscribe nats sid = do
    -- Remove from internal tables
    atomicModifyIORef' (natsSubMap nats) $ \ioref -> (Map.delete sid ioref, ())
    -- Unsubscribe from server, ignore errors
    sendMessage nats False (NatsClntUnsubscribe sid) Nothing
        `catches` [ Handler (\(_ :: IOException) -> return ()),
                    Handler (\(_ :: NatsException) -> return ()) ]
            

-- | Synchronous request/response communication to obtain one message
request :: Nats 
    -> String             -- ^ Subject
    -> BL.ByteString      -- ^ Request
    -> IO BL.ByteString   -- ^ Response
request nats subject body = do
    mvar <- newEmptyMVar :: IO (MVar (Either String BL.ByteString))
    inbox <- newInbox
    bracket
            (subscribe nats inbox Nothing $ \_ _ response _ -> do
                _ <- tryPutMVar mvar (Right response)
                return ()
            ) 
            (unsubscribe nats)
            (\_ -> do
                sendMessage nats True (NatsClntPublish (makeSubject subject) (Just $ makeSubject inbox) body) $ Just $ \merr -> do
                    case merr of
                         Nothing -> return ()
                         Just err -> tryPutMVar mvar (Left $ T.unpack err) >> return ()
                result <- takeMVar mvar
                case result of
                     Left err -> throwIO $ NatsException err
                     Right res -> return $ res
            )

-- | Synchronous request/response for obtaining many messages in certain timespan
requestMany :: Nats
    -> String              -- ^ Subject
    -> BL.ByteString       -- ^ Body
    -> Int                 -- ^ Timeout in microseconds
    -> IO [BL.ByteString]
requestMany nats subject body time = do
    result <- newIORef []
    inbox <- newInbox
    bracket 
        (subscribe nats inbox Nothing $ \_ _ response _ ->
                atomicModifyIORef result $ \old -> (response:old, ())
        )
        (unsubscribe nats)
        (\_ -> do
            publish' nats subject (Just inbox) body
            threadDelay time
        )
    reverse <$> readIORef result
            
-- | Publish a message
publish :: Nats 
    -> String -- ^ Subject
    -> BL.ByteString -- ^ Data
    -> IO ()
publish nats subject body = publish' nats subject Nothing body
    
publish' :: Nats 
    -> String -- ^ Subject
    -> Maybe String
    -> BL.ByteString -- ^ Data
    -> IO ()
publish' nats subject inbox body = do
    -- Ignore errors - messages can get lost
    sendMessage nats False (NatsClntPublish (makeSubject subject) (makeSubject <$> inbox) body) Nothing
        `catches` [ Handler (\(_ :: IOException) -> return ()),
                    Handler (\(_ :: NatsException) -> return ()) ]
    
-- | Disconnect from a NATS server
disconnect :: Nats -> IO ()
disconnect nats = do
    threadid <- readMVar (natsThreadId nats)
    killThread threadid