{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Database.Redis.ProtocolPipelining (
  Connection,
  connect, enableTLS, beginReceiving, disconnect, request, send, recv, flush, fromCtx
) where
import           Prelude
import           Control.Monad
import qualified Scanner
import qualified Data.ByteString as S
import           Data.IORef
import qualified Network.Socket as NS
import qualified Network.TLS as TLS
import           System.IO.Unsafe
import           Database.Redis.Protocol
import qualified Database.Redis.ConnectionContext as CC
-- | A pipelined connection: the underlying connection context plus the
--   mutable bookkeeping that tracks the lazy stream of future replies.
data Connection = Conn
  { connCtx :: CC.ConnectionContext
    -- ^ Underlying connection context; every send, receive, flush and
    --   disconnect goes through it.
  , connReplies :: IORef [Reply]
    -- ^ Replies not yet handed out; 'recv' pops the head of this list.
  , connPending :: IORef [Reply]
    -- ^ Replies not yet read from the connection. 'beginReceiving' points
    --   this at the same list as 'connReplies'; its head is dropped every
    --   time a reply is actually parsed.
  , connPendingCnt :: IORef Int
    -- ^ Counter incremented by 'send' and decremented (never below zero)
    --   each time a reply is parsed.
  }

-- | Wrap an existing connection context in a 'Connection' whose reply lists
--   are empty and whose pending counter is zero.
--
--   The reply lists stay empty until 'beginReceiving' is run on the result.
fromCtx :: CC.ConnectionContext -> IO Connection
fromCtx ctx = do
  replies    <- newIORef []
  pending    <- newIORef []
  pendingCnt <- newIORef 0
  return (Conn ctx replies pending pendingCnt)

-- | Open a connection context via 'CC.connect' (host, port and optional
--   timeout are passed through unchanged) and wrap it in a fresh
--   'Connection' with empty reply lists and a zero pending counter.
connect :: NS.HostName -> CC.PortID -> Maybe Int -> IO Connection
connect hostName portId timeoutOpt =
  Conn
    <$> CC.connect hostName portId timeoutOpt
    <*> newIORef []
    <*> newIORef []
    <*> newIORef 0

-- | Run 'CC.enableTLS' on the connection's context and return the same
--   'Connection' with its context replaced by the result. The reply
--   bookkeeping refs are shared with the input connection.
enableTLS :: TLS.ClientParams -> Connection -> IO Connection
enableTLS tlsParams conn@Conn{ connCtx = plainCtx } =
    withCtx <$> CC.enableTLS tlsParams plainCtx
  where
    -- Same connection, new context.
    withCtx securedCtx = conn { connCtx = securedCtx }

-- | Build the lazy reply stream for the connection and install it as both
--   the list 'recv' consumes ('connReplies') and the list of replies still
--   to be read ('connPending').
beginReceiving :: Connection -> IO ()
beginReceiving conn = do
  stream <- connGetReplies conn
  -- Both refs start out pointing at the very same list.
  mapM_ (`writeIORef` stream) [connReplies conn, connPending conn]

-- | Close the connection by handing its context to 'CC.disconnect'.
disconnect :: Connection -> IO ()
disconnect Conn{ connCtx = ctx } = CC.disconnect ctx

-- | Hand a serialized request to the connection context and record that
--   one more reply is expected.
--
--   Once the number of outstanding replies reaches 'maxPipelineLength', the
--   oldest pending reply is forced, which bounds how many unevaluated reply
--   thunks can accumulate in a long pipeline.
--
--   Fails with an 'IOError' if the limit is reached while the pending list
--   is empty, i.e. 'beginReceiving' has not been run on this connection.
send :: Connection -> S.ByteString -> IO ()
send Conn{..} s = do
  CC.send connCtx s

  -- One more reply is now expected; get the updated count back.
  pendingCount <- atomicModifyIORef' connPendingCnt $ \n ->
    let n' = n + 1 in (n', n')

  when (pendingCount >= maxPipelineLength) $ do
    pending <- readIORef connPending
    case pending of
      -- Force the oldest outstanding reply (to WHNF).
      oldest : _ -> oldest `seq` return ()
      []         -> fail "Hedis: send: no pending replies to force; was beginReceiving called?"
  where
    -- Maximum number of outstanding replies before the oldest is forced.
    maxPipelineLength :: Int
    maxPipelineLength = 1000

-- | Take the next reply from the list of future replies.
--
--   Only the list cell is inspected; the returned 'Reply' itself is not
--   forced here.
--
--   Fails with an 'IOError' if the reply list is empty, which happens when
--   'beginReceiving' has not been run on this connection.
recv :: Connection -> IO Reply
recv Conn{..} = do
  replies <- readIORef connReplies
  case replies of
    r : rs -> do
      writeIORef connReplies rs
      return r
    [] -> fail "Hedis: recv: reply list is empty; was beginReceiving called?"

-- | Flush the connection by delegating to 'CC.flush' on its context.
--   The reply reader also calls this before every receive.
flush :: Connection -> IO ()
flush Conn{ connCtx = ctx } = CC.flush ctx

-- | Send a request, then take the corresponding reply from the reply list.
request :: Connection -> S.ByteString -> IO Reply
request conn req = do
  send conn req
  recv conn

-- | Lazy list of all future replies on the connection.
--
--   Walking the spine of the list performs no network I/O. Forcing an
--   element runs, through 'unsafeInterleaveIO', the read-and-parse step for
--   that reply. Each step first forces the previous reply, so replies are
--   read from the connection in order.
connGetReplies :: Connection -> IO [Reply]
connGetReplies conn@Conn{..} = replyStream S.empty (SingleLine "previous of first")
  where
    -- Builds the list from the unparsed leftover bytes and the previous
    -- reply. The tuple pattern is lazy so that nothing is received until
    -- the reply (or the leftover) is actually demanded.
    replyStream leftover prev = do
      ~(r, leftover') <- unsafeInterleaveIO (receiveOne leftover prev)
      later <- unsafeInterleaveIO (replyStream leftover' r)
      return (r : later)

    -- Reads and parses exactly one reply, then updates the bookkeeping.
    receiveOne leftover prev = do
      -- Force the previous reply first to keep the replies in order.
      prev `seq` return ()
      outcome <- Scanner.scanWith fetchChunk reply leftover
      case outcome of
        Scanner.Fail{} -> CC.errConnClosed
        -- NOTE(review): presumably unreachable since scanWith is given a
        -- refill action - confirm against the scanner documentation.
        Scanner.More{} -> error "Hedis: parseWith returned Partial"
        Scanner.Done leftover' parsed -> do
          -- The reply just parsed is the head of the pending list; drop it.
          atomicModifyIORef' connPending $ \(_ : stillPending) -> (stillPending, ())
          -- One reply fewer is expected; the counter never goes below zero.
          atomicModifyIORef' connPendingCnt $ \n -> (max 0 (n - 1), ())
          return (parsed, leftover')

    -- Refill action for the scanner: flush our side, then receive more
    -- bytes, with IO errors passed through 'CC.ioErrorToConnLost'.
    fetchChunk = CC.ioErrorToConnLost $ do
      flush conn
      CC.recv connCtx