module Spark.IO.Internal.InputGeneric(
SparkPath(..),
DataSchema(..),
InputOptionValue(..),
InputOptionKey(..),
DataFormat(..),
SourceDescription(..),
generic',
genericWithSchema',
genericWithSchema
) where
import Data.Text(Text)
import Data.String(IsString(..))
import qualified Data.Map.Strict as M
import qualified Data.Aeson as A
import qualified Data.Text as T
import Data.Aeson(toJSON, (.=))
import Spark.Core.Types
import Spark.Core.Context
import Spark.Core.Try
import Spark.Core.Dataset
import Spark.Core.Internal.Utilities(forceRight)
import Spark.Core.Internal.DatasetFunctions(asDF, emptyDataset, emptyLocalData)
import Spark.Core.Internal.TypesStructures(SQLType(..))
import Spark.Core.Internal.OpStructures
-- | The path to a data source, as understood by the Spark backend.
newtype SparkPath = SparkPath Text
  deriving (Eq, Show)
-- | Whether the schema of a source should be inferred by the backend,
-- or is supplied up front as a 'DataType'.
data DataSchema
  = InferSchema
  | UseSchema DataType
  deriving (Eq, Show)
-- | The value of an option that can be handed to a Spark data source.
data InputOptionValue
  = InputIntOption Int
  | InputDoubleOption Double
  | InputStringOption Text
  | InputBooleanOption Bool
  deriving (Eq, Show)
-- | Option values serialize as their bare JSON payload, with no
-- constructor tag.
instance A.ToJSON InputOptionValue where
  toJSON v = case v of
    InputIntOption i     -> toJSON i
    InputDoubleOption d  -> toJSON d
    InputStringOption s  -> toJSON s
    InputBooleanOption b -> toJSON b
-- | The name of an option handed to a Spark data source.
newtype InputOptionKey = InputOptionKey { unInputOptionKey :: Text }
  deriving (Eq, Ord, Show)
-- | The serialization format of a data source, either one of the
-- built-in Spark formats or a custom source identified by name.
data DataFormat
  = JsonFormat
  | TextFormat
  | CsvFormat
  | CustomSourceFormat !Text
  deriving (Eq, Show)
-- | A complete description of a data source: where it lives, how it is
-- encoded, how its schema is obtained, plus source-specific options and
-- an optional input stamp.
data SourceDescription = SourceDescription
  { inputPath   :: !SparkPath
  , inputSource :: !DataFormat
  , inputSchema :: !DataSchema
  , sdOptions   :: !(M.Map InputOptionKey InputOptionValue)
  , inputStamp  :: !(Maybe DataInputStamp)
  } deriving (Eq, Show)
-- | Allows string literals to be used directly as Spark paths.
instance IsString SparkPath where
  fromString s = SparkPath (T.pack s)
-- | Builds a dataframe from a generic source description, asking the
-- Spark backend to infer the schema first (one command round-trip).
--
-- The inference result is a 'Try', so a failed inference propagates into
-- the returned 'DataFrame' instead of raising.
generic' :: SourceDescription -> SparkState DataFrame
-- Idiom fix: `do x <- m; return (f x)` collapsed to `fmap f m`.
generic' sd = fmap (>>= \dt -> genericWithSchema' dt sd) (_inferSchema sd)
-- | Builds a dataframe node from a source description and an explicit
-- schema. No interaction with Spark happens here; if the schema does not
-- match the source, errors surface only when the graph is executed.
genericWithSchema' :: DataType -> SourceDescription -> DataFrame
genericWithSchema' dt sd =
  let descr = sd { inputSchema = UseSchema dt }
      op = StandardOperator {
             soName = "org.spark.GenericDatasource",
             soOutputType = dt,
             soExtra = A.toJSON descr
           }
  in asDF (emptyDataset (NodeDistributedOp op) (SQLType dt))
-- | Typed variant of 'genericWithSchema'': the schema is derived from
-- the requested element type @a@ instead of being passed in.
--
-- Uses 'forceRight', so a cast failure is a programming error that
-- crashes at evaluation time.
genericWithSchema :: forall a. (SQLTypeable a) => SourceDescription -> Dataset a
genericWithSchema sd = forceRight (genericWithSchema' dt sd >>= castType sqlt)
  where
    sqlt = buildType :: SQLType a
    dt = unSQLType sqlt
-- | Runs the schema-inference command against the Spark backend and
-- returns the inferred type (or the failure) for the given source.
_inferSchema :: SourceDescription -> SparkState (Try DataType)
_inferSchema sd = executeCommand1 (_inferSchemaCmd sd)
-- | The local node that carries the schema-inference request; the source
-- description travels to the backend as the operator's JSON extra.
_inferSchemaCmd :: SourceDescription -> LocalData DataType
_inferSchemaCmd sd = emptyLocalData (NodeOpaqueAggregator op) sqlt
  where
    sqlt = buildType :: SQLType DataType
    op = StandardOperator {
           soName = "org.spark.InferSchema",
           soOutputType = unSQLType sqlt,
           soExtra = A.toJSON sd
         }
-- | A path serializes as its raw text content.
instance A.ToJSON SparkPath where
  toJSON (SparkPath p) = A.toJSON p
-- | Inference is encoded as the sentinel string @"infer_schema"@;
-- an explicit schema is encoded as the data type itself.
instance A.ToJSON DataSchema where
  toJSON s = case s of
    InferSchema  -> "infer_schema"
    UseSchema dt -> toJSON dt
-- | Built-in formats use Spark's short names; a custom source uses
-- whatever name it was given.
instance A.ToJSON DataFormat where
  toJSON fmt = case fmt of
    JsonFormat           -> "json"
    TextFormat           -> "text"
    CsvFormat            -> "csv"
    CustomSourceFormat s -> toJSON s
-- | Serializes a source description for transmission to the backend.
instance A.ToJSON SourceDescription where
  toJSON sd = A.object fields
    where
      -- NOTE(review): inputStamp is always emitted as null here,
      -- discarding whatever is in the record field — presumably the
      -- backend fills it in; confirm this is intended.
      fields =
        [ "inputPath" .= toJSON (inputPath sd)
        , "inputSource" .= toJSON (inputSource sd)
        , "inputSchema" .= toJSON (inputSchema sd)
        , "inputStamp" .= A.Null
        , "options" .= A.object (optionPair <$> M.toList (sdOptions sd))
        ]
      optionPair (k, v) = unInputOptionKey k .= toJSON v