{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Database.Redis.ProtocolPipelining (
Connection,
connect, enableTLS, beginReceiving, disconnect, request, send, recv, flush, fromCtx
) where
import Prelude
import Control.Monad
import qualified Scanner
import qualified Data.ByteString as S
import Data.IORef
import qualified Network.Socket as NS
import qualified Network.TLS as TLS
import System.IO.Unsafe
import Database.Redis.Protocol
import qualified Database.Redis.ConnectionContext as CC
data Connection = Conn
{ Connection -> ConnectionContext
connCtx :: CC.ConnectionContext
, Connection -> IORef [Reply]
connReplies :: IORef [Reply]
, Connection -> IORef [Reply]
connPending :: IORef [Reply]
, Connection -> IORef Int
connPendingCnt :: IORef Int
}
fromCtx :: CC.ConnectionContext -> IO Connection
fromCtx :: ConnectionContext -> IO Connection
fromCtx ConnectionContext
ctx = ConnectionContext
-> IORef [Reply] -> IORef [Reply] -> IORef Int -> Connection
Conn ConnectionContext
ctx (IORef [Reply] -> IORef [Reply] -> IORef Int -> Connection)
-> IO (IORef [Reply])
-> IO (IORef [Reply] -> IORef Int -> Connection)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef [] IO (IORef [Reply] -> IORef Int -> Connection)
-> IO (IORef [Reply]) -> IO (IORef Int -> Connection)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef [] IO (IORef Int -> Connection) -> IO (IORef Int) -> IO Connection
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
connect :: NS.HostName -> CC.PortID -> Maybe Int -> IO Connection
connect :: HostName -> PortID -> Maybe Int -> IO Connection
connect HostName
hostName PortID
portId Maybe Int
timeoutOpt = do
ConnectionContext
connCtx <- HostName -> PortID -> Maybe Int -> IO ConnectionContext
CC.connect HostName
hostName PortID
portId Maybe Int
timeoutOpt
IORef [Reply]
connReplies <- [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef []
IORef [Reply]
connPending <- [Reply] -> IO (IORef [Reply])
forall a. a -> IO (IORef a)
newIORef []
IORef Int
connPendingCnt <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Conn :: ConnectionContext
-> IORef [Reply] -> IORef [Reply] -> IORef Int -> Connection
Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
..}
enableTLS :: TLS.ClientParams -> Connection -> IO Connection
enableTLS :: ClientParams -> Connection -> IO Connection
enableTLS ClientParams
tlsParams conn :: Connection
conn@Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = do
ConnectionContext
newCtx <- ClientParams -> ConnectionContext -> IO ConnectionContext
CC.enableTLS ClientParams
tlsParams ConnectionContext
connCtx
Connection -> IO Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn{connCtx :: ConnectionContext
connCtx = ConnectionContext
newCtx}
beginReceiving :: Connection -> IO ()
beginReceiving :: Connection -> IO ()
beginReceiving Connection
conn = do
[Reply]
rs <- Connection -> IO [Reply]
connGetReplies Connection
conn
IORef [Reply] -> [Reply] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (Connection -> IORef [Reply]
connReplies Connection
conn) [Reply]
rs
IORef [Reply] -> [Reply] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (Connection -> IORef [Reply]
connPending Connection
conn) [Reply]
rs
disconnect :: Connection -> IO ()
disconnect :: Connection -> IO ()
disconnect Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = ConnectionContext -> IO ()
CC.disconnect ConnectionContext
connCtx
send :: Connection -> S.ByteString -> IO ()
send :: Connection -> ByteString -> IO ()
send Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} ByteString
s = do
ConnectionContext -> ByteString -> IO ()
CC.send ConnectionContext
connCtx ByteString
s
Int
n <- IORef Int -> (Int -> (Int, Int)) -> IO Int
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
connPendingCnt ((Int -> (Int, Int)) -> IO Int) -> (Int -> (Int, Int)) -> IO Int
forall a b. (a -> b) -> a -> b
$ \Int
n -> let n' :: Int
n' = Int
nInt -> Int -> Int
forall a. Num a => a -> a -> a
+Int
1 in (Int
n', Int
n')
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
1000) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Reply
r:[Reply]
_ <- IORef [Reply] -> IO [Reply]
forall a. IORef a -> IO a
readIORef IORef [Reply]
connPending
Reply
r Reply -> IO () -> IO ()
`seq` () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
recv :: Connection -> IO Reply
recv :: Connection -> IO Reply
recv Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = do
(Reply
r:[Reply]
rs) <- IORef [Reply] -> IO [Reply]
forall a. IORef a -> IO a
readIORef IORef [Reply]
connReplies
IORef [Reply] -> [Reply] -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef [Reply]
connReplies [Reply]
rs
Reply -> IO Reply
forall (m :: * -> *) a. Monad m => a -> m a
return Reply
r
flush :: Connection -> IO ()
flush :: Connection -> IO ()
flush Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = ConnectionContext -> IO ()
CC.flush ConnectionContext
connCtx
request :: Connection -> S.ByteString -> IO Reply
request :: Connection -> ByteString -> IO Reply
request Connection
conn ByteString
req = Connection -> ByteString -> IO ()
send Connection
conn ByteString
req IO () -> IO Reply -> IO Reply
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Connection -> IO Reply
recv Connection
conn
connGetReplies :: Connection -> IO [Reply]
connGetReplies :: Connection -> IO [Reply]
connGetReplies conn :: Connection
conn@Conn{IORef Int
IORef [Reply]
ConnectionContext
connPendingCnt :: IORef Int
connPending :: IORef [Reply]
connReplies :: IORef [Reply]
connCtx :: ConnectionContext
connPendingCnt :: Connection -> IORef Int
connPending :: Connection -> IORef [Reply]
connReplies :: Connection -> IORef [Reply]
connCtx :: Connection -> ConnectionContext
..} = ByteString -> Reply -> IO [Reply]
go ByteString
S.empty (ByteString -> Reply
SingleLine ByteString
"previous of first")
where
go :: ByteString -> Reply -> IO [Reply]
go ByteString
rest Reply
previous = do
~(Reply
r, ByteString
rest') <- IO (Reply, ByteString) -> IO (Reply, ByteString)
forall a. IO a -> IO a
unsafeInterleaveIO (IO (Reply, ByteString) -> IO (Reply, ByteString))
-> IO (Reply, ByteString) -> IO (Reply, ByteString)
forall a b. (a -> b) -> a -> b
$ do
Reply
previous Reply -> IO () -> IO ()
`seq` () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Result Reply
scanResult <- IO ByteString -> Scanner Reply -> ByteString -> IO (Result Reply)
forall (m :: * -> *) a.
Monad m =>
m ByteString -> Scanner a -> ByteString -> m (Result a)
Scanner.scanWith IO ByteString
readMore Scanner Reply
reply ByteString
rest
case Result Reply
scanResult of
Scanner.Fail{} -> IO (Reply, ByteString)
forall a. IO a
CC.errConnClosed
Scanner.More{} -> HostName -> IO (Reply, ByteString)
forall a. HasCallStack => HostName -> a
error HostName
"Hedis: parseWith returned Partial"
Scanner.Done ByteString
rest' Reply
r -> do
IORef [Reply] -> ([Reply] -> ([Reply], ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef [Reply]
connPending (([Reply] -> ([Reply], ())) -> IO ())
-> ([Reply] -> ([Reply], ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Reply
_:[Reply]
rs) -> ([Reply]
rs, ())
IORef Int -> (Int -> (Int, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef Int
connPendingCnt ((Int -> (Int, ())) -> IO ()) -> (Int -> (Int, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
0 (Int
nInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1), ())
(Reply, ByteString) -> IO (Reply, ByteString)
forall (m :: * -> *) a. Monad m => a -> m a
return (Reply
r, ByteString
rest')
[Reply]
rs <- IO [Reply] -> IO [Reply]
forall a. IO a -> IO a
unsafeInterleaveIO (ByteString -> Reply -> IO [Reply]
go ByteString
rest' Reply
r)
[Reply] -> IO [Reply]
forall (m :: * -> *) a. Monad m => a -> m a
return (Reply
rReply -> [Reply] -> [Reply]
forall a. a -> [a] -> [a]
:[Reply]
rs)
readMore :: IO ByteString
readMore = IO ByteString -> IO ByteString
forall a. IO a -> IO a
CC.ioErrorToConnLost (IO ByteString -> IO ByteString) -> IO ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ do
Connection -> IO ()
flush Connection
conn
ConnectionContext -> IO ByteString
CC.recv ConnectionContext
connCtx