{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE LambdaCase #-}
module Database.EventStore.Internal.Connection
( ConnectionBuilder(..)
, Connection(..)
, RecvOutcome(..)
, PackageArrived(..)
, ConnectionError(..)
, ConnectionEstablished(..)
, ConnectionClosed(..)
, ConnectionRef(..)
, getConnection
, connectionBuilder
, connectionError
) where
import Prelude (String)
import Text.Printf
import Control.Monad.Reader
import Data.Serialize
import Data.UUID
import qualified Network.Connection as Network
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Types
newtype ConnectionBuilder =
ConnectionBuilder { ConnectionBuilder -> EndPoint -> EventStore Connection
connect :: EndPoint -> EventStore Connection }
data RecvOutcome
= ResetByPeer
| Recv Package
| WrongFraming
| ParsingError
type ConnectionId = UUID
newtype ConnectionRef =
ConnectionRef { ConnectionRef -> EventStore (Maybe Connection)
maybeConnection :: EventStore (Maybe Connection) }
getConnection :: ConnectionRef -> EventStore Connection
getConnection :: ConnectionRef -> EventStore Connection
getConnection ConnectionRef
ref =
ConnectionRef -> EventStore (Maybe Connection)
maybeConnection ConnectionRef
ref EventStore (Maybe Connection)
-> (Maybe Connection -> EventStore Connection)
-> EventStore Connection
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Just Connection
conn -> Connection -> EventStore Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
Maybe Connection
Nothing -> do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
logError) Text
"Expected a connection but got none."
String -> EventStore Connection
forall (m :: * -> *) a.
(MonadThrow m, HasCallStack) =>
String -> m a
throwString String
"No current connection (impossible situation)"
data Connection =
Connection { Connection -> ConnectionId
connectionId :: ConnectionId
, Connection -> EndPoint
connectionEndPoint :: EndPoint
, Connection -> Package -> EventStore ()
enqueuePackage :: Package -> EventStore ()
, Connection -> EventStore ()
dispose :: EventStore ()
}
instance Show Connection where
show :: Connection -> String
show Connection{ConnectionId
EndPoint
EventStore ()
Package -> EventStore ()
dispose :: EventStore ()
enqueuePackage :: Package -> EventStore ()
connectionEndPoint :: EndPoint
connectionId :: ConnectionId
dispose :: Connection -> EventStore ()
enqueuePackage :: Connection -> Package -> EventStore ()
connectionEndPoint :: Connection -> EndPoint
connectionId :: Connection -> ConnectionId
..} = String
"Connection [" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> ConnectionId -> String
forall a. Show a => a -> String
show ConnectionId
connectionId String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
"] on "
String -> ShowS
forall a. Semigroup a => a -> a -> a
<> EndPoint -> String
forall a. Show a => a -> String
show EndPoint
connectionEndPoint
instance Eq Connection where
Connection
a == :: Connection -> Connection -> Bool
== Connection
b = Connection -> ConnectionId
connectionId Connection
a ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== Connection -> ConnectionId
connectionId Connection
b
newtype ConnectionState =
ConnectionState { ConnectionState -> TBMQueue Package
_sendQueue :: TBMQueue Package }
data PackageArrived = PackageArrived Connection Package deriving Typeable
data ConnectionError =
ConnectionError Connection SomeException deriving Typeable
connectionError :: Exception e => Connection -> e -> ConnectionError
connectionError :: Connection -> e -> ConnectionError
connectionError Connection
c = Connection -> SomeException -> ConnectionError
ConnectionError Connection
c (SomeException -> ConnectionError)
-> (e -> SomeException) -> e -> ConnectionError
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> SomeException
forall e. Exception e => e -> SomeException
toException
data ConnectionClosed = ConnectionClosed Connection SomeException
deriving Typeable
data ConnectionEstablished =
ConnectionEstablished Connection deriving Typeable
newtype ConnectionResetByPeer = ConnectionResetByPeer SomeException
deriving Typeable
instance Show ConnectionResetByPeer where
show :: ConnectionResetByPeer -> String
show (ConnectionResetByPeer SomeException
reason) =
String
"Connection reset by peer: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> SomeException -> String
forall a. Show a => a -> String
show SomeException
reason
instance Exception ConnectionResetByPeer
data ProtocolError
= WrongFramingError !String
| PackageParsingError !String
deriving Typeable
instance Show ProtocolError where
show :: ProtocolError -> String
show (WrongFramingError String
reason) = String
"Package framing error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
reason
show (PackageParsingError String
reason) = String
"Package parsing error: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
reason
instance Exception ProtocolError
connectionBuilder :: Settings -> IO ConnectionBuilder
connectionBuilder :: Settings -> IO ConnectionBuilder
connectionBuilder Settings
setts = do
ConnectionContext
ctx <- IO ConnectionContext
Network.initConnectionContext
ConnectionBuilder -> IO ConnectionBuilder
forall (m :: * -> *) a. Monad m => a -> m a
return (ConnectionBuilder -> IO ConnectionBuilder)
-> ConnectionBuilder -> IO ConnectionBuilder
forall a b. (a -> b) -> a -> b
$ (EndPoint -> EventStore Connection) -> ConnectionBuilder
ConnectionBuilder ((EndPoint -> EventStore Connection) -> ConnectionBuilder)
-> (EndPoint -> EventStore Connection) -> ConnectionBuilder
forall a b. (a -> b) -> a -> b
$ \EndPoint
ept -> do
ConnectionId
cid <- EventStore ConnectionId
forall (m :: * -> *). MonadIO m => m ConnectionId
freshUUID
ConnectionState
state <- EventStore ConnectionState
createState
(Connection -> EventStore Connection) -> EventStore Connection
forall (m :: * -> *) a. MonadFix m => (a -> m a) -> m a
mfix ((Connection -> EventStore Connection) -> EventStore Connection)
-> (Connection -> EventStore Connection) -> EventStore Connection
forall a b. (a -> b) -> a -> b
$ \Connection
self -> do
Async Connection
tcpConnAsync <- EventStore Connection
-> EventStore (Async (StM EventStore Connection))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (EventStore Connection
-> EventStore (Async (StM EventStore Connection)))
-> EventStore Connection
-> EventStore (Async (StM EventStore Connection))
forall a b. (a -> b) -> a -> b
$
EventStore Connection
-> EventStore (Either SomeException Connection)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (Settings -> ConnectionContext -> EndPoint -> EventStore Connection
createConnection Settings
setts ConnectionContext
ctx EndPoint
ept) EventStore (Either SomeException Connection)
-> (Either SomeException Connection -> EventStore Connection)
-> EventStore Connection
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> do
ConnectionClosed -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
SomeException -> EventStore Connection
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right Connection
conn -> do
ConnectionEstablished -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> ConnectionEstablished
ConnectionEstablished Connection
self)
Connection -> EventStore Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
conn
Async ()
sendAsync <- EventStore () -> EventStore (Async (StM EventStore ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (ConnectionState -> Connection -> Async Connection -> EventStore ()
sending ConnectionState
state Connection
self Async Connection
tcpConnAsync)
Async ()
recvAsync <- EventStore () -> EventStore (Async (StM EventStore ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async (ConnectionState -> Connection -> Async Connection -> EventStore ()
receiving ConnectionState
state Connection
self Async Connection
tcpConnAsync)
Connection -> EventStore Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection :: ConnectionId
-> EndPoint
-> (Package -> EventStore ())
-> EventStore ()
-> Connection
Connection { connectionId :: ConnectionId
connectionId = ConnectionId
cid
, connectionEndPoint :: EndPoint
connectionEndPoint = EndPoint
ept
, enqueuePackage :: Package -> EventStore ()
enqueuePackage = ConnectionState -> Package -> EventStore ()
enqueue ConnectionState
state
, dispose :: EventStore ()
dispose = do
ConnectionState -> EventStore ()
closeState ConnectionState
state
Async Connection -> EventStore ()
disposeConnection Async Connection
tcpConnAsync
Async () -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
cancel Async ()
sendAsync
Async () -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
cancel Async ()
recvAsync
}
createState :: EventStore ConnectionState
createState :: EventStore ConnectionState
createState = TBMQueue Package -> ConnectionState
ConnectionState (TBMQueue Package -> ConnectionState)
-> EventStore (TBMQueue Package) -> EventStore ConnectionState
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (TBMQueue Package) -> EventStore (TBMQueue Package)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO (TBMQueue Package)
forall a. Int -> IO (TBMQueue a)
newTBMQueueIO Int
500)
closeState :: ConnectionState -> EventStore ()
closeState :: ConnectionState -> EventStore ()
closeState ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} = STM () -> EventStore ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> EventStore ()) -> STM () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ TBMQueue Package -> STM ()
forall a. TBMQueue a -> STM ()
closeTBMQueue TBMQueue Package
_sendQueue
createConnection :: Settings
-> Network.ConnectionContext
-> EndPoint
-> EventStore Network.Connection
createConnection :: Settings -> ConnectionContext -> EndPoint -> EventStore Connection
createConnection Settings
setts ConnectionContext
ctx EndPoint
ept = IO Connection -> EventStore Connection
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Connection -> EventStore Connection)
-> IO Connection -> EventStore Connection
forall a b. (a -> b) -> a -> b
$ ConnectionContext -> ConnectionParams -> IO Connection
Network.connectTo ConnectionContext
ctx ConnectionParams
params
where
host :: String
host = EndPoint -> String
endPointIp EndPoint
ept
port :: PortNumber
port = Int -> PortNumber
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> PortNumber) -> Int -> PortNumber
forall a b. (a -> b) -> a -> b
$ EndPoint -> Int
endPointPort EndPoint
ept
params :: ConnectionParams
params = String
-> PortNumber
-> Maybe TLSSettings
-> Maybe ProxySettings
-> ConnectionParams
Network.ConnectionParams String
host PortNumber
port (Settings -> Maybe TLSSettings
s_ssl Settings
setts) Maybe ProxySettings
forall a. Maybe a
Nothing
disposeConnection :: Async Network.Connection -> EventStore ()
disposeConnection :: Async Connection -> EventStore ()
disposeConnection Async Connection
as = (Either SomeException Connection -> EventStore ())
-> Maybe (Either SomeException Connection) -> EventStore ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Either SomeException Connection -> EventStore ()
tryDisposing (Maybe (Either SomeException Connection) -> EventStore ())
-> EventStore (Maybe (Either SomeException Connection))
-> EventStore ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Async (StM EventStore Connection)
-> EventStore (Maybe (Either SomeException Connection))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m (Maybe (Either SomeException a))
poll Async Connection
Async (StM EventStore Connection)
as
where
tryDisposing :: Either SomeException Connection -> EventStore ()
tryDisposing = (Connection -> EventStore ())
-> Either SomeException Connection -> EventStore ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Connection -> EventStore ()
disposing
disposing :: Connection -> EventStore ()
disposing = IO () -> EventStore ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> EventStore ())
-> (Connection -> IO ()) -> Connection -> EventStore ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> IO ()
Network.connectionClose
receivePackage :: Connection -> Network.Connection -> EventStore Package
receivePackage :: Connection -> Connection -> EventStore Package
receivePackage Connection
self Connection
conn =
EventStore ByteString
-> EventStore (Either SomeException ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (IO ByteString -> EventStore ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> EventStore ByteString)
-> IO ByteString -> EventStore ByteString
forall a b. (a -> b) -> a -> b
$ Connection -> Int -> IO ByteString
Network.connectionGetExact Connection
conn Int
4) EventStore (Either SomeException ByteString)
-> (Either SomeException ByteString -> EventStore Package)
-> EventStore Package
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> do
ConnectionClosed -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
SomeException -> EventStore Package
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right ByteString
frame ->
case Get Int -> ByteString -> Either String Int
forall a. Get a -> ByteString -> Either String a
runGet Get Int
getLengthPrefix ByteString
frame of
Left String
reason -> do
let cause :: ProtocolError
cause = String -> ProtocolError
WrongFramingError String
reason
ConnectionError -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> ProtocolError -> ConnectionError
forall e. Exception e => Connection -> e -> ConnectionError
connectionError Connection
self ProtocolError
cause)
ProtocolError -> EventStore Package
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw ProtocolError
cause
Right Int
prefix -> do
EventStore ByteString
-> EventStore (Either SomeException ByteString)
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (IO ByteString -> EventStore ByteString
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ByteString -> EventStore ByteString)
-> IO ByteString -> EventStore ByteString
forall a b. (a -> b) -> a -> b
$ Connection -> Int -> IO ByteString
Network.connectionGetExact Connection
conn Int
prefix) EventStore (Either SomeException ByteString)
-> (Either SomeException ByteString -> EventStore Package)
-> EventStore Package
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> do
ConnectionClosed -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
SomeException -> EventStore Package
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw SomeException
e
Right ByteString
payload ->
case Get Package -> ByteString -> Either String Package
forall a. Get a -> ByteString -> Either String a
runGet Get Package
getPackage ByteString
payload of
Left String
reason -> do
let cause :: ProtocolError
cause = String -> ProtocolError
PackageParsingError String
reason
ConnectionError -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> ProtocolError -> ConnectionError
forall e. Exception e => Connection -> e -> ConnectionError
connectionError Connection
self ProtocolError
cause)
ProtocolError -> EventStore Package
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throw ProtocolError
cause
Right Package
pkg -> Package -> EventStore Package
forall (m :: * -> *) a. Monad m => a -> m a
return Package
pkg
receiving :: ConnectionState
-> Connection
-> Async Network.Connection
-> EventStore ()
receiving :: ConnectionState -> Connection -> Async Connection -> EventStore ()
receiving ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} Connection
self Async Connection
tcpConnAsync =
EventStore () -> EventStore ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (EventStore () -> EventStore ())
-> (Connection -> EventStore ()) -> Connection -> EventStore ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> EventStore ()
go (Connection -> EventStore ())
-> EventStore Connection -> EventStore ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Async (StM EventStore Connection) -> EventStore Connection
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async Connection
Async (StM EventStore Connection)
tcpConnAsync
where
go :: Connection -> EventStore ()
go Connection
conn =
PackageArrived -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (PackageArrived -> EventStore ())
-> (Package -> PackageArrived) -> Package -> EventStore ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Connection -> Package -> PackageArrived
PackageArrived Connection
self (Package -> EventStore ()) -> EventStore Package -> EventStore ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Connection -> Connection -> EventStore Package
receivePackage Connection
self Connection
conn
enqueue :: ConnectionState -> Package -> EventStore ()
enqueue :: ConnectionState -> Package -> EventStore ()
enqueue ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} pkg :: Package
pkg@Package{Maybe Credentials
ByteString
ConnectionId
Command
packageCred :: Package -> Maybe Credentials
packageData :: Package -> ByteString
packageCorrelation :: Package -> ConnectionId
packageCmd :: Package -> Command
packageCred :: Maybe Credentials
packageData :: ByteString
packageCorrelation :: ConnectionId
packageCmd :: Command
..} = do
$(Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
logDebug) [i|Package enqueued: #{pkg}|]
STM () -> EventStore ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> EventStore ()) -> STM () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ TBMQueue Package -> Package -> STM ()
forall a. TBMQueue a -> a -> STM ()
writeTBMQueue TBMQueue Package
_sendQueue Package
pkg
sending :: ConnectionState
-> Connection
-> Async Network.Connection
-> EventStore ()
sending :: ConnectionState -> Connection -> Async Connection -> EventStore ()
sending ConnectionState{TBMQueue Package
_sendQueue :: TBMQueue Package
_sendQueue :: ConnectionState -> TBMQueue Package
..} Connection
self Async Connection
tcpConnAsync = Connection -> EventStore ()
go (Connection -> EventStore ())
-> EventStore Connection -> EventStore ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Async (StM EventStore Connection) -> EventStore Connection
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async Connection
Async (StM EventStore Connection)
tcpConnAsync
where
go :: Connection -> EventStore ()
go Connection
conn =
let loop :: EventStore ()
loop = (Package -> EventStore ()) -> Maybe Package -> EventStore ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ Package -> EventStore ()
send (Maybe Package -> EventStore ())
-> EventStore (Maybe Package) -> EventStore ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM (Maybe Package) -> EventStore (Maybe Package)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (TBMQueue Package -> STM (Maybe Package)
forall a. TBMQueue a -> STM (Maybe a)
readTBMQueue TBMQueue Package
_sendQueue)
send :: Package -> EventStore ()
send Package
pkg =
EventStore () -> EventStore (Either SomeException ())
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (IO () -> EventStore ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> EventStore ()) -> IO () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ Connection -> ByteString -> IO ()
Network.connectionPut Connection
conn ByteString
bytes) EventStore (Either SomeException ())
-> (Either SomeException () -> EventStore ()) -> EventStore ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left SomeException
e -> ConnectionClosed -> EventStore ()
forall a. Typeable a => a -> EventStore ()
publish (Connection -> SomeException -> ConnectionClosed
ConnectionClosed Connection
self SomeException
e)
Right ()
_ -> do
Int -> EventStore ()
monitorAddDataTransmitted (ByteString -> Int
forall mono. MonoFoldable mono => mono -> Int
length ByteString
bytes)
EventStore ()
loop
where
bytes :: ByteString
bytes = Put -> ByteString
runPut (Put -> ByteString) -> Put -> ByteString
forall a b. (a -> b) -> a -> b
$ Package -> Put
putPackage Package
pkg in
EventStore ()
loop
putPackage :: Package -> Put
putPackage :: Package -> Put
putPackage Package
pkg = do
Putter Word32
putWord32le Word32
length_prefix
Putter Word8
putWord8 (Command -> Word8
cmdWord8 (Command -> Word8) -> Command -> Word8
forall a b. (a -> b) -> a -> b
$ Package -> Command
packageCmd Package
pkg)
Putter Word8
putWord8 Word8
flag_word8
Putter ByteString
putLazyByteString ByteString
corr_bytes
Maybe Credentials -> (Credentials -> Put) -> Put
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe Credentials
cred_m ((Credentials -> Put) -> Put) -> (Credentials -> Put) -> Put
forall a b. (a -> b) -> a -> b
$ \(Credentials ByteString
login ByteString
passw) -> do
Putter Word8
putWord8 Putter Word8 -> Putter Word8
forall a b. (a -> b) -> a -> b
$ Int -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Word8) -> Int -> Word8
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
forall mono. MonoFoldable mono => mono -> Int
length ByteString
login
Putter ByteString
putByteString ByteString
login
Putter Word8
putWord8 Putter Word8 -> Putter Word8
forall a b. (a -> b) -> a -> b
$ Int -> Word8
forall a b. (Integral a, Num b) => a -> b
fromIntegral (Int -> Word8) -> Int -> Word8
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
forall mono. MonoFoldable mono => mono -> Int
length ByteString
passw
Putter ByteString
putByteString ByteString
passw
Putter ByteString
putByteString ByteString
pack_data
where
pack_data :: ByteString
pack_data = Package -> ByteString
packageData Package
pkg
cred_len :: Int
cred_len = Int -> (Credentials -> Int) -> Maybe Credentials -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
0 Credentials -> Int
credSize Maybe Credentials
cred_m
length_prefix :: Word32
length_prefix = Int -> Word32
forall a b. (Integral a, Num b) => a -> b
fromIntegral (ByteString -> Int
forall mono. MonoFoldable mono => mono -> Int
length ByteString
pack_data Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
mandatorySize Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
cred_len)
cred_m :: Maybe Credentials
cred_m = Package -> Maybe Credentials
packageCred Package
pkg
flag_word8 :: Word8
flag_word8 = Word8 -> (Credentials -> Word8) -> Maybe Credentials -> Word8
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Word8
0x00 (Word8 -> Credentials -> Word8
forall a b. a -> b -> a
const Word8
0x01) Maybe Credentials
cred_m
corr_bytes :: ByteString
corr_bytes = ConnectionId -> ByteString
toByteString (ConnectionId -> ByteString) -> ConnectionId -> ByteString
forall a b. (a -> b) -> a -> b
$ Package -> ConnectionId
packageCorrelation Package
pkg
credSize :: Credentials -> Int
credSize :: Credentials -> Int
credSize (Credentials ByteString
login ByteString
passw) = ByteString -> Int
forall mono. MonoFoldable mono => mono -> Int
length ByteString
login Int -> Int -> Int
forall a. Num a => a -> a -> a
+ ByteString -> Int
forall mono. MonoFoldable mono => mono -> Int
length ByteString
passw Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
2
mandatorySize :: Int
mandatorySize :: Int
mandatorySize = Int
18
getLengthPrefix :: Get Int
getLengthPrefix :: Get Int
getLengthPrefix = (Word32 -> Int) -> Get Word32 -> Get Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Word32 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Get Word32
getWord32le
getPackage :: Get Package
getPackage :: Get Package
getPackage = do
Word8
cmd <- Get Word8
getWord8
Flag
flg <- Get Flag
getFlag
ConnectionId
col <- Get ConnectionId
getUUID
Maybe Credentials
cred <- Flag -> Get (Maybe Credentials)
getCredentials Flag
flg
Int
rest <- Get Int
remaining
ByteString
dta <- Int -> Get ByteString
getBytes Int
rest
let pkg :: Package
pkg = Package :: Command
-> ConnectionId -> ByteString -> Maybe Credentials -> Package
Package
{ packageCmd :: Command
packageCmd = Word8 -> Command
getCommand Word8
cmd
, packageCorrelation :: ConnectionId
packageCorrelation = ConnectionId
col
, packageData :: ByteString
packageData = ByteString
dta
, packageCred :: Maybe Credentials
packageCred = Maybe Credentials
cred
}
Package -> Get Package
forall (m :: * -> *) a. Monad m => a -> m a
return Package
pkg
getFlag :: Get Flag
getFlag :: Get Flag
getFlag = do
Word8
wd <- Get Word8
getWord8
case Word8
wd of
Word8
0x00 -> Flag -> Get Flag
forall (m :: * -> *) a. Monad m => a -> m a
return Flag
None
Word8
0x01 -> Flag -> Get Flag
forall (m :: * -> *) a. Monad m => a -> m a
return Flag
Authenticated
Word8
_ -> String -> Get Flag
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Get Flag) -> String -> Get Flag
forall a b. (a -> b) -> a -> b
$ String -> Word8 -> String
forall r. PrintfType r => String -> r
printf String
"TCP: Unhandled flag value 0x%x" Word8
wd
getCredEntryLength :: Get Int
getCredEntryLength :: Get Int
getCredEntryLength = (Word8 -> Int) -> Get Word8 -> Get Int
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Word8 -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Get Word8
getWord8
getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials Flag
None = Maybe Credentials -> Get (Maybe Credentials)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe Credentials
forall a. Maybe a
Nothing
getCredentials Flag
_ = do
Int
loginLen <- Get Int
getCredEntryLength
ByteString
login <- Int -> Get ByteString
getBytes Int
loginLen
Int
passwLen <- Get Int
getCredEntryLength
ByteString
passw <- Int -> Get ByteString
getBytes Int
passwLen
Maybe Credentials -> Get (Maybe Credentials)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe Credentials -> Get (Maybe Credentials))
-> Maybe Credentials -> Get (Maybe Credentials)
forall a b. (a -> b) -> a -> b
$ Credentials -> Maybe Credentials
forall a. a -> Maybe a
Just (Credentials -> Maybe Credentials)
-> Credentials -> Maybe Credentials
forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString -> Credentials
credentials ByteString
login ByteString
passw
getUUID :: Get UUID
getUUID :: Get ConnectionId
getUUID = do
ByteString
bs <- Int64 -> Get ByteString
getLazyByteString Int64
16
case ByteString -> Maybe ConnectionId
fromByteString ByteString
bs of
Just ConnectionId
uuid -> ConnectionId -> Get ConnectionId
forall (m :: * -> *) a. Monad m => a -> m a
return ConnectionId
uuid
Maybe ConnectionId
_ -> String -> Get ConnectionId
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"TCP: Wrong UUID format"