{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}

module Spark.IO.Internal.OutputCommon(
  SaveMode(..),
  OutputBucket,
  DynOutputBucket,
  OutputPartition,
  DynOutputPartition,
  SavingDescription(..),
  partition,
  partition',
  bucket,
  bucket',
  saveDefaults,
  saveCol
) where

-- import Data.Text(Text)
-- import qualified Data.Map.Strict as M
-- import qualified Data.Aeson as A
-- import Data.Aeson(toJSON, (.=))

-- import Spark.Core.Types
-- import Spark.Core.Context
import Spark.Core.Try
import Spark.Core.Column
-- import Spark.Core.ColumnFunctions
-- import Spark.Core.Row
import Spark.Core.Dataset

import Spark.Core.Internal.ColumnStructures(UnknownReference, UntypedColumnData)
import Spark.Core.Internal.ColumnFunctions(dropColReference)
import Spark.Core.Internal.Utilities
import Spark.IO.Internal.InputGeneric

{-| The mode used when saving the data.
-}
data SaveMode =
    -- | Overwrite any data already present at the target location.
    Overwrite
    -- | Append to the data already present at the target location.
  | Append
    -- | Silently skip the write if data is already present.
  | Ignore
    -- | Fail if data is already present at the target location.
  | ErrorIfExists
  deriving(Eq, Show)

{-| A column used to partition the output. -}
data OutputPartition ref = OutputPartition UntypedColumnData

type DynOutputPartition = Try (OutputPartition UnknownReference)

{-| A column used to bucket the output. -}
data OutputBucket ref = OutputBucket UntypedColumnData

type DynOutputBucket = Try (OutputBucket UnknownReference)

{-| Builds an output partition from a (typed) column. -}
partition :: Column ref a -> OutputPartition ref
partition = OutputPartition . dropColType . dropColReference

{-| Builds an output partition from a dynamic column. -}
partition' :: DynColumn -> DynOutputPartition
partition' = fmap partition

{-| Builds an output bucket from a (typed) column. -}
bucket :: Column ref a -> OutputBucket ref
bucket = OutputBucket . dropColType . dropColReference

{-| Builds an output bucket from a dynamic column. -}
bucket' :: DynColumn -> DynOutputBucket
bucket' = fmap bucket

{-| Describes how a column should be written out: the output partitions and
buckets, the column being saved, the data format and the target path.
-}
data SavingDescription ref a = SavingDescription {
  partitions :: ![OutputPartition ref],
  buckets :: ![OutputBucket ref],
  savedCol :: !(Column ref a),
  saveFormat :: !DataFormat,
  savePath :: !SparkPath
}

{-| A default saving description: no partitions and no buckets. -}
saveDefaults :: SparkPath -> DataFormat -> Column ref a -> SavingDescription ref a
saveDefaults sp f c = SavingDescription {
    partitions = [],
    buckets = [],
    savedCol = c,
    saveFormat = f,
    savePath = sp
  }

{-| Inserts an action to store the given dataframe in the graph of computations.

NOTE: Because of some limitations in Spark, all the columns used to form the
buckets and the partitions must be present in the column being written. These
columns are appended to the column being written if they happen to be missing.
As a consequence, more data may be written than expected.

It returns true if the update was successful. The return type is subject to
change.
-}
saveCol :: SavingDescription ref a -> LocalData Bool
saveCol _ = missing "saveCol"

-- test :: Int
-- test =
--   let c = undefined :: Column Int Int
--       ld = saveCol (saveDefaults undefined JsonFormat c) { partitions = [partition c, partition c] }
--   in 3

-- -- repeatDS :: Column ref Int -> Column ref a -> Dataset a
-- -- repeatFast :: Column ref Int -> Column ref a -> Dataset a
-- -- repeatScatter :: Int -> Column ref Int -> Column ref a -> Dataset a
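
-- A minimal usage sketch, kept commented out like the test above: given a
-- path and an integer column supplied by the caller, build the default
-- description, add a single output partition on that column, and hand it to
-- `saveCol`. `JsonFormat` is the same constructor used in the test above;
-- `exampleSave` is purely illustrative and not part of the module's API.
-- exampleSave :: SparkPath -> Column ref Int -> LocalData Bool
-- exampleSave sp c =
--   let desc = (saveDefaults sp JsonFormat c) { partitions = [partition c] }
--   in saveCol desc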