-- | Threaded execution model for a server connection.
--
-- A manager thread consumes 'Msg' values from a queue and feeds them to a
-- processor, while three worker threads read packages from the connection,
-- write packages to it, and run the jobs produced by the processor. The
-- @push*@ functions are the public way to submit work to the manager.
module Database.EventStore.Internal.Execution.Production
( Production
, ServerConnectionError(..)
, newExecutionModel
, pushOperation
, shutdownExecutionModel
, pushConnectStream
, pushConnectPersist
, pushCreatePersist
, pushUpdatePersist
, pushDeletePersist
, pushAckPersist
, pushNakPersist
, pushUnsubscribe
, prodWaitTillClosed
) where
import Prelude hiding (take)
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Fix
import Data.IORef
import Data.Int
import Data.Foldable
import Data.Typeable
import Text.Printf
import Data.Serialize.Get hiding (Done)
import Data.Serialize.Put
import Data.Text
import Data.UUID
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.Generator
import Database.EventStore.Internal.Manager.Subscription hiding
( submitPackage
, unsubscribe
, ackPersist
, nakPersist
, abort
)
import Database.EventStore.Internal.Operation hiding (retry)
import Database.EventStore.Internal.Packages
import Database.EventStore.Internal.Processor
import Database.EventStore.Internal.Types
import Database.EventStore.Logging
-- | Tags the 'ThreadId' of a background thread with the role it plays.
data Worker
    = Reader ThreadId -- ^ Thread running 'reader'.
    | Runner ThreadId -- ^ Thread running 'runner'.
    | Writer ThreadId -- ^ Thread running 'writer'.
    deriving Show
-- | Errors thrown by the 'reader' thread while decoding incoming data.
data ServerConnectionError
    = WrongPackageFraming
      -- ^ The length prefix of a frame could not be decoded.
    | PackageParsingError String
      -- ^ The frame payload is not a valid package; carries the parser's
      --   error message.
    deriving (Show, Typeable)

instance Exception ServerConnectionError
-- | Queue cell: either a real element or the 'End' marker that
--   'emptyCycleQueue' and 'updateCycleQueue' push to know where to stop.
data Slot a
    = Slot !a
    | End

-- | A 'TQueue' of 'Slot's that can be swept in place.
newtype CycleQueue a = CycleQueue (TQueue (Slot a))
-- | Creates an empty 'CycleQueue'.
newCycleQueue :: IO (CycleQueue a)
newCycleQueue = do
    q <- newTQueueIO
    return (CycleQueue q)
-- | Removes and returns the next element of the queue. The read is a plain
--   'readTQueue', so the transaction waits while the queue has no element.
readCycleQueue :: CycleQueue a -> STM a
readCycleQueue (CycleQueue q) = go
  where
    go = do
        s <- readTQueue q
        case s of
            Slot a -> return a
            -- 'End' is only written by 'emptyCycleQueue' and
            -- 'updateCycleQueue', which consume it again in the same
            -- transaction. Should one ever be seen here, skip it rather than
            -- rely on a failable pattern ('STM' has no 'MonadFail' instance).
            End -> go
-- | Appends an element at the back of the queue.
writeCycleQueue :: CycleQueue a -> a -> STM ()
writeCycleQueue (CycleQueue q) = writeTQueue q . Slot
-- | Discards every element currently in the queue. An 'End' marker is pushed
--   first, then slots are dropped until that marker is read back.
emptyCycleQueue :: CycleQueue a -> STM ()
emptyCycleQueue (CycleQueue q) = do
    writeTQueue q End
    drain
  where
    drain = readTQueue q >>= \slot ->
        case slot of
            Slot _ -> drain
            End    -> return ()
-- | Visits every element currently in the queue, in order. For each element
--   the callback decides whether it is dropped ('Nothing') or put back at the
--   end of the queue, possibly modified ('Just'). An 'End' marker pushed
--   beforehand tells the sweep where the original content stops.
updateCycleQueue :: CycleQueue a -> (a -> STM (Maybe a)) -> STM ()
updateCycleQueue (CycleQueue q) k = do
    writeTQueue q End
    sweep
  where
    sweep = readTQueue q >>= visit

    visit End      = return ()
    visit (Slot a) = do
        kept <- k a
        -- Re-enqueue only when the callback kept the element.
        traverse_ (writeTQueue q . Slot) kept
        sweep
-- | Tells whether the underlying 'TQueue' currently holds no slot.
isEmptyCycleQueue :: CycleQueue a -> STM Bool
isEmptyCycleQueue queue =
    case queue of
        CycleQueue q -> isEmptyTQueue q
-- | Records the thread id carried by a 'Worker' in the matching field of the
--   'State'.
wkUpdState :: Worker -> State -> State
wkUpdState worker s =
    case worker of
        Reader tid -> s { _reader = Just tid }
        Runner tid -> s { _runner = Just tid }
        Writer tid -> s { _writer = Just tid }
-- | Handle on a running execution model.
data Production =
    Prod
    { _submit :: TVar (Msg -> IO ())
      -- ^ Function currently used to hand a 'Msg' to the model. It is read
      --   on every push, so it can be swapped at runtime.
    , _waitClosed :: STM ()
      -- ^ Transaction run by 'prodWaitTillClosed'.
    }
-- | Everything shared between the manager loop and its worker threads.
data Env =
    Env
    { _setts :: Settings
      -- ^ Client settings; also provides the logger.
    , _queue :: CycleQueue Msg
      -- ^ Messages waiting to be handled by the manager loop.
    , _pkgQueue :: CycleQueue Package
      -- ^ Packages waiting to be sent by the writer thread.
    , _jobQueue :: CycleQueue Job
      -- ^ Jobs waiting to be executed by the runner thread.
    , _state :: TVar State
      -- ^ Processor state and worker thread ids.
    , _nextSubmit :: TVar (Msg -> IO ())
      -- ^ Function used to submit the next 'Msg'.
    , _connRef :: IORef Connection
      -- ^ Connection currently in use.
    , _disposed :: TMVar ()
      -- ^ Filled once the model is shut down.
    }
-- | Messages handled by the manager loop. Except for 'Stopped' and 'Arrived',
--   which come from worker threads, each constructor has a matching @push*@
--   function (or 'shutdownExecutionModel').
data Msg
    = Stopped Worker SomeException
      -- ^ A worker thread ended with an exception.
    | Arrived Package
      -- ^ The reader thread decoded a package.
    | Shutdown
      -- ^ Request to stop the execution model.
    | forall a.
      NewOperation (Either OperationError a -> IO ()) (Operation a)
      -- ^ Run an operation and report its outcome to the callback.
    | ConnectStream (SubConnectEvent -> IO ()) Text Bool
      -- ^ Arguments forwarded to 'connectRegularStream'.
    | ConnectPersist (SubConnectEvent -> IO ()) Text Text Int32
      -- ^ Arguments forwarded to 'connectPersistent'.
    | Unsubscribe Running
      -- ^ Argument forwarded to 'unsubscribe'.
    | CreatePersist (Either PersistActionException ConfirmedAction -> IO ())
                    Text Text PersistentSubscriptionSettings
      -- ^ Arguments forwarded to 'createPersistent'.
    | UpdatePersist (Either PersistActionException ConfirmedAction -> IO ())
                    Text Text PersistentSubscriptionSettings
      -- ^ Arguments forwarded to 'updatePersistent'.
    | DeletePersist (Either PersistActionException ConfirmedAction -> IO ())
                    Text Text
      -- ^ Arguments forwarded to 'deletePersistent'.
    | AckPersist Running [UUID]
      -- ^ Arguments forwarded to 'ackPersist'.
    | NakPersist Running NakAction (Maybe Text) [UUID]
      -- ^ Arguments forwarded to 'nakPersist'.
-- | Hands a message to the model using whatever submit function is currently
--   stored in the 'Production' handle.
pushCmd :: Production -> Msg -> IO ()
pushCmd prod msg = readTVarIO (_submit prod) >>= \push -> push msg
-- | Submits a 'Shutdown' message to the model.
shutdownExecutionModel :: Production -> IO ()
shutdownExecutionModel = (`pushCmd` Shutdown)
-- | Submits an operation together with the callback that receives its
--   outcome.
pushOperation :: Production
              -> (Either OperationError a -> IO ())
              -> Operation a
              -> IO ()
pushOperation prod k op = pushCmd prod $ NewOperation k op
-- | Submits a 'ConnectStream' message built from the given arguments.
pushConnectStream :: Production
                  -> (SubConnectEvent -> IO ())
                  -> Text
                  -> Bool
                  -> IO ()
pushConnectStream prod k n tos = pushCmd prod msg
  where
    msg = ConnectStream k n tos
-- | Submits a 'ConnectPersist' message built from the given arguments.
pushConnectPersist :: Production
                   -> (SubConnectEvent -> IO ())
                   -> Text
                   -> Text
                   -> Int32
                   -> IO ()
pushConnectPersist prod k g n buf = pushCmd prod msg
  where
    msg = ConnectPersist k g n buf
-- | Submits a 'CreatePersist' message built from the given arguments.
pushCreatePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> PersistentSubscriptionSettings
                  -> IO ()
pushCreatePersist prod k g n setts = pushCmd prod msg
  where
    msg = CreatePersist k g n setts
-- | Submits an 'UpdatePersist' message built from the given arguments.
pushUpdatePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> PersistentSubscriptionSettings
                  -> IO ()
pushUpdatePersist prod k g n setts = pushCmd prod msg
  where
    msg = UpdatePersist k g n setts
-- | Submits a 'DeletePersist' message built from the given arguments.
pushDeletePersist :: Production
                  -> (Either PersistActionException ConfirmedAction -> IO ())
                  -> Text
                  -> Text
                  -> IO ()
pushDeletePersist prod k g n = pushCmd prod $ DeletePersist k g n
-- | Submits an 'AckPersist' message for the given subscription and event ids.
pushAckPersist :: Production -> Running -> [UUID] -> IO ()
pushAckPersist prod run evts = pushCmd prod $ AckPersist run evts
-- | Submits a 'NakPersist' message built from the given arguments.
pushNakPersist :: Production
               -> Running
               -> NakAction
               -> Maybe Text
               -> [UUID]
               -> IO ()
pushNakPersist prod run act res evts = pushCmd prod msg
  where
    msg = NakPersist run act res evts
-- | Submits an 'Unsubscribe' message for the given subscription.
pushUnsubscribe :: Production -> Running -> IO ()
pushUnsubscribe prod r = pushCmd prod $ Unsubscribe r
-- | Blocks until the wait transaction stored in the 'Production' handle
--   succeeds.
prodWaitTillClosed :: Production -> IO ()
prodWaitTillClosed = atomically . _waitClosed
-- | An 'IO' action queued for execution by the 'runner' thread.
newtype Job = Job (IO ())
-- | Mutable part of the execution model.
data State =
    State
    { _proc :: !(Processor (IO ()))
      -- ^ Current processor.
    , _reader :: !(Maybe ThreadId)
      -- ^ Reader thread, once spawned.
    , _runner :: !(Maybe ThreadId)
      -- ^ Runner thread, once spawned.
    , _writer :: !(Maybe ThreadId)
      -- ^ Writer thread, once spawned.
    }
-- | Initial state: a fresh processor and no worker thread registered.
emptyState :: Settings -> Generator -> State
emptyState setts gen =
    State
    { _proc   = newProcessor setts gen
    , _reader = Nothing
    , _runner = Nothing
    , _writer = Nothing
    }
-- | Replaces the processor stored in a 'State'.
updateProc :: Processor (IO ()) -> State -> State
updateProc new_proc old = old { _proc = new_proc }
-- | Reader thread body. Forever: receives a 4-byte length prefix, receives
--   that many bytes, decodes them as a 'Package', enqueues it as an 'Arrived'
--   message and logs the reception.
--
--   Throws 'WrongPackageFraming' when the prefix cannot be decoded and
--   'PackageParsingError' when the payload is not a valid package.
reader :: Settings -> CycleQueue Msg -> Connection -> IO ()
reader sett queue c = forever (readFrame >>= decode >>= deliver)
  where
    readFrame = do
        prefix <- connRecv c 4
        either (const $ throwIO WrongPackageFraming)
               (connRecv c)
               (runGet getLengthPrefix prefix)

    decode payload =
        either (throwIO . PackageParsingError) return (runGet getPackage payload)

    deliver pkg = do
        atomically $ writeCycleQueue queue (Arrived pkg)
        _settingsLog sett $ Info $
            PackageReceived (packageCmd pkg) (packageCorrelation pkg)
-- | Writer thread body. Forever: takes the next package from the queue,
--   serializes and sends it on the connection, then logs the emission.
writer :: Settings -> CycleQueue Package -> Connection -> IO ()
writer setts pkg_queue conn = forever sendNext
  where
    sendNext = do
        pkg <- atomically (readCycleQueue pkg_queue)
        connSend conn (runPut (putPackage pkg))
        _settingsLog setts $ Info $
            PackageSent (packageCmd pkg) (packageCorrelation pkg)
-- | Parses a frame length: a little-endian 32-bit word converted to 'Int'.
getLengthPrefix :: Get Int
getLengthPrefix = do
    w <- getWord32le
    return (fromIntegral w)
-- | Parses a 'Package': command byte, flag byte, correlation id, optional
--   credentials (depending on the flag), then all remaining bytes as payload.
getPackage :: Get Package
getPackage = do
    cmd  <- getWord8
    flg  <- getFlag
    col  <- getUUID
    cred <- getCredentials flg
    dta  <- remaining >>= getBytes
    return $ Package
        { packageCmd         = cmd
        , packageCorrelation = col
        , packageData        = dta
        , packageCred        = cred
        }
-- | Parses the flag byte: @0x00@ is 'None', @0x01@ is 'Authenticated'; any
--   other value makes the parser fail.
getFlag :: Get Flag
getFlag = getWord8 >>= decode
  where
    decode 0x00 = return None
    decode 0x01 = return Authenticated
    decode wd   = fail $ printf "TCP: Unhandled flag value 0x%x" wd
-- | Parses the one-byte length that precedes a credentials entry.
getCredEntryLength :: Get Int
getCredEntryLength = do
    w <- getWord8
    return (fromIntegral w)
-- | Parses the credentials section. Nothing is consumed when the flag is
--   'None'; otherwise a length-prefixed login followed by a length-prefixed
--   password are read.
getCredentials :: Flag -> Get (Maybe Credentials)
getCredentials flag =
    case flag of
        None -> return Nothing
        _    -> do
            login <- getCredEntryLength >>= getBytes
            passw <- getCredEntryLength >>= getBytes
            return (Just (credentials login passw))
-- | Parses a 'UUID' from the next 16 bytes; fails when they cannot be
--   converted.
getUUID :: Get UUID
getUUID = getLazyByteString 16 >>= \bs ->
    maybe (fail "TCP: Wrong UUID format") return (fromByteString bs)
-- | Runner thread body. Forever: takes the next job from the queue and
--   executes it.
runner :: CycleQueue Job -> IO ()
runner job_queue =
    forever $ atomically (readCycleQueue job_queue) >>= \(Job j) -> j
-- | Forks the worker thread selected by the given 'Worker' constructor and
--   returns the constructor applied to the new thread's id.
--
--   When the thread ends with an exception other than 'ThreadKilled', a
--   'Stopped' message is pushed on the manager queue. A normal return or a
--   'ThreadKilled' is ignored.
spawn :: Env -> (ThreadId -> Worker) -> IO Worker
spawn Env{..} mk = do
    conn <- readIORef _connRef
    let run (Reader _) = reader _setts _queue conn
        run (Runner _) = runner _jobQueue
        run (Writer _) = writer _setts _pkgQueue conn

        report worker (Left e)
            | Just ThreadKilled <- asyncExceptionFromException e = return ()
            | otherwise =
                atomically $ writeCycleQueue _queue $ Stopped worker e
        report _ (Right _) = return ()
    -- 'mfix' ties the knot so the handler can mention the worker's own
    -- thread id; the id must not be forced before 'forkFinally' returns.
    tid <- mfix $ \tid ->
        let worker = mk tid in
        forkFinally (run worker) (report worker)
    return (mk tid)
-- | Interprets a 'Transition' inside STM: produced jobs go to the job queue,
--   transmitted packages go to the package queue, and the processor carried
--   by the final 'Await' is returned.
runTransition :: Env -> Transition (IO ()) -> STM (Processor (IO ()))
runTransition Env{..} = interpret
  where
    interpret step =
        case step of
            Produce j nxt ->
                writeCycleQueue _jobQueue (Job j) >> interpret nxt
            Transmit pkg nxt ->
                writeCycleQueue _pkgQueue pkg >> interpret nxt
            Await new_proc ->
                return new_proc
-- | Spawns the reader, runner and writer threads, registers their ids in the
--   shared state, then enters the manager loop.
bootstrap :: Env -> IO ()
bootstrap env@Env{..} = do
    readerW <- spawn env Reader
    runnerW <- spawn env Runner
    writerW <- spawn env Writer
    let register = wkUpdState readerW
                 . wkUpdState runnerW
                 . wkUpdState writerW
    atomically (modifyTVar' _state register)
    cruising env
-- | Manager loop. Takes the next message from the queue and reacts to it:
--
--   * 'Stopped' rethrows the exception that ended the worker;
--   * 'Shutdown' throws 'ClosedConnection';
--   * every other message is turned into a processor transition, which is
--     run before looping.
--
--   The function never returns normally; it only ends by throwing.
cruising :: Env -> IO ()
cruising env@Env{..} = do
    msg <- atomically $ readCycleQueue _queue
    s   <- readTVarIO _state
    let p = _proc s

        -- Runs a transition, stores the resulting processor, then loops.
        step sm = do
            atomically $ do
                new_proc <- runTransition env sm
                modifyTVar' _state $ updateProc new_proc
            cruising env
    case msg of
        Stopped _ e ->
            throwIO e
        Shutdown ->
            throwIO ClosedConnection
        Arrived pkg ->
            step $ submitPackage pkg p
        NewOperation k op ->
            step $ newOperation k op p
        ConnectStream k n tos ->
            step $ connectRegularStream k n tos p
        ConnectPersist k g n b ->
            step $ connectPersistent k g n b p
        Unsubscribe r ->
            step $ unsubscribe r p
        CreatePersist k g n psetts ->
            step $ createPersistent k g n psetts p
        UpdatePersist k g n psetts ->
            step $ updatePersistent k g n psetts p
        DeletePersist k g n ->
            step $ deletePersistent k g n p
        AckPersist run evts ->
            step $ ackPersist (return ()) run evts p
        NakPersist run act res evts ->
            step $ nakPersist (return ()) run act res evts p
-- | Tears the current session down, in this order:
--
--   1. kills the reader and writer threads;
--   2. drops every package still waiting to be sent;
--   3. sweeps the message queue: pending 'Arrived' packages are submitted to
--      the processor, control messages are dropped, everything else is kept;
--   4. closes the connection, ignoring any 'ConnectionException';
--   5. runs the processor's 'abort' transition (its resulting processor is
--      discarded);
--   6. waits for the job queue to be empty, then kills the runner thread.
--
--   NOTE(review): an empty job queue only shows that every job has been
--   dequeued; the last one may still be executing when the runner is killed.
closing :: Env -> IO ()
closing env@Env{..} = do
    st <- readTVarIO _state
    traverse_ killThread (_reader st)
    traverse_ killThread (_writer st)
    atomically $ emptyCycleQueue _pkgQueue
    atomically $ updateCycleQueue _queue sweep
    conn <- readIORef _connRef
    _ <- try (connClose conn) :: IO (Either ConnectionException ())
    atomically $ do
        s <- readTVar _state
        _ <- runTransition env (abort (_proc s))
        return ()
    atomically $ isEmptyCycleQueue _jobQueue >>= \end -> unless end retry
    traverse_ killThread (_runner st)
  where
    sweep (Arrived pkg) = do
        s <- readTVar _state
        nxt_proc <- runTransition env (submitPackage pkg (_proc s))
        modifyTVar' _state (updateProc nxt_proc)
        return Nothing
    sweep Shutdown           = return Nothing
    sweep (AckPersist _ _)   = return Nothing
    sweep (NakPersist _ _ _ _) = return Nothing
    sweep (Unsubscribe _)    = return Nothing
    sweep (Stopped _ _)      = return Nothing
    sweep other              = return (Just other)
-- | Submit function that ignores its message and throws the given exception
--   instead.
raiseException :: Exception e => e -> Msg -> IO ()
raiseException e = const (throwIO e)
-- | Connects to the server at the given host and port and starts the
--   execution model (manager thread plus its workers).
--
--   When the manager thread ends, the session is torn down with 'closing'
--   and then:
--
--   * on a 'ConnectionException', the model is disposed: later pushes rethrow
--     that exception and 'prodWaitTillClosed' is released;
--   * on any other exception, a new connection is opened and the model is
--     restarted. If opening that connection fails, the failure is logged and
--     the model is disposed with it, so callers are not left waiting on a
--     model that no thread is serving.
--
--   The returned handle's wait transaction succeeds once the initial
--   connection is closed and the model has been disposed.
newExecutionModel :: Settings -> HostName -> Int -> IO Production
newExecutionModel setts host port = do
    gen       <- newGenerator
    queue     <- newCycleQueue
    pkg_queue <- newCycleQueue
    job_queue <- newCycleQueue
    conn      <- newConnection setts host port
    conn_ref  <- newIORef conn
    var       <- newTVarIO $ emptyState setts gen
    nxt_sub   <- newTVarIO (atomically . writeCycleQueue queue)
    disposed  <- newEmptyTMVarIO
    let env = Env setts queue pkg_queue job_queue var nxt_sub conn_ref disposed

        -- Makes every later push throw the given exception and signals that
        -- the model is gone.
        dispose e = atomically $ do
            writeTVar nxt_sub (raiseException e)
            putTMVar disposed ()

        handler res = do
            closing env
            case res of
                Left e -> do
                    _settingsLog setts (Error $ UnexpectedException e)
                    case fromException e of
                        Just (_ :: ConnectionException) -> dispose e
                        _ -> do
                            outcome <- try $ newConnection setts host port
                            case outcome of
                                -- Reconnection failed: give up cleanly
                                -- instead of letting this thread die.
                                Left (err :: SomeException) -> do
                                    _settingsLog setts
                                        (Error $ UnexpectedException err)
                                    dispose err
                                Right new_conn -> do
                                    writeIORef conn_ref new_conn
                                    _ <- forkFinally (bootstrap env) handler
                                    return ()
                _ -> atomically $ putTMVar disposed ()
    _ <- forkFinally (bootstrap env) handler
    return $ Prod nxt_sub $ do
        closed <- connIsClosed conn
        unless closed retry
        readTMVar disposed