{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

-- | The communication protocol with the server.
--
-- This module declares the data structures exchanged with the server and
-- the aeson instances used to serialize or parse them.
module Spark.Core.Internal.Client where

import Spark.Core.StructuresInternal
import Spark.Core.Dataset(UntypedNode)
-- NOTE(review): after switching the status parser to 'fail', nothing visible
-- in this module uses 'Spark.Core.Internal.Utilities' or 'pack' any more.
-- The imports are kept untouched; prune them if nothing else relies on them.
import Spark.Core.Internal.Utilities
import Spark.Core.Internal.TypesStructures(DataType)
import Spark.Core.Internal.TypesFunctions()

import Data.Text(Text, pack)
import Data.Aeson
import Data.Aeson.Types(Parser)
import GHC.Generics

-- Imports for the client

-- **** DATA STRUCTURES ****

{-| The ID of an RDD in Spark.
-}
data RDDId = RDDId
  { unRDDId :: !Int
  } deriving (Eq, Show, Ord)

-- | The identifier of a local session, carried as text.
data LocalSessionId = LocalSessionId
  { unLocalSession :: !Text
  } deriving (Eq, Show)

-- | A computation: a set of nodes together with the paths of the nodes of
-- interest, attached to a session.
data Computation = Computation
  { cSessionId :: !LocalSessionId
  , cId :: !ComputationID
  , cNodes :: ![UntypedNode]
    -- Non-empty.
    -- NOTE(review): the original comment sits between cNodes and
    -- cTerminalNodes; confirm which of the two lists it constrains.
  , cTerminalNodes :: ![NodePath]
    -- The node at the top of the computation.
    -- Must be part of the terminal nodes.
  , cCollectingNode :: !NodePath
    -- This redundant information is not serialized.
    -- It is used internally to track the resulting nodes.
  , cTerminalNodeIds :: ![NodeId]
  } deriving (Show, Generic)

-- | One entry of a batch computation result, as it appears on the wire:
-- a node path, the paths it depends on, and the status of that node.
data BatchComputationKV = BatchComputationKV
  { bckvLocalPath :: !NodePath
  , bckvDeps :: ![NodePath]
  , bckvResult :: !PossibleNodeStatus
  } deriving (Show, Generic)

-- | The result of a batch computation: the target path, and one
-- (path, dependencies, status) triple per reported node.
data BatchComputationResult = BatchComputationResult
  { bcrTargetLocalPath :: !NodePath
  , bcrResults :: ![(NodePath, [NodePath], PossibleNodeStatus)]
  } deriving (Show, Generic)

-- | The description of an RDD: its id, class name, textual representation
-- and the ids of its parents.
data RDDInfo = RDDInfo
  { rddiId :: !RDDId
  , rddiClassName :: !Text
  , rddiRepr :: !Text
  , rddiParents :: ![RDDId]
  } deriving (Show, Generic)

-- | Statistics attached to a computed item: a list of RDD descriptions.
data SparkComputationItemStats = SparkComputationItemStats
  { scisRddInfo :: ![RDDInfo]
  } deriving (Show, Generic)

-- | The status of a node, as decoded from the @status@ field of the
-- server's answer (see the 'FromJSON' instance below).
data PossibleNodeStatus
  = NodeQueued
  | NodeRunning
  | NodeFinishedSuccess !(Maybe NodeComputationSuccess) !(Maybe SparkComputationItemStats)
  | NodeFinishedFailure NodeComputationFailure
  deriving (Show, Generic)

-- | The payload of a successful node computation.
data NodeComputationSuccess = NodeComputationSuccess
  { -- Because Row requires additional information to be deserialized.
    -- (The content is therefore kept as a raw JSON 'Value'.)
    ncsData :: Value
    -- The data type is also available, but it is not going to be parsed for now.
  , ncsDataType :: DataType
  } deriving (Show, Generic)

-- | The payload of a failed node computation: an error message.
data NodeComputationFailure = NodeComputationFailure
  { ncfMessage :: !Text
  } deriving (Show, Generic)


-- **** AESON INSTANCES ***

-- | A session id is serialized as its underlying text.
instance ToJSON LocalSessionId where
  toJSON = toJSON . unLocalSession

-- | An RDD id is parsed from whatever JSON the wrapped 'Int' accepts.
instance FromJSON RDDId where
  parseJSON x = RDDId <$> parseJSON x

-- | Reads the keys @id@, @className@, @repr@ and @parents@, in that order.
instance FromJSON RDDInfo where
  parseJSON = withObject "RDDInfo" $ \o ->
    RDDInfo
      <$> o .: "id"
      <*> o .: "className"
      <*> o .: "repr"
      <*> o .: "parents"

-- | Reads the key @rddInfo@.
instance FromJSON SparkComputationItemStats where
  parseJSON = withObject "SparkComputationItemStats" $ \o ->
    SparkComputationItemStats <$> o .: "rddInfo"

-- | Reads the keys @localPath@, @pathDependencies@ and @result@, in that
-- order.
instance FromJSON BatchComputationKV where
  parseJSON = withObject "BatchComputationKV" $ \o ->
    BatchComputationKV
      <$> o .: "localPath"
      <*> o .: "pathDependencies"
      <*> o .: "result"

-- | Reads the keys @results@ (a list of 'BatchComputationKV') and
-- @targetLocalPath@, and flattens each entry into a triple.
instance FromJSON BatchComputationResult where
  parseJSON = withObject "BatchComputationResult" $ \o -> do
    -- "results" is read before "targetLocalPath", as in the original code,
    -- so that the first reported missing key does not change.
    entries <- o .: "results"
    target <- o .: "targetLocalPath"
    pure $ BatchComputationResult target (toTriple <$> entries)
    where
      toTriple (BatchComputationKV path deps status) = (path, deps, status)

-- | Reads the keys @content@ (kept as raw JSON) and @type@.
instance FromJSON NodeComputationSuccess where
  parseJSON = withObject "NodeComputationSuccess" $ \o ->
    NodeComputationSuccess
      <$> o .: "content"
      <*> o .: "type"

-- Because we get a row back, we need to supply a SQLType for deserialization.

-- | Dispatches on the textual @status@ field:
--
-- * @running@ gives 'NodeRunning';
-- * @scheduled@ gives 'NodeQueued';
-- * @finished_success@ gives 'NodeFinishedSuccess', built from the optional
--   keys @finalResult@ and @stats@;
-- * @finished_failure@ gives 'NodeFinishedFailure', built from the required
--   key @finalError@.
--
-- Any other status is reported as a parse failure.
instance FromJSON PossibleNodeStatus where
  parseJSON = withObject "PossibleNodeStatus" $ \o -> do
    status <- (o .: "status" :: Parser Text)
    case status of
      "running" -> pure NodeRunning
      "finished_success" -> parseSuccess o
      "finished_failure" -> parseFailure o
      "scheduled" -> pure NodeQueued
      -- Use the parser's own failure channel so that callers of
      -- 'fromJSON' / 'parseEither' receive an ordinary decoding error.
      -- The previous code called the project helper 'failure', whose
      -- definition is not visible here (presumably it throws).
      _ -> fail ("FromJSON PossibleNodeStatus " ++ show status)
    where
      parseSuccess :: Object -> Parser PossibleNodeStatus
      parseSuccess o =
        NodeFinishedSuccess
          <$> o .:? "finalResult"
          <*> o .:? "stats"

      parseFailure :: Object -> Parser PossibleNodeStatus
      parseFailure o =
        NodeFinishedFailure . NodeComputationFailure <$> o .: "finalError"