{-# OPTIONS_GHC -fno-warn-orphans #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE FlexibleContexts #-} -- Implements a DSL for extracting some columns from dataframes and datasets. module Spark.Core.Internal.ColumnFunctions( -- Accessors colType, colOrigin, colOp, colFieldName, -- Standard functions broadcast, broadcast', -- Internal API iUntypedColData, iEmptyCol, -- Developer API (projections) -- unsafeStaticProjection, dropColReference, dropColType, extractPathUnsafe, colExtraction, unsafeProjectCol, genColOp, homoColOp2, makeColOp1, -- -- Developer API (projection builders) -- dynamicProjection, -- stringToDynColProj, -- pathToDynColProj, -- colStaticProjToDynProj, -- -- Developer API (projection transformers) -- projectDSDyn, -- projectDFDyn, -- projectDsCol, -- projectColCol, -- projectColDynCol, -- projectDColDCol, -- Public functions applyCol1, untypedCol, colFromObs, colFromObs', castTypeCol, castCol, castCol', colRef ) where import qualified Data.Text as T import qualified Data.Text.Format as TF import qualified Data.Vector as V import Data.String(IsString(fromString)) import Data.Text.Lazy(toStrict) import Data.Maybe(fromMaybe) import Data.List(find) import Formatting import Spark.Core.Internal.ColumnStructures import Spark.Core.Internal.DatasetFunctions import Spark.Core.Internal.DatasetStructures import Spark.Core.Internal.TypesStructures import Spark.Core.StructuresInternal import Spark.Core.Internal.TypesFunctions import Spark.Core.Internal.OpStructures import Spark.Core.Internal.OpFunctions(prettyShowColOp, prettyShowColFun) import Spark.Core.Internal.AlgebraStructures import Spark.Core.Internal.Utilities import Spark.Core.Internal.TypesGenerics(SQLTypeable, buildType) import Spark.Core.Try -- ********** Public methods ******** {-| The type of a column. -} colType :: Column ref a -> SQLType a colType = SQLType . _cType {-| Converts a type column to an antyped column. -} untypedCol :: Column ref a -> DynColumn untypedCol = pure . dropColType . dropColReference {-| Drops the type information, but kees the reference. -} dropColType :: Column ref a -> GenericColumn ref dropColType = _unsafeCastColData {-| Casts a dynamic column to a statically typed column. In this case, one must supply the reference (which can be obtained from another column with colRef, or from a dataset), and a type (which can be built using the buildType function). -} castCol :: ColumnReference ref -> SQLType a -> DynColumn -> Try (Column ref a) castCol r sqlt dc = dc >>= castTypeCol sqlt >>= _checkedCastRefColData r {-| Casts a dynamic column to a statically typed column, but does not attempt to enforce a single origin at the type level. This is useful when building a dataset from a dataframe: the origin information cannot be conveyed since it is not available in the first place. -} castCol' :: SQLType a -> DynColumn -> Try (Column UnknownReference a) castCol' = castCol ColumnReference -- | (internal) castTypeCol :: SQLType b -> ColumnData ref a -> Try (ColumnData ref b) castTypeCol sqlt cd = if unSQLType sqlt == unSQLType (colType cd) then pure (_unsafeCastColData cd) else tryError $ sformat ("Cannot cast column "%sh%" to type "%sh) cd sqlt {-| Takes some local data (contained in an observable) and broadacasts it along a reference column. -} -- TODO: it would be more logical to swap the inputs broadcast :: LocalData a -> Column ref b -> Column ref a broadcast ld c = ColumnData { _cOrigin = colOrigin c, _cType = unSQLType (nodeType ld), _cOp = BroadcastColOp (untypedLocalData ld), _cReferingPath = Nothing } broadcast' :: LocalFrame -> DynColumn -> DynColumn broadcast' lf dc = do ld <- lf c <- dc return $ broadcast ld c -- (internal) colOrigin :: Column ref a -> UntypedDataset colOrigin = _cOrigin -- (internal) colOp :: Column ref a -> GeneralizedColOp colOp = _cOp {-| A tag with the reference of a column. This is useful when casting dynamic columns to typed columns. -} colRef :: Column ref a -> ColumnReference ref colRef _ = ColumnReference -- | Takes an observable and makes it available as a column of the same type. colFromObs :: (HasCallStack) => LocalData a -> Column (LocalData a) a colFromObs = missing "colFromObs" -- | Takes a dynamic observable and makes it available as a dynamic column. colFromObs' :: (HasCallStack) => LocalFrame -> DynColumn colFromObs' = missing "colFromObs'" -- | (internal) colFieldName :: ColumnData ref a -> FieldName colFieldName c = fromMaybe (unsafeFieldName . _prettyShowColOp . _cOp $ c) (_cReferingPath c) {-| A converience function for applying one-argument typed functions to dynamic column. -} applyCol1 :: forall x y. (SQLTypeable x) => (forall ref. Column ref x -> Column ref y) -> DynColumn -> DynColumn applyCol1 f dc = do c <- dc let t = buildType :: SQLType x c1 <- castCol (colRef c) t dc let c2 = f c1 untypedCol c2 -- ******** Operations on column operations ******** genColOp :: ColOp -> GeneralizedColOp genColOp (ColExtraction fp) = GenColExtraction fp genColOp (ColFunction n v) = GenColFunction n (genColOp <$> v) -- TODO: replace in the ColOp by Cell instead of JSON. genColOp (ColLit dt _) = GenColLit dt (missing "genColOp (ColLit dt c)") genColOp (ColStruct v) = GenColStruct (f <$> v) where f (TransformField n v') = GeneralizedTransField n (genColOp v') -- ********* Projection operations *********** -- ****** Functions that build projections ******* iUntypedColData :: Column ref a -> UntypedColumnData iUntypedColData = _unsafeCastColData . dropColReference -- Recasts the column, trusting the user knows that the type is going to be compatible. _unsafeCastColData :: Column ref a -> Column ref b _unsafeCastColData c = c { _cType = _cType c } _checkedCastColData :: SQLType b -> ColumnData ref a -> Try (ColumnData ref b) _checkedCastColData sqlt cd = if unSQLType sqlt == unSQLType (colType cd) then pure (_unsafeCastColData cd) else tryError $ sformat ("Cannot cast column "%sh%" to type "%sh) cd sqlt _checkedCastRefColData :: ColumnReference ref2 -> ColumnData ref a -> Try (ColumnData ref2 a) _checkedCastRefColData _ cd = -- TODO: do some dynamic checks on the origin. pure $ cd { _cType = _cType cd } -- Performs the data projection. This is unsafe, it does not check that the -- field path is valid in this case, nor that the final type is valid either. unsafeProjectCol :: ColumnData ref a -> FieldPath -> DataType -> ColumnData ref b unsafeProjectCol cd (FieldPath p) dtTo = -- If the column is already a projection, flatten it. case colOp cd of -- No previous parent on an extraction -> we can safely append to that one. GenColExtraction (FieldPath p') -> cd { _cOp = GenColExtraction . FieldPath $ (p V.++ p'), _cType = dtTo} _ -> -- Extract from the previous column. cd { _cOp = GenColExtraction . FieldPath $ p, _cType = dtTo} extractPathUnsafe :: SQLType from -> FieldPath -> Maybe (SQLType to) extractPathUnsafe sqlt (FieldPath v) = _extractPath0 sqlt (V.toList v) _extractPath0 :: SQLType from -> [FieldName] -> Maybe (SQLType to) _extractPath0 sqlt [] = Just (unsafeCastType sqlt) _extractPath0 sqlt (field : l) = do inner <- _extractField sqlt field _extractPath0 inner l _extractField :: SQLType from -> FieldName -> Maybe (SQLType to) _extractField (SQLType (StrictType (Struct (StructType fields)))) f = -- There is probably a way to make it shorter... let z = find (\x -> structFieldName x == f) fields in SQLType . structFieldType <$> z _extractField (SQLType (NullableType (Struct (StructType fields)))) f = -- There is probably a way to make it shorter... let z = find (\x -> structFieldName x == f) fields in SQLType . structFieldType <$> z _extractField _ _ = Nothing dropColReference :: ColumnData ref a -> ColumnData UnknownReference a dropColReference c = c {_cOp = _cOp c} -- | (internal) creates a new column with some empty data iEmptyCol :: Dataset a -> SQLType b -> FieldPath -> Column a b iEmptyCol = _emptyColData -- | (internal) Creates a new column with a dynamic type. colExtraction :: Dataset a -> DataType -> FieldPath -> DynColumn colExtraction ds dt fp = pure $ dropColReference $ _emptyColData ds (SQLType dt) fp -- | Homogeneous operation betweet 2 columns. homoColOp2 :: T.Text -> Column ref x -> Column ref x -> Column ref x homoColOp2 opName c1 c2 = let co = GenColFunction opName (V.fromList (colOp <$> [c1, c2])) in ColumnData { _cOrigin = _cOrigin c1, _cType = _cType c1, _cOp = co, _cReferingPath = Nothing } makeColOp1 :: T.Text -> SQLType y -> Column ref x -> Column ref y makeColOp1 opName sqlt c = let co = GenColFunction opName (V.fromList (colOp <$> [c])) in ColumnData { _cOrigin = _cOrigin c, _cType = unSQLType sqlt, _cOp = co, _cReferingPath = Nothing } _prettyShowColOp :: GeneralizedColOp -> T.Text _prettyShowColOp (GenColExtraction fp) = prettyShowColOp (ColExtraction fp) _prettyShowColOp (GenColFunction n v) = prettyShowColFun n (V.toList (_prettyShowColOp <$> v)) _prettyShowColOp (GenColLit _ c) = show' c _prettyShowColOp (BroadcastColOp uld) = "BROADCAST(" <> prettyNodePath (nodePath uld) <> ")" _prettyShowColOp (GenColStruct v) = "struct(" <> T.intercalate "," (_prettyShowColOp . gtfValue <$> V.toList v) <> ")" -- A new column data structure. _emptyColData :: Dataset a -> SQLType b -> FieldPath -> ColumnData a b _emptyColData ds sqlt path = ColumnData { _cOrigin = untypedDataset ds, _cType = unSQLType sqlt, _cOp = GenColExtraction path, _cReferingPath = Nothing } _homoColOp2' :: T.Text -> DynColumn -> DynColumn -> DynColumn _homoColOp2' opName c1' c2' = do c1 <- c1' c2 <- c2' -- TODO check same origin return $ homoColOp2 opName c1 c2 -- ******** Displaying and pretty printing ************ instance forall ref a. Show (Column ref a) where show c = let name = case _cReferingPath c of Just fn -> show' fn Nothing -> _prettyShowColOp . colOp $ c txt = fromString "{}{{}}->{}" :: TF.Format -- path = T.pack . show . _cReferingPath $ c -- no = prettyShowColOp . colOp $ c fields = T.pack . show . colType $ c nn = prettyNodePath . nodePath . _cOrigin $ c in T.unpack $ toStrict $ TF.format txt (name, fields, nn) -- *********** Arithmetic operations ********** instance forall a. HomoBinaryOp2 a a a where _liftFun = BinaryOpFun id id instance forall ref a. HomoBinaryOp2 (Column ref a) DynColumn DynColumn where _liftFun = BinaryOpFun untypedCol id instance forall ref a. HomoBinaryOp2 DynColumn (Column ref a) DynColumn where _liftFun = BinaryOpFun id untypedCol instance (Fractional x) => Fractional (Column ref x) where (/) = homoColOp2 "/" recip = missing "Fractional (Column ref x): recip" fromRational = missing "Fractional (Column ref x): fromRational" instance (Num x) => Num (Column ref x) where (+) = homoColOp2 "+" (*) = homoColOp2 "*" abs _ = missing "Num (Column x): abs" signum _ = missing "Num (Column x): signum" fromInteger _ = missing "Num (Column x): fromInteger" negate _ = missing "Num (Column x): negate" instance Num DynColumn where (+) = _homoColOp2' "+" (*) = _homoColOp2' "*" abs _ = missing "Num (DynColumn x): abs" signum _ = missing "Num (DynColumn x): signum" fromInteger _ = missing "Num (DynColumn x): fromInteger" negate _ = missing "Num (DynColumn x): negate"