{-# LANGUAGE DeriveAnyClass, DeriveGeneric, ScopedTypeVariables, BangPatterns, MonoLocalBinds #-}
module ProjectM36.Client
(ConnectionInfo(..),
Connection(..),
Port,
Hostname,
DatabaseName,
ConnectionError(..),
connectProjectM36,
close,
closeRemote_,
executeRelationalExpr,
executeDatabaseContextExpr,
executeDatabaseContextIOExpr,
executeDataFrameExpr,
executeGraphExpr,
executeSchemaExpr,
executeTransGraphRelationalExpr,
commit,
rollback,
typeForRelationalExpr,
inclusionDependencies,
ProjectM36.Client.typeConstructorMapping,
ProjectM36.Client.databaseContextFunctionsAsRelation,
planForDatabaseContextExpr,
currentSchemaName,
SchemaName,
HeadName,
setCurrentSchemaName,
transactionGraphAsRelation,
relationVariablesAsRelation,
ProjectM36.Client.atomFunctionsAsRelation,
disconnectedTransactionIsDirty,
headName,
remoteDBLookupName,
defaultServerPort,
headTransactionId,
defaultDatabaseName,
defaultRemoteConnectionInfo,
defaultHeadName,
PersistenceStrategy(..),
RelationalExpr,
RelationalExprBase(..),
DatabaseContextExpr(..),
DatabaseContextIOExpr(..),
Attribute(..),
MergeStrategy(..),
attributesFromList,
createNodeId,
createSessionAtCommit,
createSessionAtHead,
closeSession,
addClientNode,
callTestTimeout_,
RelationCardinality(..),
TransactionGraphOperator(..),
ProjectM36.Client.autoMergeToHead,
transactionGraph_,
disconnectedTransaction_,
TransGraphRelationalExpr,
TransactionIdLookup(..),
TransactionIdHeadBacktrack(..),
NodeId(..),
Atom(..),
Session,
SessionId,
NotificationCallback,
emptyNotificationCallback,
EvaluatedNotification(..),
atomTypesAsRelation,
AttributeExpr,
inclusionDependencyForKey,
databaseContextExprForUniqueKey,
databaseContextExprForForeignKey,
createScriptedAtomFunction,
AttributeExprBase(..),
TypeConstructorBase(..),
TypeConstructorDef(..),
DataConstructorDef(..),
AttributeNamesBase(..),
RelVarName,
IncDepName,
InclusionDependency(..),
AttributeName,
DF.DataFrame,
DF.DataFrameExpr,
DF.AttributeOrderExpr,
DF.Order(..),
RelationalError(..),
RequestTimeoutException(..),
RemoteProcessDiedException(..),
AtomType(..),
Atomable(..),
TupleExprBase(..),
AtomExprBase(..),
RestrictionPredicateExprBase(..),
withTransaction
) where
import ProjectM36.Base hiding (inclusionDependencies)
import qualified ProjectM36.Base as B
import ProjectM36.Error
import ProjectM36.Atomable
import ProjectM36.AtomFunction as AF
import ProjectM36.StaticOptimizer
import ProjectM36.Key
import qualified ProjectM36.DataFrame as DF
import ProjectM36.DatabaseContextFunction as DCF
import qualified ProjectM36.IsomorphicSchema as Schema
import Control.Monad.State
import Control.Monad.Trans.Reader
import qualified ProjectM36.RelationalExpression as RE
import ProjectM36.DatabaseContext (basicDatabaseContext)
import qualified ProjectM36.TransactionGraph as Graph
import ProjectM36.TransactionGraph
import qualified ProjectM36.Transaction as Trans
import ProjectM36.TransactionGraph.Persist
import ProjectM36.Attribute
import ProjectM36.TransGraphRelationalExpression (TransGraphRelationalExpr, evalTransGraphRelationalExpr)
import ProjectM36.Persist (DiskSync(..))
import ProjectM36.FileLock
import ProjectM36.Notifications
import ProjectM36.Server.RemoteCallTypes
import qualified ProjectM36.DisconnectedTransaction as Discon
import ProjectM36.Relation (typesAsRelation)
import ProjectM36.ScriptSession (initScriptSession, ScriptSession)
import qualified ProjectM36.Relation as R
import qualified ProjectM36.DatabaseContext as DBC
import Control.Exception.Base
import GHC.Conc.Sync
import Network.Transport (Transport(closeTransport))
#if MIN_VERSION_network_transport_tcp(0,6,0)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Network.Transport.TCP.Internal (encodeEndPointAddress)
#else
import Network.Transport.TCP (encodeEndPointAddress, createTransport, defaultTCPParameters)
#endif
import Control.Distributed.Process.Node (newLocalNode, initRemoteTable, runProcess, LocalNode, forkProcess, closeLocalNode)
import Control.Distributed.Process.Extras.Internal.Types (whereisRemote)
import Control.Distributed.Process.ManagedProcess.Client (call, safeCall)
import Data.Either (isRight)
import Data.UUID.V4 (nextRandom)
import Data.Word
import Control.Distributed.Process (ProcessId, Process, receiveWait, send, match, NodeId(..), reconnect)
import Control.Exception (IOException, handle, AsyncException, throwIO, fromException, Exception)
import Control.Concurrent.MVar
import qualified Data.Map as M
import Control.Distributed.Process.Serializable (Serializable)
#if MIN_VERSION_stm_containers(1,0,0)
import qualified StmContainers.Map as StmMap
import qualified StmContainers.Set as StmSet
#else
import qualified STMContainers.Map as StmMap
import qualified STMContainers.Set as StmSet
#endif
import qualified ProjectM36.Session as Sess
import ProjectM36.Session
import ProjectM36.Sessions
import Data.Binary (Binary)
import GHC.Generics (Generic)
import Control.DeepSeq (force)
import System.IO
import Data.Time.Clock
type Hostname = String
type Port = Word16
type DatabaseName = String
type NotificationCallback = NotificationName -> EvaluatedNotification -> IO ()
emptyNotificationCallback :: NotificationCallback
emptyNotificationCallback _ _ = pure ()
type GhcPkgPath = String
data RemoteProcessDiedException = RemoteProcessDiedException
deriving (Show, Eq)
instance Exception RemoteProcessDiedException
data RequestTimeoutException = RequestTimeoutException
deriving (Show, Eq)
instance Exception RequestTimeoutException
data ConnectionInfo = InProcessConnectionInfo PersistenceStrategy NotificationCallback [GhcPkgPath]|
RemoteProcessConnectionInfo DatabaseName NodeId NotificationCallback
type EvaluatedNotifications = M.Map NotificationName EvaluatedNotification
newtype NotificationMessage = NotificationMessage EvaluatedNotifications
deriving (Binary, Eq, Show, Generic)
data EvaluatedNotification = EvaluatedNotification {
notification :: Notification,
reportOldRelation :: Either RelationalError Relation,
reportNewRelation :: Either RelationalError Relation
}
deriving(Binary, Eq, Show, Generic)
createNodeId :: Hostname -> Port -> NodeId
createNodeId host port = NodeId $ encodeEndPointAddress host (show port) 1
defaultServerPort :: Port
defaultServerPort = 6543
defaultDatabaseName :: DatabaseName
defaultDatabaseName = "base"
defaultHeadName :: HeadName
defaultHeadName = "master"
defaultRemoteConnectionInfo :: ConnectionInfo
defaultRemoteConnectionInfo = RemoteProcessConnectionInfo defaultDatabaseName (createNodeId "127.0.0.1" defaultServerPort) emptyNotificationCallback
type ClientNodes = StmSet.Set ProcessId
data InProcessConnectionConf = InProcessConnectionConf {
ipPersistenceStrategy :: PersistenceStrategy,
ipClientNodes :: ClientNodes,
ipSessions :: Sessions,
ipTransactionGraph :: TVar TransactionGraph,
ipScriptSession :: Maybe ScriptSession,
ipLocalNode :: LocalNode,
ipTransport :: Transport,
ipLocks :: Maybe (LockFile, MVar LockFileHash)
}
data RemoteProcessConnectionConf = RemoteProcessConnectionConf {
rLocalNode :: LocalNode,
rProcessId :: ProcessId,
rTransport :: Transport
}
data Connection = InProcessConnection InProcessConnectionConf |
RemoteProcessConnection RemoteProcessConnectionConf
data ConnectionError = SetupDatabaseDirectoryError PersistenceError |
IOExceptionError IOException |
NoSuchDatabaseByNameError DatabaseName |
LoginError
deriving (Show, Eq, Generic)
remoteDBLookupName :: DatabaseName -> String
remoteDBLookupName = (++) "db-"
createLocalNode :: IO (LocalNode, Transport)
createLocalNode = do
#if MIN_VERSION_network_transport_tcp(0,6,0)
eLocalTransport <- createTransport "127.0.0.1" "0" (\sn -> ("127.0.0.1", sn)) defaultTCPParameters
#else
eLocalTransport <- createTransport "127.0.0.1" "0" defaultTCPParameters
#endif
case eLocalTransport of
Left err -> error ("failed to create transport: " ++ show err)
Right localTransport -> do
localNode <- newLocalNode localTransport initRemoteTable
pure (localNode, localTransport)
notificationListener :: NotificationCallback -> Process ()
notificationListener callback = do
_ <- forever $
receiveWait [
match (\(NotificationMessage eNots) ->
liftIO $ mapM_ (uncurry callback) (M.toList eNots)
)
]
pure ()
startNotificationListener :: LocalNode -> NotificationCallback -> IO ProcessId
startNotificationListener localNode callback = forkProcess localNode (notificationListener callback)
createScriptSession :: [String] -> IO (Maybe ScriptSession)
createScriptSession ghcPkgPaths = do
eScriptSession <- initScriptSession ghcPkgPaths
case eScriptSession of
Left err -> hPutStrLn stderr ("Failed to load scripting engine- scripting disabled: " ++ show err) >> pure Nothing
Right s -> pure (Just s)
connectProjectM36 :: ConnectionInfo -> IO (Either ConnectionError Connection)
connectProjectM36 (InProcessConnectionInfo strat notificationCallback ghcPkgPaths) = do
freshId <- nextRandom
stamp <- getCurrentTime
let bootstrapContext = basicDatabaseContext
freshGraph = bootstrapTransactionGraph stamp freshId bootstrapContext
case strat of
NoPersistence -> do
graphTvar <- newTVarIO freshGraph
clientNodes <- StmSet.newIO
sessions <- StmMap.newIO
(localNode, transport) <- createLocalNode
notificationPid <- startNotificationListener localNode notificationCallback
mScriptSession <- createScriptSession ghcPkgPaths
let conn = InProcessConnection InProcessConnectionConf {
ipPersistenceStrategy = strat,
ipClientNodes = clientNodes,
ipSessions = sessions,
ipTransactionGraph = graphTvar,
ipScriptSession = mScriptSession,
ipLocalNode = localNode,
ipTransport = transport,
ipLocks = Nothing}
addClientNode conn notificationPid
pure (Right conn)
MinimalPersistence dbdir -> connectPersistentProjectM36 strat NoDiskSync dbdir freshGraph notificationCallback ghcPkgPaths
CrashSafePersistence dbdir -> connectPersistentProjectM36 strat FsyncDiskSync dbdir freshGraph notificationCallback ghcPkgPaths
connectProjectM36 (RemoteProcessConnectionInfo databaseName serverNodeId notificationCallback) = do
connStatus <- newEmptyMVar
(localNode, transport) <- createLocalNode
let dbName = remoteDBLookupName databaseName
notificationListenerPid <- startNotificationListener localNode notificationCallback
runProcess localNode $ do
mServerProcessId <- whereisRemote serverNodeId dbName
case mServerProcessId of
Nothing -> liftIO $ putMVar connStatus $ Left (NoSuchDatabaseByNameError databaseName)
Just serverProcessId -> do
loginConfirmation <- safeLogin (Login notificationListenerPid) serverProcessId
if not loginConfirmation then
liftIO $ putMVar connStatus (Left LoginError)
else
liftIO $ putMVar connStatus (Right $ RemoteProcessConnection RemoteProcessConnectionConf {rLocalNode = localNode, rProcessId = serverProcessId, rTransport = transport})
takeMVar connStatus
connectPersistentProjectM36 :: PersistenceStrategy ->
DiskSync ->
FilePath ->
TransactionGraph ->
NotificationCallback ->
[GhcPkgPath] ->
IO (Either ConnectionError Connection)
connectPersistentProjectM36 strat sync dbdir freshGraph notificationCallback ghcPkgPaths = do
err <- setupDatabaseDir sync dbdir freshGraph
case err of
Left err' -> return $ Left (SetupDatabaseDirectoryError err')
Right (lockFileH, digest) -> do
mScriptSession <- createScriptSession ghcPkgPaths
graph <- transactionGraphLoad dbdir emptyTransactionGraph mScriptSession
case graph of
Left err' -> return $ Left (SetupDatabaseDirectoryError err')
Right graph' -> do
tvarGraph <- newTVarIO graph'
sessions <- StmMap.newIO
clientNodes <- StmSet.newIO
(localNode, transport) <- createLocalNode
lockMVar <- newMVar digest
let conn = InProcessConnection InProcessConnectionConf {
ipPersistenceStrategy = strat,
ipClientNodes = clientNodes,
ipSessions = sessions,
ipTransactionGraph = tvarGraph,
ipScriptSession = mScriptSession,
ipLocalNode = localNode,
ipTransport = transport,
ipLocks = Just (lockFileH, lockMVar)
}
notificationPid <- startNotificationListener localNode notificationCallback
addClientNode conn notificationPid
pure (Right conn)
createSessionAtCommit :: Connection -> TransactionId -> IO (Either RelationalError SessionId)
createSessionAtCommit conn@(InProcessConnection _) commitId = do
newSessionId <- nextRandom
atomically $ createSessionAtCommit_ commitId newSessionId conn
createSessionAtCommit conn@(RemoteProcessConnection _) uuid = remoteCall conn (CreateSessionAtCommit uuid)
createSessionAtCommit_ :: TransactionId -> SessionId -> Connection -> STM (Either RelationalError SessionId)
createSessionAtCommit_ commitId newSessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
graphTvar = ipTransactionGraph conf
graph <- readTVar graphTvar
case transactionForId commitId graph of
Left err -> pure (Left err)
Right transaction -> do
let freshDiscon = DisconnectedTransaction commitId (Trans.schemas transaction) False
keyDuplication <- StmMap.lookup newSessionId sessions
case keyDuplication of
Just _ -> pure $ Left (SessionIdInUseError newSessionId)
Nothing -> do
StmMap.insert (Session freshDiscon defaultSchemaName) newSessionId sessions
pure $ Right newSessionId
createSessionAtCommit_ _ _ (RemoteProcessConnection _) = error "createSessionAtCommit_ called on remote connection"
createSessionAtHead :: Connection -> HeadName -> IO (Either RelationalError SessionId)
createSessionAtHead conn@(InProcessConnection conf) headn = do
let graphTvar = ipTransactionGraph conf
newSessionId <- nextRandom
atomically $ do
graph <- readTVar graphTvar
case transactionForHead headn graph of
Nothing -> pure $ Left (NoSuchHeadNameError headn)
Just trans -> createSessionAtCommit_ (transactionId trans) newSessionId conn
createSessionAtHead conn@(RemoteProcessConnection _) headn = remoteCall conn (CreateSessionAtHead headn)
addClientNode :: Connection -> ProcessId -> IO ()
addClientNode (RemoteProcessConnection _) _ = error "addClientNode called on remote connection"
addClientNode (InProcessConnection conf) newProcessId = atomically (StmSet.insert newProcessId (ipClientNodes conf))
closeSession :: SessionId -> Connection -> IO ()
closeSession sessionId (InProcessConnection conf) =
atomically $ StmMap.delete sessionId (ipSessions conf)
closeSession sessionId conn@(RemoteProcessConnection _) = remoteCall conn (CloseSession sessionId)
close :: Connection -> IO ()
close (InProcessConnection conf) = do
atomically $ do
let sessions = ipSessions conf
#if MIN_VERSION_stm_containers(1,0,0)
StmMap.reset sessions
#else
StmMap.deleteAll sessions
#endif
pure ()
closeLocalNode (ipLocalNode conf)
closeTransport (ipTransport conf)
let mLocks = ipLocks conf
case mLocks of
Nothing -> pure ()
Just (lockFileH, _) -> closeLockFile lockFileH
close conn@(RemoteProcessConnection conf) = do
_ <- remoteCall conn Logout :: IO Bool
closeLocalNode (rLocalNode conf)
closeTransport (rTransport conf)
closeRemote_ :: Connection -> IO ()
closeRemote_ (InProcessConnection _) = error "invalid call of closeRemote_ on InProcessConnection"
closeRemote_ (RemoteProcessConnection conf) = runProcess (rLocalNode conf) (reconnect (rProcessId conf))
excEither :: IO (Either RelationalError a) -> IO (Either RelationalError a)
excEither = handle handler
where
handler exc | Just (_ :: AsyncException) <- fromException exc = throwIO exc
| otherwise = pure (Left (UnhandledExceptionError (show exc)))
runProcessResult :: LocalNode -> Process a -> IO a
runProcessResult localNode proc = do
ret <- newEmptyMVar
runProcess localNode $ do
val <- proc
liftIO $ putMVar ret val
takeMVar ret
safeLogin :: Login -> ProcessId -> Process Bool
safeLogin login procId = do
ret <- call procId login
case ret of
Left (_ :: ServerError) -> pure False
Right val -> pure val
remoteCall :: (Serializable a, Serializable b) => Connection -> a -> IO b
remoteCall (InProcessConnection _ ) _ = error "remoteCall called on local connection"
remoteCall (RemoteProcessConnection conf) arg = runProcessResult localNode $ do
ret <- safeCall serverProcessId arg
case ret of
Left err -> error ("server died: " ++ show err)
Right ret' -> case ret' of
Left RequestTimeoutError -> liftIO (throwIO RequestTimeoutException)
Left (ProcessDiedError _) -> liftIO (throwIO RemoteProcessDiedException)
Right val -> pure val
where
localNode = rLocalNode conf
serverProcessId = rProcessId conf
sessionForSessionId :: SessionId -> Sessions -> STM (Either RelationalError Session)
sessionForSessionId sessionId sessions =
maybe (Left $ NoSuchSessionError sessionId) Right <$> StmMap.lookup sessionId sessions
schemaForSessionId :: Session -> STM (Either RelationalError Schema)
schemaForSessionId session = do
let sname = schemaName session
if sname == defaultSchemaName then
pure (Right (Schema []))
else
case M.lookup sname (subschemas session) of
Nothing -> pure (Left (SubschemaNameNotInUseError sname))
Just schema -> pure (Right schema)
sessionAndSchema :: SessionId -> Sessions -> STM (Either RelationalError (Session, Schema))
sessionAndSchema sessionId sessions = do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> do
eSchema <- schemaForSessionId session
case eSchema of
Left err -> pure (Left err)
Right schema -> pure (Right (session, schema))
currentSchemaName :: SessionId -> Connection -> IO (Either RelationalError SchemaName)
currentSchemaName sessionId (InProcessConnection conf) = atomically $ do
let sessions = ipSessions conf
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> pure (Right (Sess.schemaName session))
currentSchemaName sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveCurrentSchemaName sessionId)
setCurrentSchemaName :: SessionId -> Connection -> SchemaName -> IO (Either RelationalError ())
setCurrentSchemaName sessionId (InProcessConnection conf) sname = atomically $ do
let sessions = ipSessions conf
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> case Sess.setSchemaName sname session of
Left err -> pure (Left err)
Right newSession -> StmMap.insert newSession sessionId sessions >> pure (Right ())
setCurrentSchemaName sessionId conn@(RemoteProcessConnection _) sname = remoteCall conn (ExecuteSetCurrentSchema sessionId sname)
executeRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation)
executeRelationalExpr sessionId (InProcessConnection conf) expr = excEither $ atomically $ do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, schema) -> do
let expr' = if schemaName session /= defaultSchemaName then
Schema.processRelationalExprInSchema schema expr
else
Right expr
case expr' of
Left err -> pure (Left err)
Right expr'' -> case optimizeRelationalExpr (Sess.concreteDatabaseContext session) expr'' of
Left err -> pure (Left err)
Right optExpr -> do
let evald = runReader (RE.evalRelationalExpr optExpr) (RE.mkRelationalExprState (Sess.concreteDatabaseContext session))
case evald of
Left err -> pure (Left err)
Right rel -> pure (force (Right rel))
executeRelationalExpr sessionId conn@(RemoteProcessConnection _) relExpr = remoteCall conn (ExecuteRelationalExpr sessionId relExpr)
executeDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Either RelationalError ())
executeDatabaseContextExpr sessionId (InProcessConnection conf) expr = excEither $ atomically $ do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Left err)
Right (session, schema) -> do
let expr' = if schemaName session == defaultSchemaName then
Right expr
else
Schema.processDatabaseContextExprInSchema schema expr
case expr' of
Left err -> pure (Left err)
Right expr'' -> case runState (do
eOptExpr <- applyStaticDatabaseOptimization expr''
case eOptExpr of
Left err -> pure (Left err)
Right optExpr ->
RE.evalDatabaseContextExpr optExpr) (RE.freshDatabaseState (Sess.concreteDatabaseContext session)) of
(Left err,_) -> pure (Left err)
(Right (), (_,_,False)) -> pure (Right ())
(Right (), (!context',_,True)) -> do
let newDiscon = DisconnectedTransaction (Sess.parentId session) newSchemas True
newSubschemas = Schema.processDatabaseContextExprSchemasUpdate (Sess.subschemas session) expr
newSchemas = Schemas context' newSubschemas
newSession = Session newDiscon (Sess.schemaName session)
StmMap.insert newSession sessionId sessions
pure (Right ())
executeDatabaseContextExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (ExecuteDatabaseContextExpr sessionId dbExpr)
autoMergeToHead :: SessionId -> Connection -> MergeStrategy -> HeadName -> IO (Either RelationalError ())
autoMergeToHead sessionId (InProcessConnection conf) strat headName' = do
let sessions = ipSessions conf
id1 <- nextRandom
id2 <- nextRandom
id3 <- nextRandom
stamp <- getCurrentTime
commitLock_ sessionId conf $ \graph -> do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session ->
case Graph.transactionForHead headName' graph of
Nothing -> pure (Left (NoSuchHeadNameError headName'))
Just headTrans -> do
let graphInfo = if Sess.parentId session == transactionId headTrans then do
ret <- Graph.evalGraphOp stamp id1 (Sess.disconnectedTransaction session) graph Commit
pure (ret, [id1])
else do
ret <- Graph.autoMergeToHead stamp (id1, id2, id3) (Sess.disconnectedTransaction session) headName' strat graph
pure (ret, [id1,id2,id3])
case graphInfo of
Left err -> pure (Left err)
Right ((discon', graph'), transactionIdsAdded) ->
pure (Right (discon', graph', transactionIdsAdded))
autoMergeToHead sessionId conn@(RemoteProcessConnection _) strat headName' = remoteCall conn (ExecuteAutoMergeToHead sessionId strat headName')
executeDatabaseContextIOExpr :: SessionId -> Connection -> DatabaseContextIOExpr -> IO (Either RelationalError ())
executeDatabaseContextIOExpr sessionId (InProcessConnection conf) expr = excEither $ do
let sessions = ipSessions conf
scriptSession = ipScriptSession conf
eSession <- atomically $ sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> do
res <- RE.evalDatabaseContextIOExpr scriptSession (Sess.concreteDatabaseContext session) expr
case res of
Left err -> pure (Left err)
Right context' -> do
let newDiscon = DisconnectedTransaction (Sess.parentId session) newSchemas True
newSchemas = Schemas context' (Sess.subschemas session)
newSession = Session newDiscon (Sess.schemaName session)
atomically $ StmMap.insert newSession sessionId sessions
pure (Right ())
executeDatabaseContextIOExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (ExecuteDatabaseContextIOExpr sessionId dbExpr)
executeCommitExprSTM_ :: DatabaseContext -> DatabaseContext -> ClientNodes -> STM (EvaluatedNotifications, ClientNodes)
executeCommitExprSTM_ oldContext newContext nodes = do
let nots = notifications oldContext
fireNots = notificationChanges nots oldContext newContext
evaldNots = M.map mkEvaldNot fireNots
evalInContext expr ctx = runReader (RE.evalRelationalExpr expr) (RE.mkRelationalExprState ctx)
mkEvaldNot notif = EvaluatedNotification { notification = notif,
reportOldRelation = evalInContext (reportOldExpr notif) oldContext,
reportNewRelation = evalInContext (reportNewExpr notif) newContext}
pure (evaldNots, nodes)
executeGraphExpr :: SessionId -> Connection -> TransactionGraphOperator -> IO (Either RelationalError ())
executeGraphExpr sessionId (InProcessConnection conf) graphExpr = excEither $ do
let sessions = ipSessions conf
freshId <- nextRandom
stamp <- getCurrentTime
commitLock_ sessionId conf $ \updatedGraph -> do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> do
let discon = Sess.disconnectedTransaction session
case evalGraphOp stamp freshId discon updatedGraph graphExpr of
Left err -> pure (Left err)
Right (discon', graph') -> do
let transIds = [freshId | isRight (transactionForId freshId graph')]
pure (Right (discon', graph', transIds))
executeGraphExpr sessionId conn@(RemoteProcessConnection _) graphExpr = remoteCall conn (ExecuteGraphExpr sessionId graphExpr)
executeTransGraphRelationalExpr :: SessionId -> Connection -> TransGraphRelationalExpr -> IO (Either RelationalError Relation)
executeTransGraphRelationalExpr _ (InProcessConnection conf) tgraphExpr = excEither . atomically $ do
let graphTvar = ipTransactionGraph conf
graph <- readTVar graphTvar
case evalTransGraphRelationalExpr tgraphExpr graph of
Left err -> pure (Left err)
Right relExpr -> case runReader (RE.evalRelationalExpr relExpr) (RE.mkRelationalExprState DBC.empty) of
Left err -> pure (Left err)
Right rel -> pure (force (Right rel))
executeTransGraphRelationalExpr sessionId conn@(RemoteProcessConnection _) tgraphExpr = remoteCall conn (ExecuteTransGraphRelationalExpr sessionId tgraphExpr)
executeSchemaExpr :: SessionId -> Connection -> Schema.SchemaExpr -> IO (Either RelationalError ())
executeSchemaExpr sessionId (InProcessConnection conf) schemaExpr = atomically $ do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Left err)
Right (session, _) -> do
let subschemas' = subschemas session
case Schema.evalSchemaExpr schemaExpr (Sess.concreteDatabaseContext session) subschemas' of
Left err -> pure (Left err)
Right (newSubschemas, newContext) -> do
let discon = Sess.disconnectedTransaction session
newSchemas = Schemas newContext newSubschemas
newSession = Session (DisconnectedTransaction (Discon.parentId discon) newSchemas True) (Sess.schemaName session)
StmMap.insert newSession sessionId sessions
pure (Right ())
executeSchemaExpr sessionId conn@(RemoteProcessConnection _) schemaExpr = remoteCall conn (ExecuteSchemaExpr sessionId schemaExpr)
commit :: SessionId -> Connection -> IO (Either RelationalError ())
commit sessionId conn@(InProcessConnection _) = executeGraphExpr sessionId conn Commit
commit sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteGraphExpr sessionId Commit)
sendNotifications :: [ProcessId] -> LocalNode -> EvaluatedNotifications -> IO ()
sendNotifications pids localNode nots = mapM_ sendNots pids
where
sendNots remoteClientPid =
unless (M.null nots) $ runProcess localNode $ send remoteClientPid (NotificationMessage nots)
rollback :: SessionId -> Connection -> IO (Either RelationalError ())
rollback sessionId conn@(InProcessConnection _) = executeGraphExpr sessionId conn Rollback
rollback sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteGraphExpr sessionId Rollback)
processTransactionGraphPersistence :: PersistenceStrategy -> [TransactionId] -> TransactionGraph -> IO ()
processTransactionGraphPersistence NoPersistence _ _ = pure ()
processTransactionGraphPersistence (MinimalPersistence dbdir) transIds graph = transactionGraphPersist NoDiskSync dbdir transIds graph >> pure ()
processTransactionGraphPersistence (CrashSafePersistence dbdir) transIds graph = transactionGraphPersist FsyncDiskSync dbdir transIds graph >> pure ()
readGraphTransactionIdDigest :: PersistenceStrategy -> IO LockFileHash
readGraphTransactionIdDigest NoPersistence = error "attempt to read digest from transaction log without persistence enabled"
readGraphTransactionIdDigest (MinimalPersistence dbdir) = readGraphTransactionIdFileDigest dbdir
readGraphTransactionIdDigest (CrashSafePersistence dbdir) = readGraphTransactionIdFileDigest dbdir
typeForRelationalExpr :: SessionId -> Connection -> RelationalExpr -> IO (Either RelationalError Relation)
typeForRelationalExpr sessionId conn@(InProcessConnection _) relExpr = atomically $ typeForRelationalExprSTM sessionId conn relExpr
typeForRelationalExpr sessionId conn@(RemoteProcessConnection _) relExpr = remoteCall conn (ExecuteTypeForRelationalExpr sessionId relExpr)
typeForRelationalExprSTM :: SessionId -> Connection -> RelationalExpr -> STM (Either RelationalError Relation)
typeForRelationalExprSTM sessionId (InProcessConnection conf) relExpr = do
let sessions = ipSessions conf
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, schema) -> do
let processed = if schemaName session == defaultSchemaName then
Right relExpr
else
Schema.processRelationalExprInSchema schema relExpr
case processed of
Left err -> pure (Left err)
Right relExpr' -> pure $ runReader (RE.typeForRelationalExpr relExpr') (RE.mkRelationalExprState (Sess.concreteDatabaseContext session))
typeForRelationalExprSTM _ _ _ = error "typeForRelationalExprSTM called on non-local connection"
inclusionDependencies :: SessionId -> Connection -> IO (Either RelationalError InclusionDependencies)
inclusionDependencies sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, schema) -> do
let context = Sess.concreteDatabaseContext session
if schemaName session == defaultSchemaName then
pure $ Right (B.inclusionDependencies context)
else
pure (Schema.inclusionDependenciesInSchema schema (B.inclusionDependencies context))
inclusionDependencies sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveInclusionDependencies sessionId)
typeConstructorMapping :: SessionId -> Connection -> IO (Either RelationalError TypeConstructorMapping)
typeConstructorMapping sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, _) ->
pure (Right (B.typeConstructorMapping (Sess.concreteDatabaseContext session)))
typeConstructorMapping sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveTypeConstructorMapping sessionId)
planForDatabaseContextExpr :: SessionId -> Connection -> DatabaseContextExpr -> IO (Either RelationalError DatabaseContextExpr)
planForDatabaseContextExpr sessionId (InProcessConnection conf) dbExpr = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure $ Left err
Right (session, _) -> if schemaName session == defaultSchemaName then
pure $ evalState (applyStaticDatabaseOptimization dbExpr) (RE.freshDatabaseState (Sess.concreteDatabaseContext session))
else
pure (Right dbExpr)
planForDatabaseContextExpr sessionId conn@(RemoteProcessConnection _) dbExpr = remoteCall conn (RetrievePlanForDatabaseContextExpr sessionId dbExpr)
transactionGraphAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
transactionGraphAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
tvar = ipTransactionGraph conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure $ Left err
Right session ->
graphAsRelation (Sess.disconnectedTransaction session) <$> readTVar tvar
transactionGraphAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveTransactionGraph sessionId)
relationVariablesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
relationVariablesAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Left err)
Right (session, schema) -> do
let context = Sess.concreteDatabaseContext session
if Sess.schemaName session == defaultSchemaName then
pure $ R.relationVariablesAsRelation (relationVariables context)
else
case Schema.relationVariablesInSchema schema context of
Left err -> pure (Left err)
Right relvars -> pure $ R.relationVariablesAsRelation relvars
relationVariablesAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveRelationVariableSummary sessionId)
atomFunctionsAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomFunctionsAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Left err)
Right (session, _) ->
pure (AF.atomFunctionsAsRelation (atomFunctions (concreteDatabaseContext session)))
atomFunctionsAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveAtomFunctionSummary sessionId)
databaseContextFunctionsAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
databaseContextFunctionsAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionAndSchema sessionId sessions
case eSession of
Left err -> pure (Left err)
Right (session, _) ->
pure (DCF.databaseContextFunctionsAsRelation (dbcFunctions (concreteDatabaseContext session)))
databaseContextFunctionsAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveDatabaseContextFunctionSummary sessionId)
headTransactionId :: SessionId -> Connection -> IO (Either RelationalError TransactionId)
headTransactionId sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> pure $ Right (Sess.parentId session)
headTransactionId sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveHeadTransactionId sessionId)
headNameSTM_ :: SessionId -> Sessions -> TVar TransactionGraph -> STM (Either RelationalError HeadName)
headNameSTM_ sessionId sessions graphTvar = do
graph <- readTVar graphTvar
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session -> case transactionForId (Sess.parentId session) graph of
Left err -> pure (Left err)
Right parentTrans -> case headNameForTransaction parentTrans graph of
Nothing -> pure (Left UnknownHeadError)
Just headName' -> pure (Right headName')
headName :: SessionId -> Connection -> IO (Either RelationalError HeadName)
headName sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
graphTvar = ipTransactionGraph conf
atomically (headNameSTM_ sessionId sessions graphTvar)
headName sessionId conn@(RemoteProcessConnection _) = remoteCall conn (ExecuteHeadName sessionId)
atomTypesAsRelation :: SessionId -> Connection -> IO (Either RelationalError Relation)
atomTypesAsRelation sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session ->
case typesAsRelation (B.typeConstructorMapping (Sess.concreteDatabaseContext session)) of
Left err -> pure (Left err)
Right rel -> pure (Right rel)
atomTypesAsRelation sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveAtomTypesAsRelation sessionId)
disconnectedTransactionIsDirty :: SessionId -> Connection -> IO (Either RelationalError Bool)
disconnectedTransactionIsDirty sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
atomically $ do
eSession <- sessionForSessionId sessionId sessions
case eSession of
Left err -> pure (Left err)
Right session ->
pure (Right (isDirty session))
disconnectedTransactionIsDirty sessionId conn@(RemoteProcessConnection _) = remoteCall conn (RetrieveSessionIsDirty sessionId)
callTestTimeout_ :: SessionId -> Connection -> IO Bool
callTestTimeout_ _ (InProcessConnection _) = error "bad testing call"
callTestTimeout_ sessionId conn@(RemoteProcessConnection _) = remoteCall conn (TestTimeout sessionId)
transactionGraph_ :: Connection -> IO TransactionGraph
transactionGraph_ (InProcessConnection conf) = readTVarIO (ipTransactionGraph conf)
transactionGraph_ _ = error "remote connection used"
disconnectedTransaction_ :: SessionId -> Connection -> IO DisconnectedTransaction
disconnectedTransaction_ sessionId (InProcessConnection conf) = do
let sessions = ipSessions conf
mSession <- atomically $ StmMap.lookup sessionId sessions
case mSession of
Nothing -> error "No such session"
Just (Sess.Session discon _) -> pure discon
disconnectedTransaction_ _ _= error "remote connection used"
commitLock_ :: SessionId ->
InProcessConnectionConf ->
(TransactionGraph ->
STM (Either RelationalError (DisconnectedTransaction, TransactionGraph, [TransactionId]))) ->
IO (Either RelationalError ())
commitLock_ sessionId conf stmBlock = do
let sessions = ipSessions conf
strat = ipPersistenceStrategy conf
mScriptSession = ipScriptSession conf
graphTvar = ipTransactionGraph conf
clientNodes = ipClientNodes conf
mLockFileH = ipLocks conf
lockHandler body = case mLockFileH of
Nothing -> body False
Just (lockFileH, lockMVar) ->
let acquireLocks = do
lastWrittenDigest <- takeMVar lockMVar
lockFile lockFileH WriteLock
latestDigest <- readGraphTransactionIdDigest strat
pure (latestDigest /= lastWrittenDigest)
releaseLocks _ = do
gDigest <- readGraphTransactionIdDigest strat
unlockFile lockFileH
putMVar lockMVar gDigest
in bracket acquireLocks releaseLocks body
manip <- lockHandler $ \dbWrittenByOtherProcess -> atomically $ do
eSession <- sessionForSessionId sessionId sessions
oldGraph <- readTVar graphTvar
case eSession of
Left err -> pure (Left err)
Right session -> do
let dbdir = case strat of
MinimalPersistence x -> x
CrashSafePersistence x -> x
_ -> error "accessing dbdir on non-persisted connection"
eRefreshedGraph <- if dbWrittenByOtherProcess then
unsafeIOToSTM (transactionGraphLoad dbdir oldGraph mScriptSession)
else
pure (Right oldGraph)
case eRefreshedGraph of
Left err -> pure (Left (DatabaseLoadError err))
Right refreshedGraph -> do
eGraph <- stmBlock refreshedGraph
case eGraph of
Left err -> pure (Left err)
Right (discon', graph', transactionIdsToPersist) -> do
writeTVar graphTvar graph'
let newSession = Session discon' (Sess.schemaName session)
StmMap.insert newSession sessionId sessions
case transactionForId (Sess.parentId session) oldGraph of
Left err -> pure $ Left err
Right previousTrans ->
if not (Prelude.null transactionIdsToPersist) then do
(evaldNots, nodes) <- executeCommitExprSTM_ (Trans.concreteDatabaseContext previousTrans) (Sess.concreteDatabaseContext session) clientNodes
nodesToNotify <- stmSetToList nodes
pure $ Right (evaldNots, nodesToNotify, graph', transactionIdsToPersist)
else pure (Right (M.empty, [], graph', []))
case manip of
Left err -> pure (Left err)
Right (notsToFire, nodesToNotify, newGraph, transactionIdsToPersist) -> do
processTransactionGraphPersistence strat transactionIdsToPersist newGraph
sendNotifications nodesToNotify (ipLocalNode conf) notsToFire
pure (Right ())
withTransaction :: SessionId -> Connection -> IO (Either RelationalError a) -> IO (Either RelationalError ()) -> IO (Either RelationalError a)
withTransaction sessionId conn io successFunc = bracketOnError (pure ()) (const do_rollback) block
where
do_rollback = rollback sessionId conn
block _ = do
eErr <- io
case eErr of
Left err -> do
_ <- do_rollback
pure (Left err)
Right val -> do
eIsDirty <- disconnectedTransactionIsDirty sessionId conn
case eIsDirty of
Left err -> pure (Left err)
Right dirty ->
if dirty then do
res <- successFunc
case res of
Left err -> pure (Left err)
Right _ -> pure (Right val)
else
pure (Right val)
executeDataFrameExpr :: SessionId -> Connection -> DF.DataFrameExpr -> IO (Either RelationalError DF.DataFrame)
executeDataFrameExpr sessionId conn@(InProcessConnection _) dfExpr = do
eRel <- executeRelationalExpr sessionId conn (DF.convertExpr dfExpr)
case eRel of
Left err -> pure (Left err)
Right rel -> do
let relAttrs = R.attributes rel
attrName (DF.AttributeOrderExpr name _) = name
order (DF.AttributeOrderExpr _ ord) = ord
orders = map order (DF.orderExprs dfExpr)
attributeForName' = flip attributeForName relAttrs
attrNames = map attrName (DF.orderExprs dfExpr)
verified = forM attrNames attributeForName'
case verified of
Left err -> pure (Left err)
Right attrs -> do
let attrOrders = map (\(attr',order') -> DF.AttributeOrder (attributeName attr') order') (zip attrs orders)
case DF.sortDataFrameBy attrOrders . DF.toDataFrame $ rel of
Left err -> pure (Left err)
Right dFrame -> do
let dFrame' = maybe dFrame (`DF.drop'` dFrame) (DF.offset dfExpr)
dFrame'' = maybe dFrame' (`DF.take'` dFrame') (DF.limit dfExpr)
pure (Right dFrame'')
executeDataFrameExpr sessionId conn@(RemoteProcessConnection _) dfExpr = remoteCall conn (ExecuteDataFrameExpr sessionId dfExpr)