module Database.Sql.Util.Lineage.ColumnPlus where
import Database.Sql.Type
import Database.Sql.Position
import Database.Sql.Info
import Database.Sql.Util.Eval
import Data.List.NonEmpty (NonEmpty ((:|)))
import Data.Maybe (mapMaybe, maybeToList)
import Data.Map (Map)
import qualified Data.Map as M
import Data.Set (Set)
import qualified Data.Set as S
import Data.Proxy
import Data.Semigroup
import Control.Arrow
import Control.Applicative
import Control.Monad.Identity
import Control.Monad.Except
import Control.Monad.Writer (Writer, runWriter, execWriter, writer, tell)
import Data.Foldable
type ColumnLineagePlus = Map (Either FQTN FQCN) ColumnPlusSet
class HasColumnLineage q where
getColumnLineage :: q -> (RecordSet ColumnLineage, ColumnLineagePlus)
instance HasColumnLineage (Statement d ResolvedNames Range) where
getColumnLineage stmt = columnLineage stmt
emptyLineage :: [FQColumnName ()] -> ColumnLineagePlus
emptyLineage = M.fromList . map (, emptyColumnPlusSet) . map (Right . fqcnToFQCN)
data ColumnLineage
data ColumnPlusSet = ColumnPlusSet
{ columnPlusColumns :: Map FQCN (Map FieldChain (Set Range))
, columnPlusTables :: Map FQTN (Set Range)
} deriving (Eq, Show)
instance Semigroup ColumnPlusSet where
ColumnPlusSet m s <> ColumnPlusSet n t = ColumnPlusSet (M.unionWith (M.unionWith S.union) m n) (M.unionWith S.union s t)
instance Monoid ColumnPlusSet where
mempty = ColumnPlusSet mempty mempty
mappend = (<>)
emptyColumnPlusSet :: ColumnPlusSet
emptyColumnPlusSet = ColumnPlusSet M.empty M.empty
singleColumnSet :: Range -> FullyQualifiedColumnName -> ColumnPlusSet
singleColumnSet info c = ColumnPlusSet (M.singleton c $ M.singleton (FieldChain M.empty) $ S.singleton info) M.empty
singleTableSet :: Range -> FullyQualifiedTableName -> ColumnPlusSet
singleTableSet info t = ColumnPlusSet M.empty (M.singleton t $ S.singleton info)
mergeLineages :: Writer ColumnPlusSet [ColumnPlusSet] -> Writer ColumnPlusSet [ColumnPlusSet] -> EvalT ColumnLineage 'TableContext Identity (Writer ColumnPlusSet [ColumnPlusSet])
mergeLineages = (pure .) . liftA2 (zipWith (<>))
instance Evaluation ColumnLineage where
type EvalValue ColumnLineage = ColumnPlusSet
type EvalRow ColumnLineage = Writer ColumnPlusSet
type EvalMonad ColumnLineage = Identity
addItems _ = mergeLineages
removeItems _ = mergeLineages
unionItems _ = mergeLineages
intersectItems _ = mergeLineages
distinctItems _ = id
offsetItems _ _ = id
limitItems _ _ = id
filterBy expr (RecordSet cs rs) = do
let p = Proxy :: Proxy ColumnLineage
let (r, rowCount) = runWriter rs
x <- exprToTable (eval p expr) $ makeRowMap cs r
pure $ makeRecordSet (Proxy :: Proxy ColumnLineage) cs $ writer (map (x <>) r, x <> rowCount)
inList x xs = pure $ foldl' (<>) x xs
inSubquery x xs =
let (ys, y) = runWriter xs
in pure $ foldl' (<>) x (y:ys)
existsSubquery rs = pure $ execWriter rs
atTimeZone ts tz = pure $ ts <> tz
handleConstant _ _ = pure emptyColumnPlusSet
handleCases p cases else_ = do
cases' :: [ColumnPlusSet] <- mapM (\ (when_, then_) -> (<>) <$> eval p when_ <*> eval p then_) cases
else_' <- maybe (pure emptyColumnPlusSet) (eval p) else_
pure $ foldl' (<>) else_' cases'
handleFunction p _ _ args params filter' _ = mconcat <$> mapM (eval p)
( args
++ map snd params
++ map filterExpr (maybeToList filter')
)
handleGroups cs gs =
pure $ RecordSet cs $ do
(g, rs) <- gs
r <- rs
mapM_ tell g
pure $ map (\ v -> sconcat (v:|g)) r
handleLike p (Operator _) escape pattern expr = do
let addEscape = maybe id ((:) . escapeExpr) escape
exprs = addEscape [patternExpr pattern, expr]
mconcat <$> mapM (eval p) exprs
handleOrder _ _ = pure
handleSubquery xs = case runWriter xs of
([x], y) -> pure (x<>y)
(_, _) -> throwError "wrong number of columns from subquery"
handleJoin p joinType cond x y = clip joinType <$> eval p cond x y
where
clip (JoinSemi _) = \ set ->
let n = length $ recordSetLabels x
in RecordSet
{ recordSetLabels = take n $ recordSetLabels set
, recordSetItems = fmap (take n) $ recordSetItems set
}
clip _ = id
handleStructField expr field = go expr (FieldChain $ M.singleton (void field) $ FieldChain M.empty, getInfo expr)
where
go (ColumnExpr _ (RColumnRef col)) (chain, info) = pure ColumnPlusSet
{ columnPlusColumns = M.singleton (fqcnToFQCN col) $ M.singleton chain $ S.singleton info
, columnPlusTables = mempty
}
go (FieldAccessExpr _ expr' field') chain = go expr' $ first (FieldChain . M.singleton (void field')) chain
go expr' _ = eval (Proxy :: Proxy ColumnLineage) expr'
handleTypeCast _ expr _ = eval (Proxy :: Proxy ColumnLineage) expr
binop _ _ = Just $ (pure .) . (<>)
unop _ _ = Just $ pure . id
ancestorsForTableName :: RTableName Range -> Maybe (RecordSet ColumnLineage)
ancestorsForTableName (RTableName fqtn SchemaMember{..}) =
let recordSetLabels = map RColumnRef columns
info = getInfo fqtn
recordSetItems = writer (map (singleColumnSet info . fqcnToFQCN) columns, singleTableSet info $ fqtnToFQTN $ void fqtn)
columns = map (qualifyColumnName fqtn) columnsList
in Just RecordSet{..}
truncateTableLineage :: FQTableName a -> [FQColumnName ()] -> ColumnLineagePlus
truncateTableLineage tableName columns = M.insert (Left $ fqtnToFQTN tableName) emptyColumnPlusSet $ emptyLineage columns
evalDefaultExpr :: DefaultExpr ResolvedNames Range -> EvalResult ColumnLineage (Expr ResolvedNames Range)
evalDefaultExpr (DefaultValue _) = pure emptyColumnPlusSet
evalDefaultExpr (ExprValue expr) = eval (Proxy :: Proxy ColumnLineage) expr
returnNothing :: ColumnLineagePlus -> (RecordSet ColumnLineage, ColumnLineagePlus)
returnNothing = (emptyRecordSet Proxy,)
columnLineage :: Statement d ResolvedNames Range -> (RecordSet ColumnLineage, ColumnLineagePlus)
columnLineage (QueryStmt query) =
let p = Proxy :: Proxy ColumnLineage
in case runEval (eval p query) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for query: " ++ err
Right r -> (r, mempty)
columnLineage (InsertStmt Insert{insertTable = RTableName tableName SchemaMember{..}, ..}) =
returnNothing $
let columns :: [Either FQTN FQCN]
columns = case insertColumns of
Just (c:|cs) -> map (columnNameToKey . extractColumnRef) $ c : cs
Nothing -> map (columnNameToKey . qualifyColumnName tableName) columnsList
in mergeExisting $ case insertValues of
InsertExprValues _ exprs -> case runEval (mapM (mapM evalDefaultExpr) exprs) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for insert exprs: " ++ err
Right rows -> M.fromList $ zip columns $ foldl1 (zipWith (<>)) $ map toList $ toList rows
InsertSelectValues query -> case runEval (eval p query) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for insert query: " ++ err
Right RecordSet{..} ->
let (columnsLineage, tableLineage) = runWriter recordSetItems
in M.insert (Left $ fqtnToFQTN tableName) tableLineage $ M.fromList $ zip columns columnsLineage
InsertDefaultValues _ -> M.empty
InsertDataFromFile _ _ -> M.empty
where
p = Proxy :: Proxy ColumnLineage
mergeExisting :: ColumnLineagePlus -> ColumnLineagePlus
mergeExisting = M.unionWith (<>) existing
existing :: ColumnLineagePlus
existing = importExistingTable existingColumns
existingColumns = M.mapKeysMonotonic Right $ M.fromSet importExistingColumns $ S.fromList $ map (fqcnToFQCN . qualifyColumnName tableName) columnsList
extractColumnRef :: RColumnRef Range -> FQColumnName Range
extractColumnRef (RColumnRef fqcn) = fqcn
extractColumnRef (RColumnAlias _) = error "this shouldn't happen"
columnNameToKey :: FQColumnName a -> Either FQTN FQCN
columnNameToKey = Right . fqcnToFQCN
(importExistingColumns, importExistingTable) = case insertBehavior of
InsertOverwrite _ -> (const emptyColumnPlusSet, id)
_ ->
( singleColumnSet insertInfo
, M.insert (Left $ fqtnToFQTN tableName) (singleTableSet (getInfo tableName) $ fqtnToFQTN tableName)
)
columnLineage (UpdateStmt Update{..}) =
returnNothing $
let lineagesFromSetExprs :: ColumnLineagePlus = case runEval (mapM evalDefaultExpr exprs) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for update exprs: " ++ err
Right lins -> M.fromList $ zip columns lins
ancestorsFromWhere :: ColumnPlusSet = case updateWhere of
Just expr -> case runEval (filterBy expr table) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for update expr: " ++ err
Right lins -> execWriter $ recordSetItems lins
Nothing -> mempty
in mergeExisting $ M.map (<> ancestorsFromWhere) lineagesFromSetExprs
where
table :: RecordSet ColumnLineage
Just table = ancestorsForTableName updateTable
existing :: ColumnLineagePlus
existing = M.mapKeysMonotonic Right $ M.fromSet (singleColumnSet updateInfo) $ S.fromList $ map (fqcnToFQCN) fqcns
mergeExisting :: ColumnLineagePlus -> ColumnLineagePlus
mergeExisting = M.unionWith (<>) existing
columns = map (Right . fqcnToFQCN) fqcns
fqcns = map (\(RColumnRef fqcn) -> fqcn) colRefs
(colRefs, exprs) = unzip $ toList updateSetExprs
columnLineage (DeleteStmt (Delete _ (RTableName _ SchemaMember{viewQuery = Just _}) _)) = error "delete statement targeting view"
columnLineage (DeleteStmt (Delete _ (RTableName tableName SchemaMember{viewQuery = Nothing, ..}) maybeExpr)) =
returnNothing $
let columns = map (qualifyColumnName tableName) columnsList
in case maybeExpr of
Nothing -> truncateTableLineage tableName columns
Just expr ->
let lineages = map (singleColumnSet (getInfo tableName) . fqcnToFQCN) columns
table :: RecordSet ColumnLineage
table = RecordSet (map RColumnRef columns) (writer (lineages, singleTableSet (getInfo tableName) $ fqtnToFQTN tableName))
in case runEval (filterBy expr table) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for delete query: " ++ err
Right RecordSet{..} ->
let (columnsLineage, tableLineage) = runWriter recordSetItems
in M.insert (Left $ fqtnToFQTN tableName) (singleTableSet (getInfo tableName) (fqtnToFQTN tableName) <> tableLineage) $ M.fromList $ zip (map (Right . fqcnToFQCN) columns) columnsLineage
columnLineage (TruncateStmt (Truncate _ (RTableName _ SchemaMember{viewQuery = Just _}))) = error "truncate statement targeting view"
columnLineage (TruncateStmt (Truncate _ (RTableName tableName SchemaMember{viewQuery = Nothing, ..}))) =
returnNothing $
let columns = map (qualifyColumnName tableName) columnsList
in truncateTableLineage tableName columns
columnLineage (CreateTableStmt CreateTable{createTableName = RCreateTableName _ Exists}) = returnNothing M.empty
columnLineage (CreateTableStmt CreateTable{createTableName = RCreateTableName tableName DoesNotExist, ..}) =
returnNothing $
case createTableDefinition of
TableColumns _ (c:|cs) ->
M.insert (Left $ fqtnToFQTN tableName) emptyColumnPlusSet
$ emptyLineage $ (`mapMaybe` (c:cs)) $ \case
ColumnOrConstraintConstraint _ -> Nothing
ColumnOrConstraintColumn ColumnDefinition{..} -> Just $ qualifyColumnName tableName columnDefinitionName
TableLike _ (RTableName _ SchemaMember{..}) ->
M.insert (Left $ fqtnToFQTN tableName) emptyColumnPlusSet
$ emptyLineage $ map (qualifyColumnName tableName) columnsList
TableAs _ maybeColumns query -> case runEval (eval (Proxy :: Proxy ColumnLineage) query) ancestorsForTableName of
Left err -> error $ "failed to evaluate column lineage for create table statement: " ++ err
Right RecordSet{..} ->
let columns = maybe queryColumns proc maybeColumns
proc = map (qualifyColumnName tableName) . toList
queryColumns = map (qualifyColumnName tableName . resolvedColumnName) recordSetLabels
resolvedColumnName (RColumnRef fqtn) = fqtn{columnNameTable = None}
resolvedColumnName (RColumnAlias (ColumnAlias _ name _)) = QColumnName () None name
(columnsLineage, tableLineage) = runWriter recordSetItems
in M.insert (Left $ fqtnToFQTN tableName) tableLineage $ M.fromList $ zip (map (Right . fqcnToFQCN) columns) columnsLineage
TableNoColumnInfo _ -> M.empty
columnLineage (DropTableStmt DropTable{dropTableNames = tables}) =
returnNothing $
M.unions $ map
(\case
RDropExistingTableName tableName SchemaMember{..} ->
let columns = map (qualifyColumnName tableName) columnsList
in truncateTableLineage tableName columns
RDropMissingTableName _ -> M.empty
) $ toList tables
columnLineage (AlterTableStmt (AlterTableRenameTable _ (RTableName from SchemaMember{..}) (RTableName to _))) =
returnNothing $
let as = map (qualifyColumnName from) columnsList
ds = map (qualifyColumnName to) columnsList
in M.insert (Left $ fqtnToFQTN to) (singleTableSet (getInfo from) $ fqtnToFQTN from)
$ M.insert (Left $ fqtnToFQTN from) emptyColumnPlusSet
$ M.union (emptyLineage as) $ M.fromList $ zip (map (Right . fqcnToFQCN) ds) $ map (singleColumnSet (getInfo from) . fqcnToFQCN) as
columnLineage (AlterTableStmt (AlterTableRenameColumn _ (RTableName table _) from to)) =
returnNothing $
let a = fqcnToFQCN $ qualifyColumnName table from
d = fqcnToFQCN $ qualifyColumnName table to
in M.fromList [(Right d, singleColumnSet (getInfo table) a), (Right a, emptyColumnPlusSet)]
columnLineage (AlterTableStmt (AlterTableAddColumns _ (RTableName table _) (c:|cs))) =
returnNothing $ emptyLineage $ map (qualifyColumnName table) (c:cs)
columnLineage (CreateViewStmt _) = returnNothing M.empty
columnLineage (DropViewStmt _) = returnNothing M.empty
columnLineage (CreateSchemaStmt _) = returnNothing M.empty
columnLineage (GrantStmt _) = returnNothing M.empty
columnLineage (RevokeStmt _) = returnNothing M.empty
columnLineage (BeginStmt _) = returnNothing M.empty
columnLineage (CommitStmt _) = returnNothing M.empty
columnLineage (RollbackStmt _) = returnNothing M.empty
columnLineage (ExplainStmt _ _) = returnNothing M.empty
columnLineage (EmptyStmt _) = returnNothing M.empty