{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}

-- Implements a DSL for extracting some columns from dataframes and datasets.

module Spark.Core.Internal.ColumnFunctions(
  -- Accessors
  colType,
  colOrigin,
  colOp,
  colFieldName,
  -- Internal API
  iUntypedColData,
  iEmptyCol,
  -- Developer API (projections)
  unsafeStaticProjection,
  dynamicProjection,
  dropColReference,
  -- Public functions
  untypedCol,
  colFromObs,
  colFromObs',
  castTypeCol,
  castCol,
  castCol',
  colRef
) where

import qualified Data.Text as T
import qualified Data.Text.Format as TF
import qualified Data.Vector as V
import Data.String(IsString(fromString))
import Data.Text.Lazy(toStrict)
import Data.Maybe(fromMaybe)
import Data.List(find)
import Formatting

import Spark.Core.Internal.ColumnStructures
import Spark.Core.Internal.DatasetFunctions
import Spark.Core.Internal.DatasetStructures
import Spark.Core.Internal.TypesStructures
import Spark.Core.Try
import Spark.Core.StructuresInternal
import Spark.Core.Internal.TypesFunctions
import Spark.Core.Internal.OpStructures
import Spark.Core.Internal.OpFunctions(prettyShowColOp)
import Spark.Core.Internal.AlgebraStructures
import Spark.Core.Internal.Utilities

-- ********** Public methods ********

{-| Lets the users define their own static projections.

The field name is parsed into a path and the target type is looked up inside
the start type. If the field cannot be found, the target type of the
projection is a call to 'failure', so this should be used with caution.

The field is passed as a String because of type inference issues.
-}
unsafeStaticProjection :: forall from to. (HasCallStack) =>
  SQLType from     -- ^ The start type
  -> String        -- ^ The name of a field assumed to be found in the start type.
                   --   This only has to be valid for Spark purposes, not
                   --   internal Haskell representation.
  -> StaticColProjection from to
unsafeStaticProjection sqlt field = StaticColProjection (sqlt, path, target)
  where
    -- The parsed path; 'forceRight' is applied to the parsing result.
    path = forceRight (fieldPath (T.pack field))
    -- The type reached by following the path from the start type.
    target = case _extractPath sqlt path of
      Just found -> found
      Nothing -> failure $
        sformat ("unsafeStaticProjection: Cannot find the field "%sh%" in type "%sh) field sqlt

-- Returns a projection from a textual path.
-- A path that does not parse still yields a projection: that projection
-- returns an error for every input type.
dynamicProjection :: String -> DynamicColProjection
dynamicProjection txt = either rejectAll _dynamicProjection (fieldPath (T.pack txt))
  where
    -- Ignores the input type and always reports the syntax error.
    rejectAll msg = DynamicColProjection $ const $ tryError $
      sformat ("dynamicProjection: invalid syntax for path "%shown%": "%shown) txt msg

{-| The SQL type of the data held by a column.
-}
colType :: Column ref a -> SQLType a
colType c = SQLType (_cType c)

{-| Converts a typed column to an untyped (dynamic) column.

The column reference is dropped and the result is always a success.
-}
untypedCol :: Column ref a -> DynColumn
untypedCol c = pure (_unsafeCastColData (dropColReference c))

{-| Casts a dynamic column to a statically typed column.

In this case, one must supply the reference (which can be obtained from
another column with colRef, or from a dataset), and a type (which can be
built using the buildType function).

The cast fails if the dynamic column is itself an error, or if its data type
differs from the requested type.
-}
castCol :: ColumnReference ref -> SQLType a -> DynColumn -> Try (Column ref a)
castCol r sqlt dc = do
  untyped <- dc
  typed <- castTypeCol sqlt untyped
  _checkedCastRefColData r typed

{-| Casts a dynamic column to a statically typed column, without attempting
to enforce a single origin at the type level.

This is useful when building a dataset from a dataframe: the origin information
cannot be conveyed since it is not available in the first place.
-}
castCol' :: SQLType a -> DynColumn -> Try (Column UnknownReference a)
castCol' sqlt dc = castCol ColumnReference sqlt dc


-- | (internal) Changes the type tag of a column after checking that the
-- requested type has the same underlying data type as the column.
-- Returns an error when the two data types differ.
castTypeCol :: SQLType b -> ColumnData ref a -> Try (ColumnData ref b)
castTypeCol sqlt cd
  | unSQLType sqlt == unSQLType (colType cd) = pure (_unsafeCastColData cd)
  | otherwise = tryError $ sformat ("Cannot cast column "%sh%" to type "%sh) cd sqlt



-- (internal) The dataset stored as the origin of this column.
colOrigin :: Column ref a -> UntypedDataset
colOrigin c = _cOrigin c

-- (internal) The operation stored in this column.
colOp :: Column ref a -> ColOp
colOp c = _cOp c

{-| A tag carrying the reference of a column at the type level.

The column itself is not inspected. This is useful when casting dynamic
columns to typed columns.
-}
colRef :: Column ref a -> ColumnReference ref
colRef = const ColumnReference

-- | Takes an observable and makes it available as a column of the same type.
--
-- Not implemented yet: the definition is a call to 'missing'.
-- NOTE(review): presumably 'missing' raises an error when evaluated - confirm
-- against its definition.
colFromObs :: (HasCallStack) => LocalData a -> Column (LocalData a) a
colFromObs = missing "colFromObs"

-- | Takes a dynamic observable and makes it available as a dynamic column.
--
-- Not implemented yet: the definition is a call to 'missing'.
-- NOTE(review): presumably 'missing' raises an error when evaluated - confirm
-- against its definition.
colFromObs' :: (HasCallStack) => LocalFrame -> DynColumn
colFromObs' = missing "colFromObs'"

-- | (internal) The field name of a column: the name it was given when
-- renamed, or otherwise a name built from the pretty-printed operation.
colFieldName :: ColumnData ref a -> FieldName
colFieldName c = case _cReferingPath c of
  Just name -> name
  Nothing -> unsafeFieldName (prettyShowColOp (_cOp c))

-- ********* Projection operations ***********

-- dataset -> static projection -> column
-- The result is a typed column that refers to the dataset.
instance forall a to. Projection (Dataset a) (StaticColProjection a to) (Column a to) where
  _performProjection ds proj = _project ds proj

-- dataset -> dynamic projection -> DynColumn
-- The projection is checked against the type of the dataset.
instance forall a. Projection (Dataset a) DynamicColProjection DynColumn where
  _performProjection ds proj = _projectDyn ds proj

-- dataset -> string -> DynColumn
-- The string is first turned into a dynamic projection.
instance forall a . Projection (Dataset a) String DynColumn where
  _performProjection ds = _projectDyn ds . _strToDynProj

-- dataframe -> dynamic projection -> dyncolumn
-- An error in the dataframe is propagated to the resulting column.
instance Projection DataFrame DynamicColProjection DynColumn where
  _performProjection df proj = _projectDF df proj

-- dataframe -> static projection -> dyncolumn
-- This is a relaxation as we could return Try (Column to) instead.
-- It makes more sense from an API perspective to just return a dynamic result.
-- The static projection is converted to a dynamic one before being applied.
instance forall a to. Projection DataFrame (StaticColProjection a to) DynColumn where
  _performProjection df = _projectDF df . _colStaticProjToDynProj

-- dataframe -> string -> dyncolumn
-- The string is first turned into a dynamic projection.
instance Projection DataFrame String DynColumn where
  _performProjection df = _projectDF df . _strToDynProj

-- column -> static projection -> column
-- The reference of the column is preserved.
instance forall ref a to. Projection (Column ref a) (StaticColProjection a to) (Column ref to) where
  _performProjection c proj = _projectCol c proj


-- dyncolumn -> dynamic projection -> dyncolumn
-- An error in the input column is propagated to the result.
instance Projection DynColumn DynamicColProjection DynColumn where
  _performProjection dc proj = _projectDynCol dc proj

-- dyncolumn -> static projection -> dyncolumn
-- The static projection is converted to a dynamic one before being applied.
instance forall a to. Projection DynColumn (StaticColProjection a to) DynColumn where
  _performProjection dc = _projectDynCol dc . _colStaticProjToDynProj

-- dyncolumn -> string -> dyncolumn
-- The string is first turned into a dynamic projection, which is then applied
-- like any other dynamic projection on a dynamic column.
instance Projection DynColumn String DynColumn where
  _performProjection dc = _projectDynCol dc . _strToDynProj

-- Builds a dynamic projection from a textual path.
-- If the path does not parse, the projection returns the parsing message as
-- an error for every input type.
_strToDynProj :: String -> DynamicColProjection
_strToDynProj s = DynamicColProjection $ \dt ->
  either (tryError . T.pack) (\fp -> _dynProjTry (_dynamicProjection fp) dt) parsed
  where
    parsed = fieldPath (T.pack s)

-- | Converts a static projection to a dynamic projection.
--
-- The dynamic projection only accepts the start type of the static
-- projection; any other input type yields an error.
_colStaticProjToDynProj :: forall from to. StaticColProjection from to -> DynamicColProjection
_colStaticProjToDynProj (StaticColProjection (SQLType dtFrom, fp, SQLType dtTo)) =
  DynamicColProjection check
  where
    -- TODO factorize this as a projection on types.
    check dt
      | dt /= dtFrom =
          tryError $ sformat ("Cannot convert type "%shown%" into type "%shown) dt dtFrom
      | otherwise = pure (fp, dtTo)

-- (internal) Drops both the reference and the static type of a column.
iUntypedColData :: Column ref a -> UntypedColumnData
iUntypedColData c = _unsafeCastColData (dropColReference c)

-- Recasts the column, trusting the user knows that the type is going to be
-- compatible. No field value changes: the record update only exists to let
-- the type parameter change.
_unsafeCastColData :: Column ref a -> Column ref b
_unsafeCastColData c =
  let dt = _cType c
  in c { _cType = dt }

-- Changes the type tag of a column after checking that the requested type has
-- the same underlying data type as the column. This is the same check and the
-- same error as 'castTypeCol', so it simply delegates to it.
_checkedCastColData :: SQLType b -> ColumnData ref a -> Try (ColumnData ref b)
_checkedCastColData = castTypeCol

-- Changes the reference tag of a column. The reference argument is only used
-- to fix the type; no check is performed for now and the result is always a
-- success.
_checkedCastRefColData :: ColumnReference ref2 -> ColumnData ref a -> Try (ColumnData ref2 a)
_checkedCastRefColData _ cd = pure retagged
  where
    -- TODO: do some dynamic checks on the origin.
    retagged = cd { _cType = _cType cd }

-- Builds a dynamic projection that follows the given path.
--
-- When applied to a data type, the projection returns the path together with
-- the type found at the end of the path, or an error if the path cannot be
-- followed inside that type.
_dynamicProjection :: FieldPath -> DynamicColProjection
_dynamicProjection fpath = DynamicColProjection resolve
  where
    resolve dt = case _extractPath (SQLType dt) fpath of
      Just (SQLType dt') -> pure (fpath, dt') -- TODO(kps) I have a doubt
      Nothing ->
        -- The message names this family of functions: it used to wrongly
        -- point at unsafeStaticProjection.
        tryError $ sformat ("dynamicProjection: Cannot find the field "%shown%" in type "%shown) fpath dt

-- Applies a dynamic projection to a dataset: the projection is run against
-- the type of the dataset and, on success, a new dynamic column is created.
-- TODO: take a compute node instead
_projectDyn :: Dataset from -> DynamicColProjection -> DynColumn
_projectDyn ds proj =
  _dynProjTry proj (unSQLType (nodeType ds)) >>= \(p, dt) -> _emptyDynCol ds dt p

-- Applies a dynamic projection to a dataframe. An error in the dataframe is
-- returned as is.
_projectDF :: DataFrame -> DynamicColProjection -> DynColumn
_projectDF df proj = df >>= \node -> _projectDyn node proj

-- Applies a static projection to a dataset: the path and the target type of
-- the projection are used to create a new column on the dataset.
_project :: Dataset from -> StaticColProjection from to -> Column from to
_project ds proj = iEmptyCol ds sqlt p
  where
    (_, p, sqlt) = _staticProj proj

-- Applies a static projection to a column, using the path and the target
-- type stored in the projection.
_projectCol :: Column ref from -> StaticColProjection from to -> Column ref to
_projectCol c proj = case proj of
  StaticColProjection (_, fp, SQLType dt) -> _projectColData0 c fp dt

-- Performs the data projection. This is unsafe, it does not check that the
-- field path is valid in this case, nor that the final type is valid either.
--
-- The resulting column has the given data type and an extraction operation.
-- If the column is already an extraction, the two paths are flattened into a
-- single one: the existing (parent) path comes first and the new path is
-- appended to it, since paths are followed from the outermost field inwards.
_projectColData0 :: ColumnData ref a -> FieldPath -> DataType -> ColumnData ref b
_projectColData0 cd (FieldPath p) dtTo =
  cd { _cOp = ColExtraction (FieldPath fullPath),
       _cType = dtTo }
  where
    fullPath = case colOp cd of
      -- No previous parent on an extraction -> we can safely append to that one.
      ColExtraction (FieldPath parent) -> parent V.++ p
      -- Extract from the previous column.
      -- NOTE(review): the previous operation is replaced, not nested - confirm
      -- that this is intended for columns that are not plain extractions.
      _ -> p

-- Applies a dynamic projection to a column: the projection is run against the
-- data type of the column and, on success, the (unchecked) data projection is
-- performed on the column with its reference dropped.
_projectDynColData :: ColumnData ref a -> DynamicColProjection -> DynColumn
_projectDynColData cd proj =
  uncurry (_projectColData0 (dropColReference cd)) <$> _dynProjTry proj (_cType cd)

-- Applies a dynamic projection to a dynamic column. An error in the input
-- column is returned as is.
_projectDynCol :: DynColumn -> DynamicColProjection -> DynColumn
_projectDynCol c proj = c >>= \cd -> _projectDynColData cd proj

-- Follows a field path inside a type. Returns Nothing if the path cannot be
-- followed.
_extractPath :: SQLType from -> FieldPath -> Maybe (SQLType to)
_extractPath sqlt path = case path of
  FieldPath names -> _extractPath0 sqlt (V.toList names)

-- Follows a list of field names inside a type, starting from the head of the
-- list. The empty list returns the type itself (recast).
_extractPath0 :: SQLType from -> [FieldName] -> Maybe (SQLType to)
_extractPath0 sqlt path = case path of
  [] -> Just (unsafeCastType sqlt)
  (field : rest) ->
    _extractField sqlt field >>= \inner -> _extractPath0 inner rest

-- Looks up a field by name inside a struct type (strict or nullable) and
-- returns the type of that field. Returns Nothing if the type is not a struct
-- or if no field has this name.
_extractField :: SQLType from -> FieldName -> Maybe (SQLType to)
_extractField (SQLType dt) f =
  SQLType . structFieldType <$> (fieldsOf dt >>= find matches)
  where
    matches x = structFieldName x == f
    -- The fields of a struct, regardless of its nullability.
    fieldsOf (StrictType (Struct (StructType fs))) = Just fs
    fieldsOf (NullableType (Struct (StructType fs))) = Just fs
    fieldsOf _ = Nothing

-- Forgets the reference of a column at the type level. No field value
-- changes: the record update only exists to let the reference parameter
-- change.
dropColReference :: ColumnData ref a -> ColumnData UnknownReference a
dropColReference c =
  let op = _cOp c
  in c { _cOp = op }

-- | (internal) Creates a new column that extracts the given path from a
-- dataset, tagged with the given type.
iEmptyCol :: Dataset a -> SQLType b -> FieldPath -> Column a b
iEmptyCol ds sqlt path = _emptyColData ds sqlt path

-- | (internal) Creates a new dynamic column that extracts the given path
-- from a dataset. The result is always a success.
_emptyDynCol :: Dataset a -> DataType -> FieldPath -> DynColumn
_emptyDynCol ds dt fp =
  pure (dropColReference (_emptyColData ds (SQLType dt) fp))

-- A new column data structure: an extraction of the given path from the
-- dataset, with the given type and no name attached.
_emptyColData :: Dataset a -> SQLType b -> FieldPath -> ColumnData a b
_emptyColData ds sqlt path =
  ColumnData
    { _cOp = ColExtraction path
    , _cType = unSQLType sqlt
    , _cOrigin = untypedDataset ds
    , _cReferingPath = Nothing
    }

-- Homogeneous operation between 2 columns.
-- The result applies the named function to the operations of both columns.
-- It takes its origin and its type from the first column and has no name
-- attached.
_homoColOp2 :: T.Text -> Column ref x -> Column ref x -> Column ref x
_homoColOp2 opName c1 c2 =
  ColumnData
    { _cOrigin = _cOrigin c1
    , _cType = _cType c1
    , _cOp = ColFunction opName operands
    , _cReferingPath = Nothing
    }
  where
    operands = V.fromList [colOp c1, colOp c2]

-- Homogeneous operation between 2 dynamic columns.
-- If one of the columns is an error, the first error is returned.
-- TODO check same origin
_homoColOp2' :: T.Text -> DynColumn -> DynColumn -> DynColumn
_homoColOp2' opName c1' c2' = _homoColOp2 opName <$> c1' <*> c2'

-- ******** Displaying and pretty printing ************

-- Displays a column as name{type}->origin, where the name is the name given
-- to the column if any (the pretty-printed operation otherwise), and the
-- origin is the name of the node the column comes from.
instance forall ref a. Show (Column ref a) where
  show c = T.unpack (toStrict (TF.format template (label, typeTxt, origin)))
    where
      template = fromString "{}{{}}->{}" :: TF.Format
      label = maybe (prettyShowColOp (colOp c)) show' (_cReferingPath c)
      typeTxt = T.pack (show (colType c))
      origin = unNodeName (nodeName (_cOrigin c))

-- *********** Renaming instances **************

-- Renaming a column with a field name: the name is stored in the column and
-- replaces any name given before.
instance forall ref a. CanRename (ColumnData ref a) FieldName where
  (@@) c fn = c { _cReferingPath = Just fn }


-- Renaming a column with a string: the string is first converted to a field
-- name. A string that is not a valid field name is reported with 'failure'.
instance forall ref a. CanRename (Column ref a) String where
  c @@ str = case fieldName (T.pack str) of
    Left msg ->
      -- The syntax check here is pretty lenient, so if it fails, it has
      -- some good reasons. We stop here.
      failure $ sformat ("Could not make a field path out of string "%shown%" for column "%shown%":"%shown) str c msg
    Right name -> c @@ name

-- Renaming a dynamic column with a field name: a valid column is renamed, an
-- error is returned unchanged.
-- TODO better error handling
instance CanRename DynColumn FieldName where
  dc @@ fn = fmap (@@ fn) dc

-- Renaming a dynamic column with a string: an error in the column is returned
-- unchanged; otherwise the string is converted to a field name, which may
-- itself produce an error.
-- TODO better error handling
instance CanRename DynColumn String where
  dc @@ str = dc >>= \cd ->
    case fieldName (T.pack str) of
      Left msg ->
        -- The syntax check here is pretty lenient, so if it fails, it has
        -- some good reasons. We stop here.
        tryError $ sformat ("Could not make a field path out of string "%shown%" for column "%shown%":"%shown) str cd msg
      Right name -> pure (cd @@ name)

-- *********** Arithmetic operations **********


-- Both operands already have the same type as the result: each one is passed
-- through unchanged (id).
instance forall a. HomoBinaryOp2 a a a where
  _liftFun = BinaryOpFun id id

-- Typed column on the left, dynamic column on the right: the left operand is
-- converted with untypedCol so that both sides are dynamic columns.
instance forall ref a. HomoBinaryOp2 (Column ref a) DynColumn DynColumn where
  _liftFun = BinaryOpFun untypedCol id

-- Dynamic column on the left, typed column on the right: the right operand is
-- converted with untypedCol so that both sides are dynamic columns.
instance forall ref a. HomoBinaryOp2 DynColumn (Column ref a) DynColumn where
  _liftFun = BinaryOpFun id untypedCol


-- Numeric operations on typed columns. Only the addition is available (as the
-- "sum" column function); every other method is a call to 'missing'.
instance (Num x) => Num (Column ref x) where
  c1 + c2 = _homoColOp2 "sum" c1 c2
  _ * _ = missing "Num (Column x): *"
  abs = \_ -> missing "Num (Column x): abs"
  signum = \_ -> missing "Num (Column x): signum"
  fromInteger = \_ -> missing "Num (Column x): fromInteger"
  negate = \_ -> missing "Num (Column x): negate"

-- Numeric operations on dynamic columns. Only the addition is available (as
-- the "sum" column function); every other method is a call to 'missing'.
instance Num DynColumn where
  c1 + c2 = _homoColOp2' "sum" c1 c2
  _ * _ = missing "Num (DynColumn x): *"
  abs = \_ -> missing "Num (DynColumn x): abs"
  signum = \_ -> missing "Num (DynColumn x): signum"
  fromInteger = \_ -> missing "Num (DynColumn x): fromInteger"
  negate = \_ -> missing "Num (DynColumn x): negate"