module Database.RethinkDB.MapReduce where
import Control.Monad.State
import Control.Monad.Writer
import qualified Data.Text as T
import Data.Maybe
import Data.Foldable (toList)
import Database.RethinkDB.Wire.Term
import Database.RethinkDB.ReQL
import Database.RethinkDB.Types
import qualified Database.RethinkDB.Functions as R
import Database.RethinkDB.NoClash hiding (get, collect, args)
import Database.RethinkDB.Datum
termToMapReduce :: (ReQL -> ReQL) -> State QuerySettings (ReQL -> ReQL)
termToMapReduce f = do
v <- newVarId
body <- runReQL $ f (op VAR [v])
return . applyChain $ toMapReduce v body
sameVar :: Int -> [Term] -> Bool
sameVar x [Datum d] | Success y <- fromDatum d = x == y
sameVar _ _ = False
notConst :: Chain -> Bool
notConst None{} = False
notConst SingletonArray{} = False
notConst _ = True
wrap :: Term -> ReQL
wrap = ReQL . return
toFun1 :: Term -> (ReQL -> ReQL)
toFun1 f a = op FUNCALL (wrap f, a)
toFun2 :: Term -> (ReQL -> ReQL -> ReQL)
toFun2 f a b = op FUNCALL (wrap f, a, b)
data MRF = MRF {
_mrfMapFun :: MapFun,
_mrfReduceFun :: ReQL -> ReQL -> ReQL,
_mrfBase :: Maybe ReQL,
_mrfFinally :: ReQL -> ReQL }
data Chain =
None ReQL |
Map [Map] |
MapReduceChain [Map] Reduce |
MapReduce MRF |
SingletonArray ReQL |
AddBase ReQL Chain
data Map =
BuiltInMap TermType [ReQL] [OptArg] MapFun
data MapFun =
MapFun (ReQL -> ReQL) |
ConcatMapFun (ReQL -> ReQL)
data Reduce =
BuiltInReduce TermType [ReQL] [OptArg] MRF
applyChain :: Chain -> (ReQL -> ReQL)
applyChain (None t) x = op FUNCALL (t, x)
applyChain (Map maps) s = applyMaps maps s
applyChain (MapReduceChain maps red) s =
applyReduce red $ applyMaps maps s
applyChain (MapReduce mrf) s = applyMRF mrf s
applyChain (SingletonArray x) s = op FUNCALL (op MAKE_ARRAY [x], s)
applyChain (AddBase b c) s = applyChain c s `union` [b]
applyMRF :: MRF -> ReQL -> ReQL
applyMRF (MRF m r Nothing f) s = f `apply` [reduce r (applyMapFun m s)]
applyMRF (MRF m r (Just base) f) s =
f $
apply (\x -> branch (isEmpty x) base (x R.! 0)) . return $
reduce (\a b -> [a R.! 0 `r` b R.! 0]) $
R.map (\x -> [x]) $
applyMapFun m s
applyMaps :: [Map] -> ReQL -> ReQL
applyMaps maps s = foldr applyMap s maps
applyMap :: Map -> ReQL -> ReQL
applyMap (BuiltInMap tt a oa _) s = op' tt (s : a) oa
applyMapFun :: MapFun -> ReQL -> ReQL
applyMapFun (MapFun f) = R.map f
applyMapFun (ConcatMapFun f) = R.concatMap f
applyReduce :: Reduce -> ReQL -> ReQL
applyReduce (BuiltInReduce tt a oa _) s = op' tt (s : a) oa
chainToMRF :: Chain -> Either ReQL MRF
chainToMRF (None t) = Left t
chainToMRF (Map maps) = Right $ maps `thenMRF` collect
chainToMRF (MapReduceChain maps red) = Right $ maps `thenReduce` red
chainToMRF (MapReduce mrf) = Right $ mrf
chainToMRF (SingletonArray x) = Left $ op MAKE_ARRAY [x]
chainToMRF (AddBase b c) = fmap (`thenFinally` \x -> op UNION [b, x]) $ chainToMRF c
thenFinally :: MRF -> (ReQL -> ReQL) -> MRF
thenFinally (MRF m r b f1) f2 = MRF m r b $ f2 . f1
thenMRF :: [Map] -> MRF -> MRF
thenMRF maps (MRF m r b f) =
MRF (m `composeMapFun` composeMaps maps) r b f
composeMaps :: [Map] -> MapFun
composeMaps = foldr composeMapFun (MapFun id) . map getMapFun
where getMapFun (BuiltInMap _ _ _ mf) = mf
composeMapFun :: MapFun -> MapFun -> MapFun
composeMapFun (MapFun f) (MapFun g) = MapFun (f . g)
composeMapFun (ConcatMapFun f) (MapFun g) = ConcatMapFun (f . g)
composeMapFun (MapFun f) (ConcatMapFun g) = ConcatMapFun (R.map f . g)
composeMapFun (ConcatMapFun f) (ConcatMapFun g) = ConcatMapFun (R.concatMap f . g)
thenReduce :: [Map] -> Reduce -> MRF
thenReduce maps (BuiltInReduce _ _ _ mrf) = maps `thenMRF` mrf
collect :: MRF
collect = MRF (MapFun $ \x -> expr [x]) union (Just (expr ())) id
toMapReduce :: Int -> Term -> Chain
toMapReduce v (Note _ t) = toMapReduce v t
toMapReduce _ (Datum (Array a))
| [datum] <- toList a =
SingletonArray . wrap $ Datum datum
toMapReduce _ t@(Datum _) = None $ wrap t
toMapReduce v (Term VAR w _) | sameVar v w = Map []
toMapReduce v t@(Term type' args optargs) = let
args' = map (toMapReduce v) args
optargs' = map (\(TermAttribute k vv) -> (k, toMapReduce v vv)) optargs
nb = length $ filter notConst $ args' ++ map snd optargs'
rewrite = MapReduce $
(if nb == 1 then rewrite1 else rewritex) type' args' optargs'
in case nb of
0 | Just sing <- singleton type' args' optargs -> SingletonArray sing
1 | UNION <- type', [x, SingletonArray s] <- args', [] <- optargs'
-> AddBase s x
0 -> None $ wrap t
1 | (arg1 : _) <- args', notConst arg1 ->
fromMaybe rewrite $ mrChain type' arg1 (tail args) optargs
_ -> rewrite
singleton :: TermType -> [Chain] -> [TermAttribute] -> Maybe ReQL
singleton MAKE_ARRAY [None el] [] = Just el
singleton _ _ _ = Nothing
mrChain :: TermType -> Chain -> [Term] -> [TermAttribute] -> Maybe Chain
mrChain REDUCE (AddBase base (Map maps)) [f] [] =
Just $
MapReduceChain maps $
BuiltInReduce REDUCE [wrap f] [] $
MRF (MapFun id) (toFun2 f) (Just base) id
mrChain tt (Map maps) args optargs
| Just mrf <- mapMRF tt args optargs =
Just . Map . (: maps) $
BuiltInMap tt (map wrap args) (map baseAttrToOptArg optargs) mrf
mrChain tt (Map maps) args optargs
| Just mrf <- reduceMRF tt args optargs =
Just . MapReduceChain maps $
BuiltInReduce tt (map wrap args) (map baseAttrToOptArg optargs) mrf
mrChain _ _ _ _ = Nothing
mapMRF :: TermType -> [Term] -> [TermAttribute]
-> Maybe MapFun
mapMRF MAP [f] [] = Just . MapFun $ toFun1 f
mapMRF PLUCK ks [] =
Just . MapFun $ \s -> op' PLUCK (s : map wrap ks) [noRecurse]
mapMRF WITHOUT ks [] =
Just . MapFun $ \s -> op' WITHOUT (s : map wrap ks) [noRecurse]
mapMRF MERGE [b] [] =
Just . MapFun $ \s -> op' MERGE [s, wrap b] [noRecurse]
mapMRF CONCAT_MAP [f] [] = Just . ConcatMapFun $ toFun1 f
mapMRF FILTER [f] [] =
Just . ConcatMapFun $ \x -> branch (toFun1 f x # handle (const False)) x ()
mapMRF FILTER [f] [TermAttribute "default" defval] =
Just . ConcatMapFun $ \x -> branch (toFun1 f x # handle (const defval)) x ()
mapMRF GET_FIELD [attr] [] =
Just . ConcatMapFun $ \x ->
branch (op' HAS_FIELDS (x, wrap attr) [noRecurse])
[op' GET_FIELD (x, attr) [noRecurse]] ()
mapMRF HAS_FIELDS sel [] =
Just . ConcatMapFun $ \x ->
branch (op' HAS_FIELDS (x : map wrap sel) [noRecurse]) [x] ()
mapMRF WITH_FIELDS sel [] =
Just . ConcatMapFun $ \x ->
branch (op' HAS_FIELDS (x : map wrap sel) [noRecurse])
[op' PLUCK (x : map wrap sel) [noRecurse]] ()
mapMRF BRACKET [k] [] =
Just . MapFun $ \s -> op' BRACKET (s, k) [noRecurse]
mapMRF _ _ _ = Nothing
reduceMRF :: TermType -> [Term] -> [TermAttribute]
-> Maybe MRF
reduceMRF REDUCE [f] [] = Just $ MRF (MapFun id) (toFun2 f) Nothing id
reduceMRF COUNT [] [] = Just $ MRF (MapFun $ const (num 1)) (\a b -> op ADD (a, b)) (Just 0) id
reduceMRF AVG [] [] =
Just $ MRF (MapFun $ \x -> expr [x, 1])
(\a b -> expr [a R.! 0 R.+ b R.! 0, a R.! 1 R.+ b R.! 1])
Nothing
(\x -> x R.! 0 R./ x R.! 1)
reduceMRF SUM [] [] = Just $ MRF (MapFun id) (R.+) (Just 0) id
reduceMRF SUM [sel] [] = Just $ MRF (MapFun $ toFun1 sel) (R.+) (Just 0) id
reduceMRF MIN [] [] = Just $ MRF (MapFun id) (\a b -> branch (a R.< b) a b) Nothing id
reduceMRF MIN [sel] [] =
Just $ MRF (MapFun $ \x -> expr [x, toFun1 sel x])
(\a b -> branch (a R.! 1 R.< b R.! 1) a b)
Nothing
(R.! 0)
reduceMRF MAX [] [] = Just $ MRF (MapFun id) (\a b -> branch (a R.> b) a b) Nothing id
reduceMRF MAX [sel] [] =
Just $ MRF (MapFun $ \x -> expr [x, toFun1 sel x])
(\a b -> branch (a R.! 1 R.> b R.! 1) a b)
Nothing
(R.! 0)
reduceMRF DISTINCT [] [] = Just $ MRF (MapFun $ \a -> expr [a]) (\a b -> distinct (a `union` b)) (Just (expr ())) id
reduceMRF _ _ _ = Nothing
baseAttrToOptArg :: TermAttribute -> OptArg
baseAttrToOptArg (TermAttribute k v) = k := v
noRecurse :: OptArg
noRecurse = "_NO_RECURSE_" := True
rewrite1 :: TermType -> [Chain] -> [(T.Text, Chain)] -> MRF
rewrite1 ttype args optargs = MRF maps red mbase finals where
(finally2, [mr]) = extract Nothing ttype args optargs
MRF maps red mbase fin1 = mr
finals = finally2 . return . fin1
rewritex :: TermType -> [Chain] -> [(Key, Chain)] -> MRF
rewritex ttype args optargs = MRF maps reduces Nothing finallys where
(finally, mrs) = extract (Just 0) ttype args optargs
index = zip $ map expr ([0..] :: [Int])
maps = MapFun $ \x -> expr $ map (($ x) . getMapFun) mrs
reduces a b = expr $ map (uncurry $ mkReduce a b) . index $ map getReduceFun mrs
finallys = let fs = map getFinallyFun mrs in
\x -> finally . map (uncurry $ mkFinally x) $ index fs
mkReduce a b i f = f (a!i) (b!i)
mkFinally x i f = f (x!i)
getMapFun (MRF (MapFun f) _ _ _) = f
getMapFun (MRF (ConcatMapFun f) _ _ _) = f
getReduceFun (MRF (MapFun _) f _ _) = f
getReduceFun (MRF (ConcatMapFun _) f _ _) =
\a b -> flip apply [a `union` b] $ \l ->
branch (isEmpty l) () [reduce f l]
getFinallyFun (MRF (MapFun _) _ _ f) = f
getFinallyFun (MRF (ConcatMapFun _) _ mbase f) =
f . maybe (R.! 0) (\base s ->
flip apply [s] $ handle (const base) $ s R.! 0) mbase
extract ::
Maybe Int -> TermType -> [Chain] -> [(Key, Chain)]
-> ([ReQL] -> ReQL, [MRF])
extract st tt args optargs = fst $ flip runState st $ runWriterT $ do
args' <- sequence $ map extractOne args
optargvs' <- sequence $ map extractOne (map snd optargs)
let optargks = map fst optargs
return $ \v -> op' tt (map ($ v) args') (Prelude.zipWith (:=) optargks $ map ($ v) optargvs')
where
extractOne chain = either (return . const) go $ chainToMRF chain
go :: MRF -> WriterT [MRF] (State (Maybe Int)) ([ReQL] -> ReQL)
go mrf = do
tell [mrf]
st' <- get
case st' of
Nothing -> return head
Just n -> do
put $ Just $ n + 1
return $ \v -> v !! n