module Spark.Core.Internal.OpFunctions(
simpleShowOp,
prettyShowOp,
extraNodeOpData,
hashUpdateNodeOp,
prettyShowColOp,
hdfsPath,
updateSourceStamp,
prettyShowColFun
) where
import qualified Data.Text as T
import qualified Data.Aeson as A
import qualified Data.Vector as V
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as HM
import Data.Text(Text)
import Data.Aeson((.=), toJSON)
import Data.Char(isSymbol)
import qualified Crypto.Hash.SHA256 as SHA
import Spark.Core.Internal.OpStructures
import Spark.Core.Internal.Utilities
import Spark.Core.Try
-- | Fully-qualified name identifying the kind of a 'NodeOp'.
--
-- Opaque operators report their own 'soName'; aggregator reductions derive
-- the name from their initial outer transform or their merge buffer; every
-- other constructor maps to a fixed @org.spark.*@ name. This name is one of
-- the two inputs hashed by 'hashUpdateNodeOp'.
simpleShowOp :: NodeOp -> T.Text
simpleShowOp nodeOp = case nodeOp of
  -- Operators that carry their own name.
  NodeLocalOp so -> soName so
  NodeDistributedOp so -> soName so
  NodeOpaqueAggregator so -> soName so
  -- Aggregator reductions: name comes from the embedded transform/law.
  NodeAggregatorReduction ua -> _jsonShowAggTrans (uaoInitialOuter ua)
  NodeAggregatorLocalReduction ua -> _jsonShowSGO (uaoMergeBuffer ua)
  -- Built-in operators with fixed names.
  NodeLocalLit _ _ -> "org.spark.LocalLiteral"
  NodeDistributedLit _ _ -> "org.spark.DistributedLiteral"
  NodeStructuredTransform _ -> "org.spark.Select"
  NodeGroupedReduction _ -> "org.spark.GroupedReduction"
  NodeReduction _ -> "org.spark.Reduction"
  NodeBroadcastJoin -> "org.spark.BroadcastJoin"
  NodePointer _ -> "org.spark.PlaceholderCache"
-- | Human-oriented name of a 'NodeOp'.
--
-- Same as 'simpleShowOp', except for aggregator reductions. There, an opaque
-- transform shows its own name, and an inner 'AggFunction' shows the
-- function name instead of the generic structured-reduction name.
prettyShowOp :: NodeOp -> T.Text
prettyShowOp op@(NodeAggregatorReduction uao) =
  case uaoInitialOuter uao of
    OpaqueAggTransform so -> soName so
    InnerAggOp (AggFunction fname _) -> fname
    _ -> simpleShowOp op
prettyShowOp op = simpleShowOp op
-- | Human-readable rendering of a column operation.
--
-- Field extractions use their 'show' form. Functions are rendered through
-- 'prettyShowColFun' with recursively rendered arguments. Literals use
-- 'show''. Structures render as @struct(f1,f2,...)@.
prettyShowColOp :: ColOp -> T.Text
prettyShowColOp colOp = case colOp of
  ColExtraction fpath -> T.pack (show fpath)
  ColFunction fname args ->
    prettyShowColFun fname (map prettyShowColOp (V.toList args))
  ColLit _ cell -> show' cell
  ColStruct fields ->
    let rendered = map (prettyShowColOp . tfValue) (V.toList fields)
    in "struct(" <> T.intercalate "," rendered <> ")"
-- | Input path of a generic datasource operator, if there is one.
--
-- Returns 'Just' only when all of the following hold:
--
-- * the op is a 'NodeDistributedOp' named @org.spark.GenericDatasource@;
-- * its extra payload is a JSON object;
-- * that object has a string @inputPath@ entry.
--
-- Otherwise returns 'Nothing'.
hdfsPath :: NodeOp -> Maybe HdfsPath
hdfsPath (NodeDistributedOp so)
  | soName so == "org.spark.GenericDatasource"
  , A.Object o <- soExtra so
  , Just (A.String path) <- HM.lookup "inputPath" o
  = Just (HdfsPath path)
hdfsPath _ = Nothing
-- | Record a data input stamp on a generic datasource operator.
--
-- The stamp is stored under the @inputStamp@ key of the operator's extra
-- JSON object; any existing key of that name is replaced.
--
-- Fails (via 'tryError') when:
--
-- * the op is not a 'NodeDistributedOp';
-- * it is a 'NodeDistributedOp' not named @org.spark.GenericDatasource@
--   (previously this case produced the misleading message
--   "Expected NodeDistributedOp");
-- * its extra payload is not a JSON object.
updateSourceStamp :: NodeOp -> DataInputStamp -> Try NodeOp
updateSourceStamp (NodeDistributedOp so) (DataInputStamp dis)
  | soName so == "org.spark.GenericDatasource" =
      case soExtra so of
        A.Object o ->
          let extra' = A.Object $ HM.insert "inputStamp" (A.toJSON dis) o
              so' = so { soExtra = extra' }
          in pure $ NodeDistributedOp so'
        x -> tryError $ "updateSourceStamp: Expected dict, got " <> show' x
  | otherwise =
      tryError $
        "updateSourceStamp: Expected org.spark.GenericDatasource, got "
          <> show' (soName so)
updateSourceStamp x _ =
  tryError $ "updateSourceStamp: Expected NodeDistributedOp, got " <> show' x
-- | Operator name of an aggregation transform, as used by 'simpleShowOp'.
-- Opaque transforms report their own name; inner aggregations all share
-- the structured-reduction name.
_jsonShowAggTrans :: AggTransform -> Text
_jsonShowAggTrans aggTrans = case aggTrans of
  OpaqueAggTransform so -> soName so
  InnerAggOp _ -> "org.spark.StructuredReduction"
-- | Operator name of a semigroup operator, as used by 'simpleShowOp'.
-- Each constructor contributes the name it carries.
_jsonShowSGO :: SemiGroupOperator -> Text
_jsonShowSGO sgo = case sgo of
  OpaqueSemiGroupLaw so -> soName so
  UdafSemiGroupOperator className -> className
  ColumnSemiGroupLaw funName -> funName
-- | Human-readable rendering of an aggregation operation.
--
-- UDAFs render as @ClassName(field)@. Functions go through
-- 'prettyShowColFun' with their fields rendered by 'show''. Structures
-- render as @struct(a,b,...)@.
_prettyShowAggOp :: AggOp -> T.Text
_prettyShowAggOp aggOp = case aggOp of
  AggUdaf _ className fp -> T.concat [className, "(", show' fp, ")"]
  AggFunction funName fields ->
    prettyShowColFun funName (map show' (V.toList fields))
  AggStruct members ->
    let rendered = map (_prettyShowAggOp . afValue) (V.toList members)
    in "struct(" <> T.intercalate "," rendered <> ")"
-- | Human-readable rendering of an aggregation transform. Opaque transforms
-- show their name; inner aggregations are rendered by '_prettyShowAggOp'.
_prettyShowAggTrans :: AggTransform -> Text
_prettyShowAggTrans aggTrans = case aggTrans of
  OpaqueAggTransform so -> soName so
  InnerAggOp inner -> _prettyShowAggOp inner
-- | Human-readable rendering of a semigroup operator.
-- This is currently the same as its JSON/operator name, so it delegates
-- to '_jsonShowSGO'.
_prettyShowSGO :: SemiGroupOperator -> Text
_prettyShowSGO = _jsonShowSGO
-- | Extra JSON payload attached to a 'NodeOp'.
--
-- Together with 'simpleShowOp', this is what 'hashUpdateNodeOp' hashes for
-- the operator.
extraNodeOpData :: NodeOp -> A.Value
extraNodeOpData nodeOp = case nodeOp of
  -- Opaque operators carry their own payload.
  NodeLocalOp so -> soExtra so
  NodeDistributedOp so -> soExtra so
  NodeOpaqueAggregator so -> soExtra so
  -- Literals: data type plus content (note the differing type keys).
  NodeLocalLit dt cell ->
    A.object ["type" .= toJSON dt, "content" .= toJSON cell]
  NodeDistributedLit dt lst ->
    A.object ["cellType" .= toJSON dt, "content" .= toJSON lst]
  -- Structured operators serialize through their ToJSON instances.
  NodeStructuredTransform st -> toJSON st
  NodeGroupedReduction ao -> toJSON ao
  NodeAggregatorReduction ua -> case uaoInitialOuter ua of
    OpaqueAggTransform so -> toJSON (soExtra so)
    InnerAggOp ao -> toJSON ao
  NodePointer p -> A.object
    [ "computation" .= toJSON (pointerComputation p)
    , "path" .= toJSON (pointerPath p)
    ]
  -- No payload.
  -- NOTE(review): for NodeReduction, both this payload and 'simpleShowOp'
  -- are constant, so the op-level hash cannot distinguish reductions by
  -- their argument (same for NodeAggregatorLocalReduction beyond its merge
  -- buffer name). Confirm the node hash disambiguates them elsewhere.
  NodeBroadcastJoin -> A.Null
  NodeReduction _ -> A.Null
  NodeAggregatorLocalReduction _ -> A.Null
-- | Feed an operator into a SHA-256 context.
--
-- The hashed value is a JSON object holding the operator name ('simpleShowOp')
-- under @op@ and its payload ('extraNodeOpData') under @extra@.
hashUpdateNodeOp :: SHA.Ctx -> NodeOp -> SHA.Ctx
hashUpdateNodeOp ctx op = _hashUpdateJson ctx payload
  where
    payload = A.object
      [ "op" .= simpleShowOp op
      , "extra" .= extraNodeOpData op
      ]
-- | Render a function application for display.
--
-- When '_isSym' accepts the name as an operator, one argument renders in
-- prefix form (@op a@) and two arguments in infix form (@a op b@). In every
-- other case the call form @name(a, b, ...)@ is used.
prettyShowColFun :: T.Text -> [Text] -> T.Text
prettyShowColFun txt cols = case cols of
  [arg] | operator -> txt <> " " <> arg
  [lhs, rhs] | operator -> lhs <> " " <> txt <> " " <> rhs
  _ -> txt <> "(" <> T.intercalate ", " cols <> ")"
  where
    operator = _isSym txt
-- | Whether a function name should be rendered as an operator by
-- 'prettyShowColFun' (prefix form for one argument, infix for two).
--
-- 'Data.Char.isSymbol' alone only matches the Unicode symbol categories,
-- e.g. @+@, @<@, @=@, @|@, @^@, @~@ and @$@. Common ASCII operator
-- characters such as @-@, @*@, @/@, @%@, @&@ and @!@ are classified as
-- punctuation and would otherwise be rejected, so @a - b@ would render as
-- @-(a, b)@. Those characters are accepted explicitly here.
-- The empty name is never an operator.
_isSym :: T.Text -> Bool
_isSym txt = not (T.null txt) && T.all isOpChar txt
  where
    isOpChar c = isSymbol c || c `elem` punctuationOps
    -- ASCII operator characters whose Unicode category is punctuation.
    punctuationOps = "!#%&*-./:?@\\" :: String
-- | JSON encoding of column operations.
--
-- Extractions, functions and literals encode as objects tagged with a
-- @colOp@ key. Structures encode as an array of @{name, op}@ objects.
instance A.ToJSON ColOp where
  toJSON colOp = case colOp of
    ColExtraction fp ->
      A.object ["colOp" .= T.pack "extraction", "field" .= toJSON fp]
    ColFunction txt cols ->
      A.object
        [ "colOp" .= T.pack "fun"
        , "function" .= txt
        , "args" .= fmap toJSON cols
        ]
    ColLit _ cell ->
      A.object ["colOp" .= T.pack "literal", "lit" .= toJSON cell]
    ColStruct fields ->
      let encodeField (TransformField fn op) =
            A.object ["name" .= T.pack (show fn), "op" .= toJSON op]
      in A.Array (encodeField <$> fields)
-- | UDAF application modes encode as lowercase JSON strings.
instance A.ToJSON UdafApplication where
  toJSON mode = toJSON . T.pack $ case mode of
    Algebraic -> "algebraic"
    Complete -> "complete"
-- | An aggregation field encodes as an object with its name (via 'show'')
-- and its aggregation operation.
instance A.ToJSON AggField where
  toJSON (AggField fieldName op) =
    let encodedOp = toJSON op
    in A.object ["name" .= show' fieldName, "op" .= encodedOp]
-- | JSON encoding of aggregation operations.
--
-- UDAFs and functions encode as objects tagged with an @aggOp@ key.
-- Structures encode as a plain array of their fields.
instance A.ToJSON AggOp where
  toJSON aggOp = case aggOp of
    AggUdaf ua className fp -> A.object
      [ "aggOp" .= T.pack "udaf"
      , "udafApplication" .= toJSON ua
      , "className" .= className
      , "field" .= toJSON fp
      ]
    AggFunction funName fields -> A.object
      [ "aggOp" .= toJSON (T.pack "function")
      , "functionName" .= toJSON funName
      , "fields" .= toJSON (V.toList fields)
      ]
    AggStruct members -> toJSON (V.toList members)
-- | Feed a JSON value into a SHA-256 context.
-- The value is serialized with 'encodeDeterministicPretty' and flattened
-- into a single strict 'BS.ByteString' before hashing.
_hashUpdateJson :: SHA.Ctx -> A.Value -> SHA.Ctx
_hashUpdateJson ctx val =
  let encoded = encodeDeterministicPretty val
      strictBytes = BS.concat (LBS.toChunks encoded)
  in SHA.update ctx strictBytes