-- | Pruning of a graph against a cache of already-known nodes.
--
-- Exports the cache data types ('NodeCacheStatus', 'NodeCacheInfo',
-- 'NodeCache', 'emptyNodeCache') and the pruning entry points
-- ('pruneGraph' and its 'UntypedNode' specialisation 'pruneGraphDefault').
module Spark.Core.Internal.Pruning(
NodeCacheStatus(..),
NodeCacheInfo(..),
NodeCache,
pruneGraph,
pruneGraphDefault,
emptyNodeCache
) where
import Data.HashMap.Strict as HM
import Spark.Core.StructuresInternal(NodeId, NodePath, ComputationID)
import Spark.Core.Internal.DatasetStructures(UntypedNode, StructureEdge)
import Spark.Core.Internal.DAGFunctions
import Spark.Core.Internal.DAGStructures
import Spark.Core.Internal.DatasetFunctions
import Spark.Core.Internal.OpStructures
-- | Status stored alongside a node in the cache.
--
-- NOTE(review): the meaning of each constructor below is inferred from its
-- name only; nothing in this module inspects the status. Confirm against the
-- code that populates the cache.
data NodeCacheStatus
  = NodeCacheRunning -- ^ Presumably: the node is still being computed.
  | NodeCacheError -- ^ Presumably: computing the node failed.
  | NodeCacheSuccess -- ^ Presumably: the node was computed successfully.
  deriving (Eq, Show)
-- | What the cache records about a single node.
--
-- All fields are strict.
data NodeCacheInfo = NodeCacheInfo
  { nciStatus :: !NodeCacheStatus
    -- ^ Status of the cached node.
  , nciComputation :: !ComputationID
    -- ^ Identifier of a computation; used together with 'nciPath' to build
    -- the pointer that replaces a cached node.
  , nciPath :: !NodePath
    -- ^ Path of a node; presumably its path inside the computation named by
    -- 'nciComputation' (TODO confirm).
  }
  deriving (Eq, Show)
-- | The node cache: a strict hash map from a 'NodeId' to the 'NodeCacheInfo'
-- recorded for that node.
type NodeCache = HM.HashMap NodeId NodeCacheInfo
-- | A cache that contains no node.
emptyNodeCache :: NodeCache
emptyNodeCache = mempty -- the Monoid identity of a HashMap is the empty map
-- | Prune a graph using a cache of known nodes, then substitute the cached
-- vertices that remain.
--
-- The work happens in three steps:
--
--   1. The graph is reversed and filtered: every vertex whose id is present
--      in the cache is tagged 'CutChildren', every other vertex is tagged
--      'Keep'.
--   2. The filtered graph is reversed again.
--   3. Each remaining vertex that has a cache entry is replaced by the result
--      of the user-supplied replacement function; all other vertices are
--      left untouched.
--
-- Arguments, in order: the cache, a function extracting the 'NodeId' of a
-- vertex, the replacement function applied to a cached vertex and its cache
-- entry, and the graph to prune.
--
-- NOTE(review): only membership in the cache is tested; 'nciStatus' is not
-- consulted, so entries are treated the same whatever their status. Confirm
-- that this is intended.
pruneGraph ::
  (Show v) =>
  NodeCache ->
  (v -> NodeId) ->
  (v -> NodeCacheInfo -> v) ->
  Graph v StructureEdge ->
  Graph v StructureEdge
pruneGraph cache getNodeId f g = graphMapVertices' substitute pruned
  where
    -- Filtering is done on the reversed graph; the result is flipped back so
    -- that it has the same orientation as the input.
    pruned = reverseGraph (graphFilterVertices cutIfCached (reverseGraph g))

    cutIfCached vertex
      | HM.member (getNodeId vertex) cache = CutChildren
      | otherwise = Keep

    -- A vertex without a cache entry is returned unchanged.
    substitute vertex =
      maybe vertex (f vertex) (HM.lookup (getNodeId vertex) cache)
-- | 'pruneGraph' specialised to graphs of 'UntypedNode': vertices are
-- identified with 'nodeId' and cached vertices are replaced with
-- '_createNodeCache'.
pruneGraphDefault ::
  NodeCache ->
  Graph UntypedNode StructureEdge ->
  Graph UntypedNode StructureEdge
pruneGraphDefault cache graph =
  pruneGraph cache nodeId _createNodeCache graph
-- | Build the placeholder node that stands in for a cached node.
--
-- The placeholder is created with 'emptyNodeStandard' from the locality and
-- type of the original node, under the name @org.spark.PlaceholderCache@.
-- Its operation is then set to a 'NodePointer' built from the computation id
-- and path stored in the cache entry.
_createNodeCache :: UntypedNode -> NodeCacheInfo -> UntypedNode
_createNodeCache node cacheInfo = updateNodeOp placeholder pointerOp
  where
    placeholder =
      emptyNodeStandard
        (nodeLocality node)
        (nodeType node)
        "org.spark.PlaceholderCache"
    pointerOp =
      NodePointer (Pointer (nciComputation cacheInfo) (nciPath cacheInfo))