{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Spark.IO.Internal.Json( JsonMode, JsonOptions(..), json', json, jsonInfer, jsonOpt', jsonOpt, defaultJsonOptions ) where import qualified Data.Map.Strict as M import Data.Text(pack) import Spark.Core.Types import Spark.Core.Dataset(DataFrame, Dataset, castType') import Spark.Core.Context import Spark.Core.Try import Spark.IO.Internal.InputGeneric {-| -} data JsonMode = Permissive | DropMalformed | FailFast {-| The options for the json input. -} data JsonOptions = JsonOptions { mode :: !JsonMode, jsonSchema :: !DataSchema } {-| Declares a source of data of the given data type. The source is not read at this point, it is just declared. It may be found to be invalid in subsequent computations. -} json' :: DataType -> String -> DataFrame json' dt p = genericWithSchema' dt (_jsonSourceDescription (SparkPath (pack p)) defaultJsonOptions) {-| Declares a source of data of the given data type. The source is not read at this point, it is just declared. -} json :: (SQLTypeable a) => String -> Dataset a json p = genericWithSchema (_jsonSourceDescription (SparkPath (pack p)) defaultJsonOptions) {-| Reads a source of data expected to be in the JSON format. The schema is not required and Spark will infer the schema of the source. However, all the data contained in the source may end up being read in the process. -} jsonInfer :: SparkPath -> SparkState DataFrame jsonInfer = jsonOpt' defaultJsonOptions {-| Reads a source of data expected to be in the JSON format. The schema is not required and Spark will infer the schema of the source. However, all the data contained in the source may end up being read in the process. -} jsonOpt' :: JsonOptions -> SparkPath -> SparkState DataFrame jsonOpt' jo sp = generic' (_jsonSourceDescription sp jo) {-| Reads a source of data expected to be in the JSON format. The schema is not required and Spark will infer the schema of the source. However, all the data contained in the source may end up being read in the process. -} jsonOpt :: forall a. (SQLTypeable a) => JsonOptions -> SparkPath -> SparkState (Try (Dataset a)) jsonOpt jo sp = let sqlt = buildType :: SQLType a dt = unSQLType sqlt jo' = jo { jsonSchema = UseSchema dt } in castType' sqlt <$> jsonOpt' jo' sp defaultJsonOptions :: JsonOptions defaultJsonOptions = JsonOptions { -- Fail fast by default, to be conservative about errors, -- and respect the strictness arguments. mode = FailFast, jsonSchema = InferSchema } _jsonSourceDescription :: SparkPath -> JsonOptions -> SourceDescription _jsonSourceDescription sp jo = SourceDescription { inputSource = JsonFormat, inputPath = sp, inputSchema = jsonSchema jo, sdOptions = _jsonOptions jo, inputStamp = Nothing } _jsonOptions :: JsonOptions -> M.Map InputOptionKey InputOptionValue _jsonOptions jo = M.fromList [(InputOptionKey "mode", _mode (mode jo))] _mode :: JsonMode -> InputOptionValue _mode Permissive = InputStringOption "PERMISSIVE" _mode DropMalformed = InputStringOption "DROPMALFORMED" _mode FailFast = InputStringOption "FAILFAST"