module Database.EventStore.Internal.Connection
( Connection
, ConnectionException(..)
, HostName
, connUUID
, connClose
, connSend
, connRecv
, connIsClosed
, newConnection
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import qualified Data.ByteString as B
import Data.Typeable
import System.IO
import Data.UUID
import Data.UUID.V4
import Network
import Database.EventStore.Internal.Types
import Database.EventStore.Logging
data ConnectionException
= MaxAttemptConnectionReached HostName Int Int
| ClosedConnection
deriving (Show, Typeable)
instance Exception ConnectionException
data In a where
Id :: In UUID
Close :: In ()
Send :: B.ByteString -> In ()
Recv :: Int -> In B.ByteString
data Connection =
Connection
{ _var :: TMVar State
, _host :: HostName
, _port :: Int
, _setts :: Settings
}
data State
= Offline
| Online !UUID !Handle
| Closed
newConnection :: Settings -> HostName -> Int -> IO Connection
newConnection setts host port = do
var <- newTMVarIO Offline
return $ Connection var host port setts
connUUID :: Connection -> IO UUID
connUUID conn = execute conn Id
connClose :: Connection -> IO ()
connClose conn = execute conn Close
connSend :: Connection -> B.ByteString -> IO ()
connSend conn b = execute conn (Send b)
connRecv :: Connection -> Int -> IO B.ByteString
connRecv conn i = execute conn (Recv i)
connIsClosed :: Connection -> STM Bool
connIsClosed Connection{..} = do
r <- readTMVar _var
case r of
Closed -> return True
_ -> return False
execute :: forall a. Connection -> In a -> IO a
execute Connection{..} i = do
res <- atomically $ do
s <- takeTMVar _var
case s of
Offline -> return $ Right Nothing
Online u hdl -> return $ Right $ Just (u, hdl)
Closed -> return $ Left ClosedConnection
case i of
Close ->
case res of
Left _ -> atomically $ putTMVar _var Closed
Right Nothing -> atomically $ putTMVar _var Closed
Right (Just (_, h)) -> do
hClose h
atomically $ putTMVar _var Closed
other ->
case res of
Left e -> do
atomically $ putTMVar _var Closed
throwIO e
Right alt -> do
sres <- case alt of
Nothing -> newState _setts _host _port
Just (u, h) -> return $ Right $ Online u h
case sres of
Left e -> do
atomically $ putTMVar _var Closed
throwIO e
Right s -> do
atomically $ putTMVar _var s
let Online u h = s
case other of
Id -> return u
Send b -> B.hPut h b >> hFlush h
Recv siz -> B.hGet h siz
Close -> error "impossible execute"
newState :: Settings -> HostName -> Int -> IO (Either ConnectionException State)
newState sett host port =
case s_retry sett of
AtMost n ->
let loop i = do
_settingsLog sett (Info $ Connecting i)
let action = fmap Right $ connect sett host port
catch action $ \(_ :: SomeException) -> do
threadDelay delay
if n <= i
then return $
Left $
MaxAttemptConnectionReached host port n
else loop (i + 1) in
loop 1
KeepRetrying ->
let endlessly i = do
_settingsLog sett (Info $ Connecting i)
let action = fmap Right $ connect sett host port
catch action $ \(_ :: SomeException) ->
threadDelay delay >> endlessly (i + 1) in
endlessly (1 :: Int)
where
delay = s_reconnect_delay_secs sett * secs
secs :: Int
secs = 1000000
connect :: Settings -> HostName -> Int -> IO State
connect sett host port = do
hdl <- connectTo host (PortNumber $ fromIntegral port)
hSetBuffering hdl NoBuffering
uuid <- nextRandom
regularConnection sett hdl uuid
regularConnection :: Settings -> Handle -> UUID -> IO State
regularConnection sett h uuid = do
_settingsLog sett (Info $ Connected uuid)
return $ Online uuid h