-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.

{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections     #-}

-- Low-level connection handling: opening and closing the socket, queueing
-- requests, and sending them / parsing the replies from the socket.
module Database.Redis.IO.Connection
    ( Connection
    , settings
    , resolve
    , connect
    , close
    , request
    , sync
    , send
    , receive
    ) where

import Control.Applicative
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import Data.ByteString.Lazy (toChunks)
import Data.Foldable hiding (concatMap)
import Data.IORef
import Data.Maybe (isJust)
import Data.Redis
import Data.Sequence (Seq, (|>))
import Data.Word
import Database.Redis.IO.Settings
import Database.Redis.IO.Types
import Database.Redis.IO.Timeouts (TimeoutManager, withTimeout)
import Network
import Network.Socket hiding (connect, close, send, recv)
import Network.Socket.ByteString (recv, sendMany)
import Pipes
import Pipes.Attoparsec
import Pipes.Parse
import System.Logger hiding (Settings, settings, close)
import System.Timeout

import qualified Data.ByteString as B
import qualified Data.Sequence   as Seq
import qualified Network.Socket  as S

-- | A single socket connection together with its mutable state.
data Connection = Connection
    { settings :: !Settings
      -- ^ Settings the connection was created with.
    , logger   :: !Logger
      -- ^ Logger used to report timeouts.
    , timeouts :: !TimeoutManager
      -- ^ Timeout manager used by 'sync' for the send/receive timeout.
    , sock     :: !Socket
      -- ^ The underlying socket.
    , producer :: IORef (Producer ByteString IO ())
      -- ^ Byte stream read from the socket; after each parsed reply the
      -- remaining stream (including unconsumed input) is stored back here.
    , buffer   :: IORef (Seq (Resp, IORef Resp))
      -- ^ Requests queued by 'request' but not yet sent, each paired with
      -- the reference its reply is written to by 'sync'.
    }

instance Show Connection where
    show c = "Connection" ++ show (sock c)

-- | Resolve a host name and port to the first stream-socket address
-- returned by 'getAddrInfo'.
--
-- Throws an 'IOException' if the lookup yields no address at all (in
-- addition to whatever 'getAddrInfo' itself throws).
resolve :: String -> Word16 -> IO AddrInfo
resolve host port = do
    addrs <- getAddrInfo (Just hints) (Just host) (Just (show port))
    -- Pattern match instead of using the partial 'head', so that an empty
    -- result produces a descriptive error rather than "Prelude.head".
    case addrs of
        a : _ -> return a
        []    -> throwIO . userError $
            "resolve: no address found for " ++ host ++ ":" ++ show port
  where
    hints = defaultHints { addrFlags = [AI_ADDRCONFIG], addrSocketType = Stream }

-- | Open a socket to the given address and wrap it in a 'Connection'.
--
-- Throws 'ConnectTimeout' if the connect does not complete within the
-- configured connect timeout. If anything fails after the socket has been
-- created, the socket is closed again ('bracketOnError').
connect :: Settings -> Logger -> TimeoutManager -> AddrInfo -> IO Connection
connect t g m a = bracketOnError mkSock S.close $ \s -> do
    -- 'timeout' expects microseconds; the scaling by 1000 presumably
    -- converts from milliseconds (NOTE(review): confirm the unit of 'ms').
    ok <- timeout (ms (sConnectTimeout t) * 1000) (S.connect s (addrAddress a))
    unless (isJust ok) $
        throwIO ConnectTimeout
    Connection t g m s <$> newIORef (fromSock s) <*> newIORef Seq.empty
  where
    mkSock = socket (addrFamily a) (addrSocketType a) (addrProtocol a)

-- | An endless producer of the chunks received from the socket (at most
-- 4096 bytes per 'recv'). Throws 'ConnectionClosed' when 'recv' returns an
-- empty chunk.
fromSock :: Socket -> Producer ByteString IO ()
fromSock s = do
    x <- lift $ recv s 4096
    when (B.null x) $
        lift $ throwIO ConnectionClosed
    yield x
    fromSock s

-- | Close the connection's socket.
close :: Connection -> IO ()
close = S.close . sock

-- | Queue a request together with the reference that will receive its
-- reply. Nothing is sent until 'sync' is called.
request :: Resp -> IORef Resp -> Connection -> IO ()
request x y c = modifyIORef' (buffer c) (|> (x, y))

-- | Send all queued requests in one batch and read one reply per request,
-- in order, writing each reply to the reference registered with 'request'.
-- Does nothing if the queue is empty.
--
-- The queue is cleared before sending. If the configured send/receive
-- timeout is @0@ the exchange runs without a timeout; otherwise it runs
-- under 'withTimeout' with a handler that logs, closes the connection and
-- throws 'Timeout'.
--
-- NOTE(review): if a reply is a Redis error, 'receiveWith' throws before
-- the remaining replies are read and before the advanced producer is
-- stored, so the connection is presumably expected to be discarded by the
-- caller afterwards -- confirm against the call sites.
sync :: Connection -> IO ()
sync c = do
    a <- readIORef (buffer c)
    unless (Seq.null a) $ do
        writeIORef (buffer c) Seq.empty
        case sSendRecvTimeout (settings c) of
            0 -> go a
            t -> withTimeout (timeouts c) t abort (go a)
  where
    -- Send every request, then thread the producer through one parse per
    -- expected reply and store what is left of it.
    go a = do
        send c (toList $ fmap fst a)
        prod <- readIORef (producer c)
        foldlM fetchResult prod (fmap snd a) >>= writeIORef (producer c)

    -- Timeout handler: log, close the socket and fail with 'Timeout'.
    abort = do
        err (logger c) $ "connection.timeout" .= show c
        close c
        throwIO $ Timeout (show c)

    -- Parse one reply, store it in the given reference and return the
    -- remaining producer.
    fetchResult :: Producer ByteString IO () -> IORef Resp -> IO (Producer ByteString IO ())
    fetchResult p r = do
        (p', x) <- receiveWith p
        writeIORef r x
        return p'

-- | Encode the given requests and write them to the socket with a single
-- 'sendMany' call.
send :: Connection -> [Resp] -> IO ()
send c = sendMany (sock c) . concatMap (toChunks . encode)

-- | Read and parse a single reply from the connection, storing the
-- remaining input stream for the next read.
--
-- Throws the same exceptions as 'receiveWith'; in that case the stored
-- producer is left untouched.
receive :: Connection -> IO Resp
receive c = do
    prod   <- readIORef (producer c)
    (p, x) <- receiveWith prod
    writeIORef (producer c) p
    return x

-- | Parse one reply from the given producer and return it together with
-- the remaining producer.
--
-- Throws 'ConnectionClosed' if the parser reports no input, 'InternalError'
-- with the parser's message on a parse failure, and 'RedisError' if the
-- reply is an error reply (see 'errorCheck').
receiveWith :: Producer ByteString IO () -> IO (Producer ByteString IO (), Resp)
receiveWith p = do
    (x, p') <- runStateT (parse resp) p
    case x of
        Nothing        -> throwIO ConnectionClosed
        Just (Left e)  -> throwIO $ InternalError (peMessage e)
        Just (Right y) -> (p',) <$> errorCheck y

-- | Turn an error reply into a thrown 'RedisError'; pass every other reply
-- through unchanged.
errorCheck :: Resp -> IO Resp
errorCheck (Err e) = throwIO $ RedisError e
errorCheck r       = return r