module Spark.Core.Internal.DatasetFunctions(
parents,
untyped,
logicalParents,
depends,
dataframe,
asDF,
asDS,
asLocalObservable,
identity,
autocache,
cache,
uncache,
union,
castLocality,
emptyDataset,
emptyLocalData,
emptyNodeStandard,
nodeId,
nodeLogicalDependencies,
nodeLogicalParents,
nodeLocality,
nodeName,
nodePath,
nodeOp,
nodeParents,
nodeType,
untypedDataset,
updateNode,
fun1ToOpTyped,
fun2ToOpTyped,
nodeOpToFun1,
nodeOpToFun1Typed,
nodeOpToFun1Untyped,
nodeOpToFun2,
nodeOpToFun2Typed,
opnameCache,
opnameUnpersist,
opnameAutocache,
) where
import qualified Crypto.Hash.SHA256 as SHA
import qualified Data.Aeson as A
import qualified Data.Text as T
import qualified Data.Text.Format as TF
import qualified Data.Vector as V
import Data.Aeson((.=), toJSON)
import Data.Text.Encoding(decodeUtf8)
import Data.ByteString.Base16(encode)
import Data.Maybe(fromMaybe, listToMaybe)
import Data.Text.Lazy(toStrict)
import Data.String(IsString(fromString))
import Formatting
import Spark.Core.StructuresInternal
import Spark.Core.Try
import Spark.Core.Row
import Spark.Core.Internal.TypesStructures
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.OpStructures
import Spark.Core.Internal.OpFunctions
import Spark.Core.Internal.Utilities
import Spark.Core.Internal.RowUtils
import Spark.Core.Internal.TypesGenerics
-- | Returns the operation ('NodeOp') stored in a compute node.
nodeOp :: ComputeNode loc a -> NodeOp
nodeOp node = _cnOp node
-- | Returns the direct parents of a node as a list (converted from the
-- vector stored in the node).
nodeParents :: ComputeNode loc a -> [UntypedNode]
nodeParents node = V.toList (_cnParents node)
-- | Returns the logical parents of a node, if any have been set.
--
-- 'Nothing' is returned when the node carries no logical parents; otherwise
-- the stored vector is converted to a list.
nodeLogicalParents :: ComputeNode loc a -> Maybe [UntypedNode]
nodeLogicalParents node = fmap V.toList (_cnLogicalParents node)
-- | Returns the logical dependencies of a node as a list (converted from
-- the vector stored in the node).
nodeLogicalDependencies :: ComputeNode loc a -> [UntypedNode]
nodeLogicalDependencies node = V.toList (_cnLogicalDeps node)
-- | Returns the name of a node.
--
-- The explicitly assigned name is used when present; otherwise a default
-- name is derived from the node with '_defaultNodeName'.
nodeName :: ComputeNode loc a -> NodeName
nodeName node = case _cnName node of
  Just name -> name
  Nothing -> _defaultNodeName node
-- | Returns the path of a node.
--
-- When the stored path is empty, a single-element path made of the node's
-- name is returned instead.
nodePath :: ComputeNode loc a -> NodePath
nodePath node
  | V.null (unNodePath stored) = NodePath (V.singleton (nodeName node))
  | otherwise = stored
  where
    stored = _cnPath node
-- | Returns the data type stored in the node, wrapped as a 'SQLType'
-- tagged with the node's element type.
nodeType :: ComputeNode loc a -> SQLType a
nodeType node = SQLType (_cnType node)
-- | Builds a new node with the same locality and type as the input, whose
-- only parent is the input node.
--
-- The operator name is @org.spark.LocalIdentity@ for local nodes and
-- @org.spark.Identity@ otherwise.
identity :: ComputeNode loc a -> ComputeNode loc a
identity node = parents fresh [untyped node]
  where
    opName :: T.Text
    opName
      | _cnLocality node == Local = "org.spark.LocalIdentity"
      | otherwise = "org.spark.Identity"
    fresh = emptyNodeStandard (nodeLocality node) (nodeType node) opName
-- | Builds a new dataset node, named by 'opnameCache', with the same
-- locality and type as the input and the input as its only parent.
cache :: Dataset a -> Dataset a
cache ds = parents cached [untyped ds]
  where
    cached = emptyNodeStandard (nodeLocality ds) (nodeType ds) opnameCache

-- | Operator name used by 'cache'.
opnameCache :: T.Text
opnameCache = "org.spark.Cache"
-- | Builds a new node, named by 'opnameUnpersist', with the same locality
-- and type as the input and the input as its only parent.
uncache :: ComputeNode loc a -> ComputeNode loc a
uncache node = parents unpersisted [untyped node]
  where
    unpersisted =
      emptyNodeStandard (nodeLocality node) (nodeType node) opnameUnpersist

-- | Operator name used by 'uncache'.
opnameUnpersist :: T.Text
opnameUnpersist = "org.spark.Unpersist"
-- | Builds a new dataset node, named by 'opnameAutocache', with the same
-- locality and type as the input and the input as its only parent.
autocache :: Dataset a -> Dataset a
autocache ds = parents marked [untyped ds]
  where
    marked = emptyNodeStandard (nodeLocality ds) (nodeType ds) opnameAutocache

-- | Operator name used by 'autocache'.
opnameAutocache :: T.Text
opnameAutocache = "org.spark.Autocache"
-- | Builds a union node over two datasets of the same element type.
--
-- The new node takes its locality and type from the first dataset and has
-- both datasets as parents, in argument order.
union :: Dataset a -> Dataset a -> Dataset a
union n1 n2 = parents merged [untyped n1, untyped n2]
  where
    merged = emptyNodeStandard (nodeLocality n1) (nodeType n1) _opnameUnion

-- | Operator name used by 'union'.
_opnameUnion :: T.Text
_opnameUnion = "org.spark.Union"
-- | Converts a distributed node into a 'DataFrame' by discarding its
-- static type information. This conversion always succeeds.
asDF :: ComputeNode LocDistributed a -> DataFrame
asDF node = pure (_unsafeCastNode node)
-- | Attempts to view a 'DataFrame' as a statically typed 'Dataset'.
--
-- The data type carried by the dataframe is compared with the type built
-- for @a@ (via 'buildType'). The cast succeeds only when they are equal;
-- otherwise a casting error mentioning both types is returned. A dataframe
-- that is already a failure is propagated unchanged.
asDS :: forall a. (SQLTypeable a) => DataFrame -> Try (Dataset a)
asDS df = df >>= \node ->
  let expected = unSQLType (buildType :: SQLType a)
      actual = unSQLType (nodeType node)
  in if actual == expected
       then pure (_unsafeCastNode node)
       else tryError $
         sformat ("Casting error: dataframe has type "%sh%" incompatible with type "%sh) actual expected
-- | Converts a local node into a 'LocalFrame' by discarding its static
-- type information. This conversion always succeeds.
asLocalObservable :: ComputeNode LocLocal a -> LocalFrame
asLocalObservable node = pure (_unsafeCastNode node)
-- | Forgets the static locality and element type of a node. The content
-- of the node is left unchanged.
untyped :: ComputeNode loc a -> UntypedNode
untyped node = _unsafeCastNode node
-- | Forgets the static element type of a distributed node. The content of
-- the node is left unchanged.
untypedDataset :: ComputeNode LocDistributed a -> UntypedDataset
untypedDataset node = _unsafeCastNode node
-- | Adds parents to a node.
--
-- The given nodes are placed in front of the parents the node already has,
-- and the node id is recomputed (see 'updateNode').
parents :: ComputeNode loc a -> [UntypedNode] -> ComputeNode loc a
parents node l = updateNode node prepend
  where
    prepend n = n { _cnParents = V.fromList l V.++ _cnParents n }
-- | Sets the logical parents of a node, replacing any previous value.
--
-- The node id is recomputed afterwards (see 'updateNode').
logicalParents :: ComputeNode loc a -> [UntypedNode] -> ComputeNode loc a
logicalParents node l = updateNode node replace
  where
    replace n = n { _cnLogicalParents = Just (V.fromList l) }
-- | Sets the logical dependencies of a node, replacing any previous value.
--
-- The node id is recomputed afterwards (see 'updateNode').
depends :: ComputeNode loc a -> [UntypedNode] -> ComputeNode loc a
depends node l = updateNode node replace
  where
    replace n = n { _cnLogicalDeps = V.fromList l }
-- | Attempts to re-tag a node with another static locality.
--
-- The runtime locality stored in the node is checked against the values
-- accepted by the target locality @loc'@ (its '_validLocalityValues'). The
-- cast succeeds when the stored locality is one of the accepted values;
-- otherwise an error listing the stored locality and the accepted ones is
-- returned. The stored locality itself is never modified.
--
-- The check is a membership test, consistent with 'checkLocalityValidity':
-- a target locality that accepts several runtime values can be cast to from
-- any of them.
castLocality :: forall a loc loc'. (
    CheckedLocalityCast loc, CheckedLocalityCast loc') =>
    ComputeNode loc a -> Try (ComputeNode loc' a)
castLocality node
  -- The record update only serves to change the phantom locality tag.
  | actual `elem` accepted = pure $ node { _cnLocality = actual }
  | otherwise =
      tryError $ sformat ("Wrong locality :"%shown%", expected: "%shown) actual accepted
  where
    actual = _cnLocality node
    accepted = unTypedLocality <$> (_validLocalityValues :: [TypedLocality loc'])
-- | Returns the identifier stored in the node.
nodeId :: ComputeNode loc a -> NodeId
nodeId node = _cnNodeId node
-- | Applies a transformation to a node and then refreshes its identifier.
--
-- The id of the result is recomputed with '_nodeId' from the transformed
-- node, so any id set by the transformation itself is overwritten.
updateNode :: ComputeNode loc a -> (ComputeNode loc a -> ComputeNode loc' b) -> ComputeNode loc' b
updateNode ds f =
  let updated = f ds
  in updated { _cnNodeId = _nodeId updated }
-- | Returns the locality stored in the node, wrapped as a 'TypedLocality'
-- tagged with the node's static locality.
nodeLocality :: ComputeNode loc a -> TypedLocality loc
nodeLocality node = TypedLocality (_cnLocality node)
-- | Creates a dataset node with the given operation and type, and no
-- parents or dependencies.
emptyDataset :: NodeOp -> SQLType a -> Dataset a
emptyDataset op sqlt = _emptyNode op sqlt
-- | Creates a local data node with the given operation and type, and no
-- parents or dependencies.
emptyLocalData :: NodeOp -> SQLType a -> LocalData a
emptyLocalData op sqlt = _emptyNode op sqlt
-- | Builds a dataframe from a data type and a list of cells.
--
-- Every cell is checked against the data type with 'checkCell'; the first
-- failure aborts the construction. On success, the checked cells are
-- converted to JSON and embedded in a distributed literal node.
dataframe :: DataType -> [Cell] -> DataFrame
dataframe dt cells' = do
  validCells <- tryEither (traverse (checkCell dt) cells')
  let literal = NodeDistributedLit dt (V.fromList (map toJSON validCells))
  pure (_emptyNode literal (SQLType dt))
-- | Builds a parentless placeholder node of the given type.
--
-- The operator is named @org.spark.Placeholder@ and is wrapped as a local
-- or a distributed operation according to the static locality @loc@.
placeholderTyped :: forall a loc. (IsLocality loc) =>
  SQLType a -> ComputeNode loc a
placeholderTyped tp = _emptyNode op tp
  where
    so = makeOperator "org.spark.Placeholder" tp
    op = case unTypedLocality (_getTypedLocality :: TypedLocality loc) of
      Local -> NodeLocalOp so
      Distributed -> NodeDistributedOp so
-- | Extracts the operation produced by a one-argument node function.
--
-- The function is applied to a placeholder node of the given type, and the
-- operation of the resulting node is returned.
fun1ToOpTyped :: forall a loc a' loc'. (IsLocality loc) =>
  SQLType a -> (ComputeNode loc a -> ComputeNode loc' a') -> NodeOp
fun1ToOpTyped sqlt f =
  let result = f (placeholderTyped sqlt)
  in nodeOp result
-- | Extracts the operation produced by a two-argument node function.
--
-- The function is applied to two placeholder nodes of the given types, and
-- the operation of the resulting node is returned.
fun2ToOpTyped :: forall a1 a2 a loc1 loc2 loc. (IsLocality loc1, IsLocality loc2) =>
  SQLType a1 -> SQLType a2 -> (ComputeNode loc1 a1 -> ComputeNode loc2 a2 -> ComputeNode loc a) -> NodeOp
fun2ToOpTyped sqlt1 sqlt2 f =
  let result = f (placeholderTyped sqlt1) (placeholderTyped sqlt2)
  in nodeOp result
-- | Turns an operation into a one-argument node function.
--
-- The type of the output node is built from the result type @a2@ (via
-- 'buildType'); see 'nodeOpToFun1Typed' for the construction itself.
nodeOpToFun1 :: forall a1 a2 loc1 loc2. (IsLocality loc1, SQLTypeable a2, IsLocality loc2) =>
  NodeOp -> ComputeNode loc1 a1 -> ComputeNode loc2 a2
nodeOpToFun1 no node = nodeOpToFun1Typed (buildType :: SQLType a2) no node
-- | Turns an operation and an explicit output type into a one-argument
-- node function.
--
-- The result is a fresh node carrying the operation and the output type,
-- with the argument node as its only parent.
nodeOpToFun1Typed :: forall a1 a2 loc1 loc2. (HasCallStack, IsLocality loc1, IsLocality loc2) =>
  SQLType a2 -> NodeOp -> ComputeNode loc1 a1 -> ComputeNode loc2 a2
nodeOpToFun1Typed sqlt no node = parents child [untyped node]
  where
    child :: ComputeNode loc2 a2
    child = _emptyNode no sqlt
-- | Untyped variant of 'nodeOpToFun1Typed': the output type is given as a
-- plain 'DataType' and the nodes carry 'Cell' elements.
--
-- The result is a fresh node carrying the operation and the output type,
-- with the argument node as its only parent.
nodeOpToFun1Untyped :: forall loc1 loc2. (HasCallStack, IsLocality loc1, IsLocality loc2) =>
  DataType -> NodeOp -> ComputeNode loc1 Cell -> ComputeNode loc2 Cell
nodeOpToFun1Untyped dt no node = parents child [untyped node]
  where
    child :: ComputeNode loc2 Cell
    child = _emptyNode no (SQLType dt)
-- | Turns an operation into a two-argument node function.
--
-- The type of the output node is built from the result type @a@ (via
-- 'buildType'); see 'nodeOpToFun2Typed' for the construction itself.
nodeOpToFun2 :: forall a a1 a2 loc loc1 loc2. (SQLTypeable a, IsLocality loc, IsLocality loc1, IsLocality loc2) =>
  NodeOp -> ComputeNode loc1 a1 -> ComputeNode loc2 a2 -> ComputeNode loc a
nodeOpToFun2 no node1 node2 =
  nodeOpToFun2Typed (buildType :: SQLType a) no node1 node2
-- | Turns an operation and an explicit output type into a two-argument
-- node function.
--
-- The result is a fresh node carrying the operation and the output type,
-- with both argument nodes as parents, in argument order.
nodeOpToFun2Typed :: forall a a1 a2 loc loc1 loc2. (IsLocality loc, IsLocality loc1, IsLocality loc2) =>
  SQLType a -> NodeOp -> ComputeNode loc1 a1 -> ComputeNode loc2 a2 -> ComputeNode loc a
nodeOpToFun2Typed sqlt no node1 node2 =
  parents child [untyped node1, untyped node2]
  where
    child :: ComputeNode loc a
    child = _emptyNode no sqlt
-- | Compact textual rendering of a node: the node name, an @\@@ sign, the
-- short operation name, a locality marker (@!@ for local nodes, nothing
-- for distributed ones) and finally the node type between braces.
instance forall loc a. Show (ComputeNode loc a) where
  show ld = T.unpack . toStrict $ TF.format template (name, op, marker, fields)
    where
      template :: TF.Format
      template = fromString "{}@{}{}{{}}"
      marker :: T.Text
      marker = case nodeLocality ld of
        TypedLocality Local -> "!"
        TypedLocality Distributed -> ""
      name = unNodeName (nodeName ld)
      op = simpleShowOp (nodeOp ld)
      fields = T.pack (show (nodeType ld))
-- | JSON rendering of a node as an object with the locality, the name, the
-- short operation name, the extra operation data, the names of the parents
-- and of the logical dependencies, and the data type.
instance forall loc a. A.ToJSON (ComputeNode loc a) where
  toJSON node = A.object
    [ "locality" .= nodeLocality node
    , "name" .= nodeName node
    , "op" .= simpleShowOp op
    , "extra" .= extraNodeOpData op
    , "parents" .= map nodeName (nodeParents node)
    , "logicalDependencies" .= map nodeName (nodeLogicalDependencies node)
    , "_type" .= unSQLType (nodeType node)
    ]
    where
      op = nodeOp node
-- | JSON rendering of a locality as the string @"local"@ or
-- @"distributed"@.
instance forall loc. A.ToJSON (TypedLocality loc) where
  toJSON (TypedLocality l) = case l of
    Local -> A.String "local"
    Distributed -> A.String "distributed"
-- | Changes the static locality and element type of a node without any
-- check. The content of the node is left as it is.
--
-- The record update rewrites fields with their own values; its only
-- purpose is to let the type parameters of the result differ from those of
-- the input.
_unsafeCastNode :: CheckedLocalityCast loc2 => ComputeNode loc1 a -> ComputeNode loc2 b
_unsafeCastNode node = node
  { _cnLocality = _cnLocality node
  , _cnType = _cnType node
  }
-- | Changes the static locality and element type of a node without any
-- check, and stores the given locality value in the node.
--
-- The data type of the node is kept as it is.
_unsafeCastNodeTyped :: TypedLocality loc2 -> ComputeNode loc1 a -> ComputeNode loc2 b
_unsafeCastNodeTyped l node = node
  { _cnLocality = unTypedLocality l
  , _cnType = _cnType node
  }
-- | Re-tags a locality value with another static locality, validating the
-- value against the target with 'checkLocalityValidity'.
_unsafeCastLoc :: CheckedLocalityCast loc' =>
  TypedLocality loc -> TypedLocality loc'
_unsafeCastLoc (TypedLocality l) = case l of
  Local -> checkLocalityValidity (TypedLocality Local)
  Distributed -> checkLocalityValidity (TypedLocality Distributed)
-- | Checks that a locality value is among the values accepted for its
-- static locality (its '_validLocalityValues').
--
-- An accepted value is returned unchanged. Otherwise the call is handed to
-- 'failure' with a message naming the value and the accepted list
-- ('failure' is defined elsewhere; presumably it raises an error -- TODO
-- confirm).
checkLocalityValidity :: forall loc. (HasCallStack, CheckedLocalityCast loc) =>
  TypedLocality loc -> TypedLocality loc
checkLocalityValidity x
  | x `elem` accepted = x
  | otherwise = failure msg x
  where
    accepted = _validLocalityValues :: [TypedLocality loc]
    msg = sformat ("CheckedLocalityCast: element "%shown%" not in the list of accepted values: "%shown)
            x accepted
-- | Computes the identifier of a node from its content.
--
-- A SHA-256 context is fed, in this order, with the node operation (through
-- 'hashUpdateNodeOp'), the ids of the parents and the ids of the logical
-- dependencies. The digest is then Base16-encoded. The id currently stored
-- in the node itself is not read.
_nodeId :: ComputeNode loc a -> NodeId
_nodeId node = NodeId (encode (SHA.finalize withDeps))
  where
    idBytes n = unNodeId (nodeId n)
    withOp = hashUpdateNodeOp SHA.init (nodeOp node)
    withParents = SHA.updates withOp (map idBytes (nodeParents node))
    withDeps =
      SHA.updates withParents (map idBytes (nodeLogicalDependencies node))
-- | Derives a default name for a node.
--
-- The name is made of the last dot-separated piece of the short operation
-- name, lower-cased, followed by an underscore and the first 6 characters
-- of the node id stored in the node.
_defaultNodeName :: ComputeNode loc a -> NodeName
_defaultNodeName node =
  NodeName (T.concat [T.toLower lastPiece, T.pack "_", shortId])
  where
    pieces = T.splitOn (T.pack ".") (simpleShowOp (nodeOp node))
    lastPiece = fromMaybe (T.pack "???") (listToMaybe (reverse pieces))
    shortId = T.take 6 (decodeUtf8 (unNodeId (nodeId node)))
-- | Creates a node with the given operation and type, and no parents or
-- dependencies. The locality value is obtained from the static locality
-- @loc@.
_emptyNode :: forall loc a. (IsLocality loc) =>
  NodeOp -> SQLType a -> ComputeNode loc a
_emptyNode op sqlt = _emptyNodeTyped tloc sqlt op
  where
    tloc :: TypedLocality loc
    tloc = _getTypedLocality
-- | Creates a node with the given locality, type and operation.
--
-- The node has no name, no parents, no logical parents, no logical
-- dependencies and an empty path.
_emptyNodeTyped :: forall loc a.
  TypedLocality loc -> SQLType a -> NodeOp -> ComputeNode loc a
_emptyNodeTyped tloc (SQLType dt) op =
  let
    blank :: ComputeNode loc a
    blank = ComputeNode
      { _cnName = Nothing
      , _cnOp = op
      , _cnType = dt
      , _cnLocality = unTypedLocality tloc
      , _cnParents = V.empty
      , _cnLogicalParents = Nothing
      , _cnLogicalDeps = V.empty
      , _cnPath = NodePath V.empty
        -- Placeholder only: 'updateNode' below overwrites this field with
        -- the id computed by '_nodeId'.
      , _cnNodeId = error "_emptyNode: _cnNodeId"
      }
  in updateNode (_unsafeCastNodeTyped tloc blank) id
-- | Creates a parentless node for a standard operator with the given name.
--
-- The operator's output type is the given type and it carries no extra
-- data ('A.Null'). It is wrapped as a local operation when the locality is
-- 'Local', and as a distributed operation otherwise.
emptyNodeStandard :: forall loc a.
  TypedLocality loc -> SQLType a -> T.Text -> ComputeNode loc a
emptyNodeStandard tloc sqlt name = _emptyNodeTyped tloc sqlt op
  where
    so = StandardOperator
      { soName = name
      , soOutputType = unSQLType sqlt
      , soExtra = A.Null
      }
    op
      | unTypedLocality tloc == Local = NodeLocalOp so
      | otherwise = NodeDistributedOp so