module ProjectM36.Client
(ConnectionInfo(..),
Connection(..),
Port,
Hostname,
DatabaseName,
ConnectionError(..),
connectProjectM36,
close,
closeRemote_,
executeRelationalExpr,
executeDatabaseContextExpr,
executeDatabaseContextIOExpr,
executeGraphExpr,
executeSchemaExpr,
executeTransGraphRelationalExpr,
commit,
rollback,
typeForRelationalExpr,
inclusionDependencies,
planForDatabaseContextExpr,
currentSchemaName,
SchemaName,
HeadName,
setCurrentSchemaName,
transactionGraphAsRelation,
relationVariablesAsRelation,
headName,
remoteDBLookupName,
defaultServerPort,
headTransactionId,
defaultDatabaseName,
defaultRemoteConnectionInfo,
defaultHeadName,
PersistenceStrategy(..),
RelationalExpr,
RelationalExprBase(..),
DatabaseContextExpr(..),
DatabaseContextIOExpr(..),
Attribute(..),
attributesFromList,
createNodeId,
createSessionAtCommit,
createSessionAtHead,
closeSession,
addClientNode,
callTestTimeout_,
RelationCardinality(..),
TransactionGraphOperator(..),
CommitOption(..),
transactionGraph_,
disconnectedTransaction_,
TransGraphRelationalExpr,
TransactionIdLookup(..),
TransactionIdHeadBacktrack(..),
NodeId(..),
Atom(..),
Session,
SessionId,
NotificationCallback,
emptyNotificationCallback,
EvaluatedNotification(..),
atomTypesAsRelation,
AttributeExpr,
inclusionDependencyForKey,
databaseContextExprForUniqueKey,
databaseContextExprForForeignKey,
createScriptedAtomFunction,
AttributeExprBase(..),
TypeConstructor(..),
TypeConstructorDef(..),
DataConstructorDef(..),
AttributeNames(..),
RelVarName,
IncDepName,
InclusionDependency(..),
AttributeName,
RequestTimeoutException(..),
RemoteProcessDiedException(..),
AtomType(..),
Atomable(..),
TupleExprBase(..),
AtomExprBase(..),
RestrictionPredicateExprBase(..)
) where
import ProjectM36.Base hiding (inclusionDependencies)
import qualified ProjectM36.Base as B
import ProjectM36.Error
import ProjectM36.Atomable
import ProjectM36.AtomFunction
import ProjectM36.StaticOptimizer
import ProjectM36.Key
import qualified ProjectM36.IsomorphicSchema as Schema
import Control.Monad.State
import Control.Monad.Trans.Reader
import qualified ProjectM36.RelationalExpression as RE
import ProjectM36.DatabaseContext (basicDatabaseContext)
import ProjectM36.TransactionGraph
import qualified ProjectM36.Transaction as Trans
import ProjectM36.TransactionGraph.Persist
import ProjectM36.Attribute hiding (atomTypes)
import ProjectM36.TransGraphRelationalExpression (TransGraphRelationalExpr, evalTransGraphRelationalExpr)
import ProjectM36.Persist (DiskSync(..))
import ProjectM36.FileLock
import ProjectM36.Notifications
import ProjectM36.Server.RemoteCallTypes
import qualified ProjectM36.DisconnectedTransaction as Discon
import ProjectM36.Relation (typesAsRelation)
import ProjectM36.ScriptSession (initScriptSession, ScriptSession)
import qualified ProjectM36.Relation as R
import qualified ProjectM36.DatabaseContext as DBC
import Control.Exception.Base
import GHC.Conc.Sync
import Network.Transport (Transport(closeTransport))
import Network.Transport.TCP (createTransport, defaultTCPParameters, encodeEndPointAddress)
import Control.Distributed.Process.Node (newLocalNode, initRemoteTable, runProcess, LocalNode, forkProcess, closeLocalNode)
import Control.Distributed.Process.Extras.Internal.Types (whereisRemote)
import Control.Distributed.Process.ManagedProcess.Client (call, safeCall)
import Control.Distributed.Process (NodeId(..), reconnect)
import Data.UUID.V4 (nextRandom)
import Data.Word
import Control.Distributed.Process (ProcessId, Process, receiveWait, send, match)
import Control.Exception (IOException, handle, AsyncException, throwIO, fromException, Exception)
import Control.Concurrent.MVar
import qualified Data.Map as M
import Control.Distributed.Process.Serializable (Serializable)
import qualified STMContainers.Map as STMMap
import qualified STMContainers.Set as STMSet
import qualified ProjectM36.Session as Sess
import ProjectM36.Session
import ProjectM36.Sessions
import ListT
import Data.Binary (Binary)
import GHC.Generics (Generic)
import Control.DeepSeq (force)
import System.IO
type Hostname = String
type Port = Word16
type DatabaseName = String
type NotificationCallback = NotificationName -> EvaluatedNotification -> IO ()
emptyNotificationCallback :: NotificationCallback
emptyNotificationCallback _ _ = pure ()
type GhcPkgPath = String
data RemoteProcessDiedException = RemoteProcessDiedException
deriving (Show, Eq)
instance Exception RemoteProcessDiedException
data RequestTimeoutException = RequestTimeoutException
deriving (Show, Eq)
instance Exception RequestTimeoutException
data ConnectionInfo = InProcessConnectionInfo PersistenceStrategy NotificationCallback [GhcPkgPath]|
RemoteProcessConnectionInfo DatabaseName NodeId NotificationCallback
type EvaluatedNotifications = M.Map NotificationName EvaluatedNotification
data NotificationMessage = NotificationMessage EvaluatedNotifications
deriving (Binary, Eq, Show, Generic)
data EvaluatedNotification = EvaluatedNotification {
notification :: Notification,
reportRelation :: Either RelationalError Relation
}
deriving(Binary, Eq, Show, Generic)
createNodeId :: Hostname -> Port -> NodeId
createNodeId host port = NodeId $ encodeEndPointAddress host (show port) 1
defaultServerPort :: Port
defaultServerPort = 6543
defaultDatabaseName :: DatabaseName
defaultDatabaseName = "base"
defaultHeadName :: HeadName
defaultHeadName = "master"
defaultRemoteConnectionInfo :: ConnectionInfo
defaultRemoteConnectionInfo = RemoteProcessConnectionInfo defaultDatabaseName (createNodeId "127.0.0.1" defaultServerPort) emptyNotificationCallback
type ClientNodes = STMSet.Set ProcessId
type TransactionGraphLockHandle = Handle
data InProcessConnectionConf = InProcessConnectionConf {
ipPersistenceStrategy :: PersistenceStrategy,
ipClientNodes :: ClientNodes,
ipSessions :: Sessions,
ipTransactionGraph :: TVar TransactionGraph,
ipScriptSession :: Maybe ScriptSession,
ipLocalNode :: LocalNode,
ipTransport :: Transport,
ipLocks :: Maybe (TransactionGraphLockHandle, MVar LockFileHash)
}
data RemoteProcessConnectionConf = RemoteProcessConnectionConf {
rLocalNode :: LocalNode,
rProcessId :: ProcessId,
rTransport :: Transport
}
data Connection = InProcessConnection InProcessConnectionConf |
RemoteProcessConnection RemoteProcessConnectionConf
data ConnectionError = SetupDatabaseDirectoryError PersistenceError |
IOExceptionError IOException |
NoSuchDatabaseByNameError DatabaseName |
LoginError
deriving (Show, Eq, Generic)
remoteDBLookupName :: DatabaseName -> String
remoteDBLookupName = (++) "db-"
createLocalNode :: IO (LocalNode, Transport)
createLocalNode = do
eLocalTransport <- createTransport "127.0.0.1" "0" defaultTCPParameters
case eLocalTransport of
Left err -> error ("failed to create transport: " ++ show err)
Right localTransport -> do
localNode <- newLocalNode localTransport initRemoteTable
pure (localNode, localTransport)
notificationListener :: NotificationCallback -> Process ()
notificationListener callback = do
_ <- forever $ do
receiveWait [
match (\(NotificationMessage eNots) -> do
liftIO $ mapM_ (uncurry callback) (M.toList eNots)
)
]
pure ()
startNotificationListener :: LocalNode -> NotificationCallback -> IO (ProcessId)
startNotificationListener localNode callback = forkProcess localNode (notificationListener callback)
createScriptSession :: [String] -> IO (Maybe ScriptSession)
createScriptSession ghcPkgPaths = do
eScriptSession <- initScriptSession ghcPkgPaths
case eScriptSession of
Left err -> hPutStrLn stderr ("Failed to load scripting engine- scripting disabled: " ++ (show err)) >> pure Nothing
Right s -> pure (Just s)
connectProjectM36 :: ConnectionInfo -> IO (Either ConnectionError Connection)
connectProjectM36 (InProcessConnectionInfo strat notificationCallback ghcPkgPaths) = do
freshId <- nextRandom
let bootstrapContext = basicDatabaseContext
freshGraph = bootstrapTransactionGraph freshId bootstrapContext
case strat of
NoPersistence -> do
graphTvar <- newTVarIO freshGraph
clientNodes <- STMSet.newIO
sessions <- STMMap.newIO
(localNode, transport) <- createLocalNode
notificationPid <- startNotificationListener localNode notificationCallback
mScriptSession <- createScriptSession ghcPkgPaths
let conn = InProcessConnection (InProcessConnectionConf {
ipPersistenceStrategy = strat,
ipClientNodes = clientNodes,
ipSessions = sessions,
ipTransactionGraph = graphTvar,
ipScriptSession = mScriptSession,
ipLocalNode = localNode,
ipTransport = transport,
ipLocks = Nothing})
addClientNode conn notificationPid
pure (Right conn)
MinimalPersistence dbdir -> connectPersistentProjectM36 strat NoDiskSync dbdir freshGraph notificationCallback ghcPkgPaths
CrashSafePersistence dbdir -> connectPersistentProjectM36 strat FsyncDiskSync dbdir freshGraph notificationCallback ghcPkgPaths
connectProjectM36 (RemoteProcessConnectionInfo databaseName serverNodeId notificationCallback) = do
connStatus <- newEmptyMVar
(localNode, transport) <- createLocalNode
let dbName = remoteDBLookupName databaseName
notificationListenerPid <- startNotificationListener localNode notificationCallback
runProcess localNode $ do
mServerProcessId <- whereisRemote serverNodeId dbName
case mServerProcessId of
Nothing -> liftIO $ putMVar connStatus $ Left (NoSuchDatabaseByNameError databaseName)
Just serverProcessId -> do
loginConfirmation <- safeLogin (Login notificationListenerPid) serverProcessId
if not loginConfirmation then
liftIO $ putMVar connStatus (Left LoginError)
else do
liftIO $ putMVar connStatus (Right $ RemoteProcessConnection (RemoteProcessConnectionConf {rLocalNode = localNode, rProcessId = serverProcessId, rTransport = transport}))
status <- takeMVar connStatus
pure status
connectPersistentProjectM36 :: PersistenceStrategy ->
DiskSync ->
FilePath ->
TransactionGraph ->
NotificationCallback ->
[GhcPkgPath] ->
IO (Either ConnectionError Connection)
connectPersistentProjectM36 strat sync dbdir freshGraph notificationCallback ghcPkgPaths = do
err <- setupDatabaseDir sync dbdir freshGraph
case err of
Left err' -> return $ Left (SetupDatabaseDirectoryError err')
Right (lockFileH, digest) -> do
mScriptSession <- createScriptSession ghcPkgPaths
graph <- transactionGraphLoad dbdir emptyTransactionGraph mScriptSession
case graph of
Left err' -> return $ Left (SetupDatabaseDirectoryError err')
Right graph' -> do
tvarGraph <- newTVarIO graph'
sessions <- STMMap.newIO
clientNodes <- STMSet.newIO
(localNode, transport) <- createLocalNode
lockMVar <- newMVar digest
let conn = InProcessConnection (InProcessConnectionConf {
ipPersistenceStrategy = strat,
ipClientNodes = clientNodes,
ipSessions = sessions,
ipTransactionGraph = tvarGraph,
ipScriptSession = mScriptSession,
ipLocalNode = localNode,
ipTransport = transport,
ipLocks = Just (lockFileH, lockMVar)
})
notificationPid <- startNotificationListener localNode notificationCallback
addClientNode conn notificationPid
pure (Right conn)
createSessionAtCommit :: TransactionId -> Connection -> IO (Either RelationalError SessionId)
createSessionAtCommit commitId conn@(InProcessConnection _) = do
newSessionId <- nextRandom
atomically $ do
createSessionAtCommit_ commitId newSessionId conn
createSessionAtCommit uuid conn@(RemoteProcessConnection _) = remoteCall conn (CreateSessionAtCommit uuid)
createSessionAtCommit_ :: TransactionId -> SessionId -> Connection -> STM (Either RelationalError SessionId)
createSessionAtCommit_ commitId newSessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
graphTvar = ipTransactionGraph conf
graph <- readTVar graphTvar
case transactionForId commitId graph of
Left err -> pure (Left err)
Right transaction -> do
let freshDiscon = DisconnectedTransaction commitId (Trans.schemas transaction) False
keyDuplication <- STMMap.lookup newSessionId sessions
case keyDuplication of
Just _ -> pure $ Left (SessionIdInUseError newSessionId)
Nothing -> do
STMMap.insert (Session freshDiscon defaultSchemaName) newSessionId sessions
pure $ Right newSessionId
createSessionAtCommit_ _ _ (RemoteProcessConnection _) = error "createSessionAtCommit_ called on remote connection"
createSessionAtHead :: HeadName -> Connection -> IO (Either RelationalError SessionId)
createSessionAtHead headn conn@(InProcessConnection conf) = do
let graphTvar = ipTransactionGraph conf
newSessionId <- nextRandom
atomically $ do
graph <- readTVar graphTvar
case transactionForHead headn graph of
Nothing -> pure $ Left (NoSuchHeadNameError headn)
Just trans -> createSessionAtCommit_ (transactionId trans) newSessionId conn
createSessionAtHead headn conn@(RemoteProcessConnection _) = remoteCall conn (CreateSessionAtHead headn)
addClientNode :: Connection -> ProcessId -> IO ()
addClientNode (RemoteProcessConnection _) _ = error "addClientNode called on remote connection"
addClientNode (InProcessConnection conf) newProcessId = atomically (STMSet.insert newProcessId (ipClientNodes conf))
closeSession :: SessionId -> Connection -> IO ()
closeSession sessionId (InProcessConnection conf) = do
atomically $ STMMap.delete sessionId (ipSessions conf)
closeSession sessionId conn@(RemoteProcessConnection _) = remoteCall conn (CloseSession sessionId)
close :: Connection -> IO ()
close (InProcessConnection conf) = do
atomically $ do
let sessions = ipSessions conf
STMMap.deleteAll sessions
pure ()
closeLocalNode (ipLocalNode conf)
closeTransport (ipTransport conf)
close conn@(RemoteProcessConnection conf) = do
_ <- (remoteCall conn Logout) :: IO Bool
closeLocalNode (rLocalNode conf)
closeTransport (rTransport conf)
closeRemote_ :: Connection -> IO ()
closeRemote_ (InProcessConnection _) = error "invalid call of closeRemote_ on InProcessConnection"
closeRemote_ (RemoteProcessConnection conf) = runProcess (rLocalNode conf) (reconnect (rProcessId conf))
excMaybe :: IO (Maybe RelationalError) -> IO (Maybe RelationalError)
excMaybe m = handle handler m
where
handler exc | Just (_ :: AsyncException) <- fromException exc = throwIO exc
| otherwise = pure (Just (UnhandledExceptionError (show exc)))
excEither :: IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither m = handle handler m
where
handler exc | Just (_ :: AsyncException) <- fromException exc = throwIO exc
| otherwise = pure (Left (UnhandledExceptionError (show exc)))
runProcessResult :: LocalNode -> Process a -> IO a
runProcessResult localNode proc = do
ret <- newEmptyMVar
runProcess localNode $ do
val <- proc
liftIO $ putMVar ret val
takeMVar ret
safeLogin :: Login -> ProcessId -> Process (Bool)
safeLogin login procId = do
ret <- call procId login
case ret of
Left (_ :: ServerError) -> pure False
Right val -> pure val
remoteCall :: (Serializable a, Serializable b) => Connection -> a -> IO b
remoteCall (InProcessConnection _ ) _ = error "remoteCall called on local connection"
remoteCall (RemoteProcessConnection conf) arg = runProcessResult localNode $ do
ret <- safeCall serverProcessId arg
case ret of
Left err -> error ("server died: " ++ show err)
Right ret' -> case ret' of
Left RequestTimeoutError -> liftIO (throwIO RequestTimeoutException)
Left (ProcessDiedError _) -> liftIO (throwIO RemoteProcessDiedException)
Right val -> pure val
where
localNode = rLocalNode conf
serverProcessId = rProcessId conf
sessionForSessionId :: SessionId -> Sessions -> STM (Either RelationalError Session)
sessionForSessionId sessionId sessions = do
maybeSession <- STMMap.lookup sessionId sessions
pure $ maybe (Left $ NoSuchSessionError sessionId) Right maybeSession
schemaForSessionId :: Session -> STM (Either RelationalError Schema)
schemaForSessionId session = do
let sname = schemaName session
if sname == defaultSchemaName then
pure (Right (Schema []))
else
case M.lookup sname (subschemas session) of
Nothing -> pure (Left (SubschemaNameNotInUseError sname))
Just schema -> pure (Right schema)
sessionAndSchema :: SessionId -> Sessions -> STM (Either RelationalError (Session, Schema))
sessionAndSchema sessionId sessions = do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> do
eSchema <- schemaForSessionId session
case eSchema of
Left err -> pure (Left err)
Right schema -> pure (Right (session, schema))
currentSchemaName :: SessionId -> Connection -> IO (Maybe SchemaName)
currentSchemaName sessionId (InProcessConnection conf) = atomically $ do
let sessions = ipSessions conf
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left _ -> pure Nothing
Right session -> pure (Just (Sess.schemaName session))
currentSchemaName sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveCurrentSchemaName sessionId)
setCurrentSchemaName :: SessionId -> Connection -> SchemaName -> IO (Maybe RelationalError)
setCurrentSchemaName sessionId (InProcessConnection conf) sname = atomically $ do
let sessions = ipSessions conf
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left _ -> pure Nothing
Right session -> case Sess.setSchemaName sname session of
Left err -> pure (Just err)
Right newSession -> STMMap.insert newSession sessionId sessions >> pure Nothing
setCurrentSchemaName sessionId conn@(RemoteProcessConnection _) sname = remoteCall conn (ExecuteSetCurrentSchema sessionId sname)
executeRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation)
executeRelationalExpr sessionId (InProcessConnection conf) expr = excEither $ atomically $ do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, schema) -> do
let expr' = if schemaName session /= defaultSchemaName then
Schema.processRelationalExprInSchema schema expr
else
Right expr
case expr' of
Left err -> pure (Left err)
Right expr'' -> case runReader (RE.evalRelationalExpr expr'') (RE.mkRelationalExprState (Sess.concreteDatabaseContext session)) of
Left err -> pure (Left err)
Right rel -> pure (force (Right rel))
executeRelationalExpr sessionId conn@(RemoteProcessConnection _) relExpr = remoteCall conn (ExecuteRelationalExpr sessionId relExpr)
executeDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Maybe RelationalError)
executeDatabaseContextExpr sessionId (InProcessConnection conf) expr = excMaybe $ atomically $ do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Just err
Right (session, schema) -> do
let expr' = if schemaName session == defaultSchemaName then
Right expr
else
Schema.processDatabaseContextExprInSchema schema expr
case expr' of
Left err -> pure (Just err)
Right expr'' -> case runState (RE.evalDatabaseContextExpr expr'') (RE.freshDatabaseState (Sess.concreteDatabaseContext session)) of
(Just err,_) -> return $ Just err
(Nothing, (_,_,False)) -> pure Nothing
(Nothing, (!context',_,True)) -> do
let newDiscon = DisconnectedTransaction (Sess.parentId session) newSchemas True
newSubschemas = Schema.processDatabaseContextExprSchemasUpdate (Sess.subschemas session) expr
newSchemas = Schemas context' newSubschemas
newSession = Session newDiscon (Sess.schemaName session)
STMMap.insert newSession sessionId sessions
pure Nothing
executeDatabaseContextExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (ExecuteDatabaseContextExpr sessionId dbExpr)
executeDatabaseContextIOExpr :: SessionId -> Connection -> DatabaseContextIOExpr -> IO (Maybe RelationalError)
executeDatabaseContextIOExpr sessionId (InProcessConnection conf) expr = excMaybe $ do
let sessions = ipSessions conf
scriptSession = ipScriptSession conf
eSession <- atomically $ sessionForSessionId sessionId sessions
case eSession of
Left err -> pure $ Just err
Right session -> do
res <- RE.evalDatabaseContextIOExpr scriptSession (Sess.concreteDatabaseContext session) expr
case res of
Left err -> pure (Just err)
Right context' -> do
let newDiscon = DisconnectedTransaction (Sess.parentId session) newSchemas True
newSchemas = Schemas context' (Sess.subschemas session)
newSession = Session newDiscon (Sess.schemaName session)
atomically $ STMMap.insert newSession sessionId sessions
pure Nothing
executeDatabaseContextIOExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (ExecuteDatabaseContextIOExpr sessionId dbExpr)
executeGraphExprSTM_ :: Bool -> TransactionId -> SessionId -> Session -> Sessions -> TransactionGraphOperator -> TransactionGraph -> TVar TransactionGraph -> STM (Either RelationalError TransactionGraph)
executeGraphExprSTM_ updateGraphOnError freshId sessionId session sessions graphExpr graph graphTVar= do
case evalGraphOp freshId (Sess.disconnectedTransaction session) graph graphExpr of
Left err -> do
when updateGraphOnError (writeTVar graphTVar graph)
pure $ Left err
Right (discon', graph') -> do
writeTVar graphTVar graph'
let newSession = Session discon' (Sess.schemaName session)
STMMap.insert newSession sessionId sessions
pure $ Right graph'
executeCommitExprSTM_ :: DatabaseContext -> DatabaseContext -> ClientNodes -> STM (EvaluatedNotifications, ClientNodes)
executeCommitExprSTM_ oldContext newContext nodes = do
let nots = notifications oldContext
fireNots = notificationChanges nots oldContext newContext
evaldNots = M.map mkEvaldNot fireNots
mkEvaldNot notif = EvaluatedNotification { notification = notif,
reportRelation = runReader (RE.evalRelationalExpr (reportExpr notif)) (RE.mkRelationalExprState oldContext) }
pure (evaldNots, nodes)
executeGraphExpr :: SessionId -> Connection -> TransactionGraphOperator -> IO (Maybe RelationalError)
executeGraphExpr sessionId (InProcessConnection conf) graphExpr = excMaybe $ do
let strat = ipPersistenceStrategy conf
clientNodes = ipClientNodes conf
sessions = ipSessions conf
graphTvar = ipTransactionGraph conf
mLockFileH = ipLocks conf
lockHandler body = case graphExpr of
Commit _ -> case mLockFileH of
Nothing -> body False
Just (lockFileH, lockMVar) -> let acquireLocks = do
lastWrittenDigest <- takeMVar lockMVar
lockFile lockFileH WriteLock
latestDigest <- readGraphTransactionIdDigest strat
pure (latestDigest /= lastWrittenDigest)
releaseLocks _ = do
gDigest <- readGraphTransactionIdDigest strat
unlockFile lockFileH
putMVar lockMVar gDigest
in bracket acquireLocks releaseLocks body
_ -> body False
freshId <- nextRandom
lockHandler $ \dbWrittenByOtherProcess -> do
manip <- atomically $ do
eSession <- sessionForSessionId sessionId sessions
oldGraph <- readTVar graphTvar
case eSession of
Left err -> pure (Left err)
Right session -> do
let mScriptSession = ipScriptSession conf
dbdir = case strat of
MinimalPersistence x -> x
CrashSafePersistence x -> x
_ -> error "accessing dbdir on non-persisted connection"
eRefreshedGraph <- if dbWrittenByOtherProcess then
unsafeIOToSTM (transactionGraphLoad dbdir oldGraph mScriptSession)
else
pure (Right oldGraph)
case eRefreshedGraph of
Left err -> pure (Left (DatabaseLoadError err))
Right refreshedGraph -> do
if not (isDirty session) && graphExpr == Commit IgnoreEmptyCommitOption then
pure (Right (M.empty, [], oldGraph))
else do
eGraph <- executeGraphExprSTM_ dbWrittenByOtherProcess freshId sessionId session sessions graphExpr refreshedGraph graphTvar
case eGraph of
Left err -> pure (Left err)
Right newGraph -> do
if not (isDirty session) && graphExpr == Commit ForbidEmptyCommitOption then
pure (Left EmptyCommitError)
else if not (isDirty session) && graphExpr == Commit IgnoreEmptyCommitOption then
pure (Right (M.empty, [], newGraph))
else if isCommit graphExpr then do
case transactionForId (Sess.parentId session) oldGraph of
Left err -> pure $ Left err
Right previousTrans -> do
(evaldNots, nodes) <- executeCommitExprSTM_ (Trans.concreteDatabaseContext previousTrans) (Sess.concreteDatabaseContext session) clientNodes
nodesToNotify <- toList (STMSet.stream nodes)
pure $ Right (evaldNots, nodesToNotify, newGraph)
else
pure $ Right (M.empty, [], newGraph)
case manip of
Left err -> return $ Just err
Right (notsToFire, nodesToNotify, newGraph) -> do
processTransactionGraphPersistence strat newGraph
sendNotifications nodesToNotify (ipLocalNode conf) notsToFire
pure Nothing
executeGraphExpr sessionId conn@(RemoteProcessConnection _) graphExpr = remoteCall conn (ExecuteGraphExpr sessionId graphExpr)
executeTransGraphRelationalExpr :: SessionId -> Connection -> TransGraphRelationalExpr -> IO (Either RelationalError Relation)
executeTransGraphRelationalExpr _ (InProcessConnection conf) tgraphExpr = excEither . atomically $ do
let graphTvar = ipTransactionGraph conf
graph <- readTVar graphTvar
case evalTransGraphRelationalExpr tgraphExpr graph of
Left err -> pure (Left err)
Right relExpr -> case runReader (RE.evalRelationalExpr relExpr) (RE.mkRelationalExprState DBC.empty) of
Left err -> pure (Left err)
Right rel -> pure (force (Right rel))
executeTransGraphRelationalExpr sessionId conn@(RemoteProcessConnection _) tgraphExpr = remoteCall conn (ExecuteTransGraphRelationalExpr sessionId tgraphExpr)
executeSchemaExpr :: SessionId -> Connection -> Schema.SchemaExpr -> IO (Maybe RelationalError)
executeSchemaExpr sessionId (InProcessConnection conf) schemaExpr = atomically $ do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Just err)
Right (session, _) -> do
let subschemas' = subschemas session
case Schema.evalSchemaExpr schemaExpr (Sess.concreteDatabaseContext session) subschemas' of
Left err -> pure (Just err)
Right (newSubschemas, newContext) -> do
let discon = Sess.disconnectedTransaction session
newSchemas = Schemas newContext newSubschemas
newSession = Session (DisconnectedTransaction (Discon.parentId discon) newSchemas True) (Sess.schemaName session)
STMMap.insert newSession sessionId sessions
pure Nothing
executeSchemaExpr sessionId conn@(RemoteProcessConnection _) schemaExpr = remoteCall conn (ExecuteSchemaExpr sessionId schemaExpr)
commit :: SessionId -> Connection -> CommitOption -> IO (Maybe RelationalError)
commit sessionId conn@(InProcessConnection _) cOpt = executeGraphExpr sessionId conn (Commit cOpt)
commit sessionId conn@(RemoteProcessConnection _) cOpt = remoteCall conn (ExecuteGraphExpr sessionId (Commit cOpt))
sendNotifications :: [ProcessId] -> LocalNode -> EvaluatedNotifications -> IO ()
sendNotifications pids localNode nots = mapM_ sendNots pids
where
sendNots remoteClientPid = do
when (not (M.null nots)) $ runProcess localNode $ send remoteClientPid (NotificationMessage nots)
rollback :: SessionId -> Connection -> IO (Maybe RelationalError)
rollback sessionId conn@(InProcessConnection _) = executeGraphExpr sessionId conn Rollback
rollback sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteGraphExpr sessionId Rollback)
processTransactionGraphPersistence :: PersistenceStrategy -> TransactionGraph -> IO ()
processTransactionGraphPersistence NoPersistence _ = pure ()
processTransactionGraphPersistence (MinimalPersistence dbdir) graph = transactionGraphPersist NoDiskSync dbdir graph >> pure ()
processTransactionGraphPersistence (CrashSafePersistence dbdir) graph = transactionGraphPersist FsyncDiskSync dbdir graph >> pure ()
readGraphTransactionIdDigest :: PersistenceStrategy -> IO (LockFileHash)
readGraphTransactionIdDigest NoPersistence = error "attempt to read digest from transaction log without persistence enabled"
readGraphTransactionIdDigest (MinimalPersistence dbdir) = readGraphTransactionIdFileDigest dbdir
readGraphTransactionIdDigest (CrashSafePersistence dbdir) = readGraphTransactionIdFileDigest dbdir
typeForRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation)
typeForRelationalExpr sessionId conn@(InProcessConnection _) relExpr = atomically $ typeForRelationalExprSTM sessionId conn relExpr
typeForRelationalExpr sessionId conn@(RemoteProcessConnection _) relExpr = remoteCall conn (ExecuteTypeForRelationalExpr sessionId relExpr)
typeForRelationalExprSTM :: SessionId -> Connection -> RelationalExpr -> STM (Either RelationalError Relation)
typeForRelationalExprSTM sessionId (InProcessConnection conf) relExpr = do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, schema) -> do
let processed = if schemaName session == defaultSchemaName then
Right relExpr
else
Schema.processRelationalExprInSchema schema relExpr
case processed of
Left err -> pure (Left err)
Right relExpr' -> pure $ runReader (RE.typeForRelationalExpr relExpr') (RE.mkRelationalExprState (Sess.concreteDatabaseContext session))
typeForRelationalExprSTM _ _ _ = error "typeForRelationalExprSTM called on non-local connection"
inclusionDependencies :: SessionId -> Connection -> IO (Either RelationalError InclusionDependencies)
inclusionDependencies sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, schema) -> do
let context = Sess.concreteDatabaseContext session
if schemaName session == defaultSchemaName then
pure $ Right (B.inclusionDependencies context)
else
pure (Schema.inclusionDependenciesInSchema schema (B.inclusionDependencies context))
inclusionDependencies sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveInclusionDependencies sessionId)
planForDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Either RelationalError DatabaseContextExpr)
planForDatabaseContextExpr sessionId (InProcessConnection conf) dbExpr = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, _) -> if schemaName session == defaultSchemaName then
pure $ evalState (applyStaticDatabaseOptimization dbExpr) (RE.freshDatabaseState (Sess.concreteDatabaseContext session))
else
pure (Right dbExpr)
planForDatabaseContextExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (RetrievePlanForDatabaseContextExpr sessionId dbExpr)
transactionGraphAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
transactionGraphAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
tvar = ipTransactionGraph conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure $ Left err
Right session -> do
graph <- readTVar tvar
pure $ graphAsRelation (Sess.disconnectedTransaction session) graph
transactionGraphAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveTransactionGraph sessionId)
relationVariablesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
relationVariablesAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Left err)
Right (session, schema) -> do
let context = Sess.concreteDatabaseContext session
if Sess.schemaName session == defaultSchemaName then
pure $ R.relationVariablesAsRelation (relationVariables context)
else
case Schema.relationVariablesInSchema schema context of
Left err -> pure (Left err)
Right relvars -> pure $ R.relationVariablesAsRelation relvars
relationVariablesAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveRelationVariableSummary sessionId)
headTransactionId :: SessionId -> Connection -> IO (Maybe TransactionId)
headTransactionId sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left _ -> pure Nothing
Right session -> pure $ Just (Sess.parentId session)
headTransactionId sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveHeadTransactionId sessionId)
headNameSTM_ :: SessionId -> Sessions -> TVar TransactionGraph -> STM (Maybe HeadName)
headNameSTM_ sessionId sessions graphTvar = do
graph <- readTVar graphTvar
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left _ -> pure $ Nothing
Right session -> pure $ case transactionForId (Sess.parentId session) graph of
Left _ -> Nothing
Right parentTrans -> headNameForTransaction parentTrans graph
headName :: SessionId -> Connection -> IO (Maybe HeadName)
headName sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
graphTvar = ipTransactionGraph conf
atomically (headNameSTM_ sessionId sessions graphTvar)
headName sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteHeadName sessionId)
atomTypesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomTypesAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> do
case typesAsRelation (typeConstructorMapping (Sess.concreteDatabaseContext session)) of
Left err -> pure (Left err)
Right rel -> pure (Right rel)
atomTypesAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveAtomTypesAsRelation sessionId)
callTestTimeout_ :: SessionId -> Connection -> IO Bool
callTestTimeout_ _ (InProcessConnection _) = error "bad testing call"
callTestTimeout_ sessionId conn@(RemoteProcessConnection _) = remoteCall conn (TestTimeout sessionId)
transactionGraph_ :: Connection -> IO TransactionGraph
transactionGraph_ (InProcessConnection conf) = atomically $ readTVar (ipTransactionGraph conf)
transactionGraph_ _ = error "remote connection used"
disconnectedTransaction_ :: SessionId -> Connection -> IO DisconnectedTransaction
disconnectedTransaction_ sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
mSession <- atomically $ do
STMMap.lookup sessionId sessions
case mSession of
Nothing -> error "No such session"
Just (Sess.Session discon _) -> pure discon
disconnectedTransaction_ _ _= error "remote connection used"