{-|
A description of the operations that can be performed on
nodes and columns.
-}
module Spark.Core.Internal.OpStructures where

import Data.Text as T
import Data.Aeson(Value, Value(Null))
import Data.Vector(Vector)

import Spark.Core.StructuresInternal
import Spark.Core.Internal.TypesStructures(DataType, SQLType, SQLType(unSQLType))

{-| The invariant respected by a transform.

Depending on the value of the invariant, different optimizations
may be available.
-}
data TransformInvariant =
    -- | This operator has no special property. It may depend on
    -- the partitioning layout, the number of partitions, the order
    -- of elements in the partitions, etc.
    -- This sort of operator is unwelcome in Krapsh...
    Opaque
    -- | This operator respects the canonical partition order, but may
    -- not have the same number of elements.
    -- For example, this could be a flatMap on an RDD (filter, etc.).
    -- This operator can be used locally with the signature a -> [a]
  | PartitioningInvariant
    -- | The strongest invariant. It respects the canonical partition order
    -- and it outputs the same number of elements.
    -- This is typically a map.
    -- This operator can be used locally with the signature a -> a
  | DirectPartitioningInvariant


-- | The dynamic value of locality.
-- There is still a tag on it, but it can be easily dropped.
data Locality =
    -- | The data associated to this node is local. It can be materialized
    -- and accessed by the user.
    Local
    -- | The data associated to this node is distributed or not accessible
    -- locally. It cannot be accessed by the user.
  | Distributed deriving (Show, Eq)

-- ********* PHYSICAL OPERATORS ***********
-- These structures declare some operations that correspond to operations found
-- in Spark itself, or in the surrounding libraries.

-- | An operator defined by default in the release of Krapsh.
-- All other physical operators can be converted to a standard operators.
data StandardOperator = StandardOperator {
  soName :: !T.Text,
  soOutputType :: !DataType,
  soExtra :: !Value
} deriving (Eq, Show)

-- | A scala method of a singleton object.
data ScalaStaticFunctionApplication = ScalaStaticFunctionApplication {
  sfaObjectName :: !T.Text,
  sfaMethodName :: !T.Text
  -- TODO add the input and output types?
}


-- | The different kinds of column operations.
-- These operations describe the physical operations on columns as supported
-- by Spark SQL. They can operate on column -> column, column -> row, row->row.
-- Of course, not all operators are valid for each configuration.
data ColOp =
    -- | A projection onto a single column
    -- An extraction is always direct.
    ColExtraction !FieldPath
    -- | A function of other columns.
    -- In this case, the other columns may matter
    -- TODO(kps) add if this function is partition invariant.
    -- It should be the case most of the time.
  | ColFunction !T.Text !(Vector ColOp)
    -- | A constant defined for each element.
    -- The type should be the same as for the column
    -- A literal is always direct
  | ColLit !DataType !Value
    -- | A structure.
  | ColStruct !(Vector TransformField)
  deriving (Eq, Show)

-- | A field in a structure.
data TransformField = TransformField {
  tfName :: !FieldName,
  tfValue :: !ColOp
} deriving (Eq, Show)

-- | The content of a structured transform.
data StructuredTransform =
    InnerOp !ColOp
  | InnerStruct !(Vector TransformField)
  deriving (Eq, Show)


-- ********* DATASET OPERATORS ************
-- These describe Dataset -> Dataset transforms.


data DatasetTransformDesc =
    DSScalaStaticFunction !ScalaStaticFunctionApplication
  | DSStructuredTransform !ColOp
  | DSOperator !StandardOperator


-- ****** OBSERVABLE OPERATORS *******
-- These operators describe Observable -> Observable transforms

-- **** AGGREGATION OPERATORS *****
-- The different types of aggregators

-- The low-level description of a
-- The name of the aggregator is the name of the
-- Dataset -> Local data transform
data UniversalAggregatorOp = UniversalAggregatorOp {
  uaoMergeType :: !DataType,
  uaoInitialOuter :: !StandardOperator,
  uaoMergeBuffer :: !StandardOperator
} deriving (Eq, Show)

{-
A node operation.
A description of all the operations between nodes.
These are the low-level, physical operations that Spark implements.

Each node operation is associated with:
 - a locality
 - an operation name (implicit or explicit)
 - a data type
 - a representation in JSON

Additionally, some operations are associated with algebraic invariants
to enable programmatic transformations.
-}
data NodeOp =
    -- | An operation between local nodes: [Observable] -> Observable
    NodeLocalOp StandardOperator
    -- | An observable literal
  | NodeLocalLit !DataType !Value
    -- | Some aggregator that does not respect any particular invariant.
  | NodeOpaqueAggregator StandardOperator
    -- | A universal aggregator.
  | NodeUniversalAggregator UniversalAggregatorOp
    -- | A structured transform, performed either on a local node or a
    -- distributed node.
  | NodeStructuredTransform !ColOp
    -- | A distributed dataset (with no partition information)
  | NodeDistributedLit !DataType !(Vector Value)
    -- | An opaque distributed operator.
  | NodeDistributedOp StandardOperator
  deriving (Eq, Show)

-- | Makes a standard operator with no extra value
makeOperator :: T.Text -> SQLType a -> StandardOperator
makeOperator txt sqlt =
  StandardOperator {
    soName = txt,
    soOutputType = unSQLType sqlt,
    soExtra = Null }