-- | JSON input sources: read JSON data into untyped 'DataFrame's or typed
-- 'Dataset's, with a configurable malformed-record mode and schema.
--
-- 'JsonMode' is exported with its constructors so that callers can set the
-- 'mode' field of 'JsonOptions' (e.g. @defaultJsonOptions { mode = Permissive }@);
-- exporting the type abstractly made that field effectively read-only.
module Spark.IO.Internal.Json(
  JsonMode(..),
  JsonOptions(..),
  json',
  json,
  jsonInfer,
  jsonOpt',
  jsonOpt,
  defaultJsonOptions
  ) where
import qualified Data.Map.Strict as M
import Data.Text(pack)
import Spark.Core.Types
import Spark.Core.Dataset(DataFrame, Dataset, castType')
import Spark.Core.Context
import Spark.Core.Try
import Spark.IO.Internal.InputGeneric
-- | How records that fail to parse as JSON are handled.
--
-- Each constructor is forwarded to the data source as the @mode@ option
-- (see '_mode' for the exact strings). The behaviour notes below follow
-- Spark's documented JSON reader modes.
data JsonMode
  = Permissive     -- ^ @PERMISSIVE@: keep malformed records (Spark nulls unparseable fields).
  | DropMalformed  -- ^ @DROPMALFORMED@: skip malformed records.
  | FailFast       -- ^ @FAILFAST@: fail the read on a malformed record.
  deriving (Eq, Show)
-- | Options controlling how a JSON source is read.
data JsonOptions = JsonOptions
  { mode       :: !JsonMode   -- ^ Malformed-record handling; sent to the source as its @mode@ option.
  , jsonSchema :: !DataSchema -- ^ Schema handling for the source ('InferSchema' or 'UseSchema').
  }
-- | Reads the JSON data at the given path as an untyped 'DataFrame' whose
-- rows have the given 'DataType'.
--
-- Uses the read mode of 'defaultJsonOptions', but records the caller's type
-- as the source schema ('UseSchema'), matching what 'jsonOpt' does. Without
-- this, the source description would claim 'InferSchema' while a fixed type
-- is declared. ('genericWithSchema'' may already impose the type itself —
-- NOTE(review): confirm; setting it here keeps the description
-- self-consistent either way.)
json' :: DataType -> String -> DataFrame
json' dt p = genericWithSchema' dt desc
  where
    opts = defaultJsonOptions { jsonSchema = UseSchema dt }
    desc = _jsonSourceDescription (SparkPath (pack p)) opts
-- | Reads the JSON data at the given path as a typed 'Dataset' of @a@.
--
-- Uses the read mode of 'defaultJsonOptions', but records the SQL type of
-- @a@ as the source schema ('UseSchema'), matching what 'jsonOpt' does
-- instead of describing the source as 'InferSchema' while a fixed type is
-- declared. Requires @ScopedTypeVariables@ (already needed by 'jsonOpt')
-- so that @a@ in the @where@ clause refers to the signature's @a@.
json :: forall a. (SQLTypeable a) => String -> Dataset a
json p = genericWithSchema desc
  where
    dt = unSQLType (buildType :: SQLType a)
    opts = defaultJsonOptions { jsonSchema = UseSchema dt }
    desc = _jsonSourceDescription (SparkPath (pack p)) opts
-- | Reads the JSON data at the given path as an untyped 'DataFrame' using
-- 'defaultJsonOptions' (schema 'InferSchema', mode 'FailFast').
jsonInfer :: SparkPath -> SparkState DataFrame
jsonInfer path = jsonOpt' defaultJsonOptions path
-- | Reads the JSON data at the given path as an untyped 'DataFrame', with
-- the read mode and schema handling taken from the supplied options.
jsonOpt' :: JsonOptions -> SparkPath -> SparkState DataFrame
jsonOpt' opts path = generic' description
  where
    description = _jsonSourceDescription path opts
-- | Reads the JSON data at the given path as a typed 'Dataset' of @a@.
--
-- The schema in the supplied options is replaced by the SQL type of @a@
-- ('UseSchema'); the read mode is kept. The resulting 'DataFrame' is then
-- cast to @a@, which can fail, hence the 'Try'.
jsonOpt :: forall a. (SQLTypeable a) => JsonOptions -> SparkPath -> SparkState (Try (Dataset a))
jsonOpt jo sp = fmap (castType' rowType) (jsonOpt' typedOpts sp)
  where
    rowType = buildType :: SQLType a
    typedOpts = jo { jsonSchema = UseSchema (unSQLType rowType) }
-- | Default JSON options: malformed records fail the read ('FailFast') and
-- the schema is set to 'InferSchema'.
defaultJsonOptions :: JsonOptions
defaultJsonOptions =
  JsonOptions
    { jsonSchema = InferSchema
    , mode       = FailFast
    }
-- | Builds the generic 'SourceDescription' for a JSON source at the given
-- path. The format is 'JsonFormat', the schema comes from the options, the
-- reader options come from '_jsonOptions', and no input stamp is set.
_jsonSourceDescription :: SparkPath -> JsonOptions -> SourceDescription
_jsonSourceDescription path opts =
  SourceDescription
    { inputSource = JsonFormat
    , inputPath   = path
    , inputSchema = schema
    , sdOptions   = readerOpts
    , inputStamp  = Nothing
    }
  where
    schema     = jsonSchema opts
    readerOpts = _jsonOptions opts
-- | Reader options passed to the data source; currently only @mode@.
_jsonOptions :: JsonOptions -> M.Map InputOptionKey InputOptionValue
_jsonOptions opts = M.singleton modeKey modeValue
  where
    modeKey   = InputOptionKey "mode"
    modeValue = _mode (mode opts)
-- | The string option value sent to the source for each 'JsonMode'.
_mode :: JsonMode -> InputOptionValue
_mode m = case m of
  Permissive    -> InputStringOption "PERMISSIVE"
  DropMalformed -> InputStringOption "DROPMALFORMED"
  FailFast      -> InputStringOption "FAILFAST"