{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}

module Spark.IO.Internal.InputGeneric(
  SparkPath(..),
  DataSchema(..),
  InputOptionValue(..),
  InputOptionKey(..),
  DataFormat(..),
  SourceDescription(..),
  generic',
  genericWithSchema',
  genericWithSchema
) where

import Data.Text(Text)
import Data.String(IsString(..))
import qualified Data.Map.Strict as M
import qualified Data.Aeson as A
import qualified Data.Text as T
import Data.Aeson(toJSON, (.=))
-- import Debug.Trace

import Spark.Core.Types
import Spark.Core.Context
import Spark.Core.Try
import Spark.Core.Dataset

import Spark.Core.Internal.Utilities(forceRight)
import Spark.Core.Internal.DatasetFunctions(asDF, emptyDataset, emptyLocalData)
import Spark.Core.Internal.TypesStructures(SQLType(..))
import Spark.Core.Internal.OpStructures

{-| A path to some data that can be read by Spark.
-}
newtype SparkPath = SparkPath Text deriving (Show, Eq)

{-| The schema policy with respect to a data source: either request Spark to
infer the schema from the source, or match the source against a schema
provided by the user.
-}
data DataSchema = InferSchema | UseSchema DataType deriving (Show, Eq)

{-| The low-level option values accepted by the Spark reader API.
-}
data InputOptionValue =
    InputIntOption Int
  | InputDoubleOption Double
  | InputStringOption Text
  | InputBooleanOption Bool
  deriving (Eq, Show)

instance A.ToJSON InputOptionValue where
  toJSON (InputIntOption i) = toJSON i
  toJSON (InputDoubleOption d) = toJSON d
  toJSON (InputStringOption s) = toJSON s
  toJSON (InputBooleanOption b) = toJSON b

newtype InputOptionKey = InputOptionKey { unInputOptionKey :: Text } deriving (Eq, Show, Ord)

{-| The type of the source.

This enumeration contains all the data formats that are natively supported by
Spark, either for input or for output, and lets users express their own format
if required.
-}
data DataFormat = JsonFormat | TextFormat | CsvFormat | CustomSourceFormat !Text deriving (Eq, Show)

-- data InputSource = JsonSource | TextSource | CsvSource | InputSource SparkPath

{-| A description of a data source, following Spark's reader API version 2.

Each source consists of an input source (json, xml, etc.), an optional schema
for this source, and a number of options specific to this source.

Since this description is rather low-level, a number of wrappers are provided
for the most popular sources that are already built into Spark.
-}
data SourceDescription = SourceDescription {
  inputPath :: !SparkPath,
  inputSource :: !DataFormat,
  inputSchema :: !DataSchema,
  sdOptions :: !(M.Map InputOptionKey InputOptionValue),
  inputStamp :: !(Maybe DataInputStamp)
} deriving (Eq, Show)

instance IsString SparkPath where
  fromString = SparkPath . T.pack

{-| Generates a dataframe from a source description.

This may trigger some calculations on the Spark side if schema inference is
required.
-}
generic' :: SourceDescription -> SparkState DataFrame
generic' sd = do
  dtt <- _inferSchema sd
  return $ dtt >>= \dt -> genericWithSchema' dt sd
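
-- A minimal usage sketch (illustrative only, not part of the module's original
-- API): build a 'SourceDescription' for a hypothetical JSON file and read it
-- with schema inference. The path and the binding names below are assumptions
-- made for the example.
_exampleJsonSource :: SourceDescription
_exampleJsonSource = SourceDescription {
    inputPath = "/tmp/data/events.json",  -- hypothetical path; 'IsString' builds the 'SparkPath'
    inputSource = JsonFormat,
    inputSchema = InferSchema,            -- let Spark infer the schema
    sdOptions = M.empty,
    inputStamp = Nothing
  }

-- Reading it is then a single call; the schema inference pass runs on the
-- Spark side before the dataframe is built.
_exampleRead :: SparkState DataFrame
_exampleRead = generic' _exampleJsonSource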
{-| Generates a dataframe from a source description, and assumes a given schema.

This schema overrides whatever may have been given in the source description:
if the source description requested that the schema be checked or inferred,
that instruction is overridden.

While this is convenient, it may lead to runtime errors that are hard to
understand if the data does not follow the given schema.
-}
genericWithSchema' :: DataType -> SourceDescription -> DataFrame
genericWithSchema' dt sd = asDF $ emptyDataset no (SQLType dt) where
  sd' = sd { inputSchema = UseSchema dt }
  so = StandardOperator {
      soName = "org.spark.GenericDatasource",
      soOutputType = dt,
      soExtra = A.toJSON sd' }
  no = NodeDistributedOp so

{-| Generates a dataframe from a source description, and assumes a certain
schema on the source.
-}
genericWithSchema :: forall a. (SQLTypeable a) => SourceDescription -> Dataset a
genericWithSchema sd =
  let sqlt = buildType :: SQLType a
      dt = unSQLType sqlt
  in forceRight $ castType sqlt =<< genericWithSchema' dt sd

-- Wraps the action of inferring the schema.
-- This is not particularly efficient: it does a first pass over the source to
-- get the schema, and the data is read again in a second pass.
_inferSchema :: SourceDescription -> SparkState (Try DataType)
_inferSchema = executeCommand1 . _inferSchemaCmd

-- TODO: this is a monoidal operation, it could be turned into a universal
-- aggregator.
_inferSchemaCmd :: SourceDescription -> LocalData DataType
_inferSchemaCmd sd = emptyLocalData no sqlt where
  sqlt = buildType :: SQLType DataType
  dt = unSQLType sqlt
  so = StandardOperator {
      soName = "org.spark.InferSchema",
      soOutputType = dt,
      soExtra = A.toJSON sd }
  no = NodeOpaqueAggregator so

instance A.ToJSON SparkPath where
  toJSON (SparkPath p) = toJSON p

instance A.ToJSON DataSchema where
  toJSON InferSchema = "infer_schema"
  toJSON (UseSchema dt) = toJSON dt

instance A.ToJSON DataFormat where
  toJSON JsonFormat = "json"
  toJSON TextFormat = "text"
  toJSON CsvFormat = "csv"
  toJSON (CustomSourceFormat s) = toJSON s

instance A.ToJSON SourceDescription where
  toJSON sd = A.object [
      "inputPath" .= toJSON (inputPath sd),
      "inputSource" .= toJSON (inputSource sd),
      "inputSchema" .= toJSON (inputSchema sd),
      "inputStamp" .= A.Null,
      "options" .= A.object (f <$> M.toList (sdOptions sd))
    ] where
        f (k, v) = unInputOptionKey k .= toJSON v
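
-- Illustration of the wire format produced by the 'ToJSON' instances above
-- (a sketch; it reuses the hypothetical '_exampleJsonSource' defined earlier).
-- The encoded value is an object of the following shape (key order may vary):
--
-- > { "inputPath": "/tmp/data/events.json"
-- > , "inputSource": "json"
-- > , "inputSchema": "infer_schema"
-- > , "inputStamp": null
-- > , "options": {}
-- > }
_exampleWireFormat :: A.Value
_exampleWireFormat = A.toJSON _exampleJsonSource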