{-|
Module      : Data.Conduit.Algorithms
Copyright   : 2013-2021 Luis Pedro Coelho
License     : MIT
Maintainer  : luis@luispedro.org

Simple algorithms packaged as Conduits
-}
{-# LANGUAGE Rank2Types #-}
module Data.Conduit.Algorithms
    ( uniqueOnC
    , uniqueC
    , removeRepeatsC
    , mergeC
    , mergeC2
    ) where

import qualified Data.Conduit as C
import qualified Data.Conduit.Internal as CI
import qualified Data.Set as S
import qualified Data.PriorityQueue.FingerTree as PQ
import           Control.Monad.Trans.Class (lift)
import           Control.Monad (foldM)

import           Data.Conduit.Algorithms.Utils (awaitJust)


-- | Unique conduit.
--
-- For each element, it computes its key (using the @a -> b@ key function) and
-- yields the element only if that key has not been seen before. The first
-- element carrying a given key is the one that is kept.
--
-- Note that this conduit /does not/ assume that the input is sorted. Instead
-- it uses a set (see "Data.Set") to store the keys seen so far, so memory
-- usage grows with the number of distinct keys and each element costs a set
-- lookup. If the input is sorted, you can use 'removeRepeatsC'.
uniqueOnC :: (Ord b, Monad m) => (a -> b) -> C.ConduitT a a m ()
uniqueOnC f = checkU S.empty
    where
        -- @seen@ holds the keys of every element yielded so far.
        checkU seen = awaitJust $ \val ->
            -- Bind the key once so @f@ is not re-applied for the insertion.
            let key = f val
            in if key `S.member` seen
                then checkU seen
                else do
                    C.yield val
                    checkU (S.insert key seen)
-- | Unique conduit that compares whole elements.
--
-- This is 'uniqueOnC' with the identity function as the key, so an element is
-- yielded only the first time it is seen. The input does not need to be
-- sorted.
--
-- See 'uniqueOnC' and 'removeRepeatsC'
uniqueC :: (Ord a, Monad m) => C.ConduitT a a m ()
uniqueC = uniqueOnC id

-- | Collapses each run of consecutive equal elements into a single element
--
-- Only /adjacent/ duplicates are removed, so a value can appear again later
-- in the output:
--
-- @
--  yieldMany [0, 0, 1, 1, 1, 2, 2, 0] .| removeRepeatsC .| consume
-- @
--
-- is equivalent to @[0, 1, 2, 0]@
--
-- See 'uniqueC' and 'uniqueOnC'
removeRepeatsC :: (Eq a, Monad m) => C.ConduitT a a m ()
removeRepeatsC = awaitJust collapseRun
    where
        -- @held@ is the representative of the current run; it is emitted
        -- only once the run is known to be over (a different element
        -- arrives or the input ends).
        collapseRun held = do
            incoming <- C.await
            case incoming of
                Nothing -> C.yield held
                Just fresh
                    | fresh == held -> collapseRun held
                    | otherwise -> C.yield held >> collapseRun fresh


-- | Merge any number of sorted sources into a single (sorted) source
--
-- Every input is expected to already be sorted; this is not checked. The
-- result outputs all elements of all inputs in sorted order.
--
-- A one-element list returns that source unchanged and a two-element list is
-- delegated to 'mergeC2'. Otherwise the sources are kept in a priority queue
-- keyed on their next pending output.
--
-- See 'mergeC2'
mergeC :: (Ord a, Monad m) => [C.ConduitT () a m ()] -> C.ConduitT () a m ()
mergeC [single] = single
mergeC [first, second] = mergeC2 first second
mergeC sources = CI.ConduitT $ \rest ->
    let
        -- Emit the smallest pending output, advance the pipe it came from,
        -- put that pipe back in the queue and repeat until the queue is empty.
        go queue = case PQ.minView queue of
            Nothing -> rest ()
            Just (CI.HaveOutput remainder smallest, others) ->
                CI.HaveOutput (go =<< norm1insert others remainder) smallest
            -- norm1insert only ever enqueues pipes that are CI.HaveOutput.
            Just _ -> error "This situation should have been impossible (mergeC/go)"
    in go =<< foldM norm1insert PQ.empty [CI.unConduitT src CI.Done | src <- sources]
    where
        -- norm1insert inserts the pipe into the queue after ensuring that the
        -- pipe is CI.HaveOutput: it is stepped until it has an output ready
        -- (and is then enqueued under that output) or until it is done (and
        -- is then dropped).
        norm1insert :: (Monad m, Ord o)
                    => PQ.PQueue o (CI.Pipe () i o () m ())
                    -> CI.Pipe () i o () m ()
                    -> CI.Pipe () i o () m (PQ.PQueue o (CI.Pipe () i o () m ()))
        norm1insert queue pipe = case pipe of
            CI.HaveOutput _ pending -> return (PQ.insert pending pipe queue)
            CI.Done{} -> return queue
            CI.PipeM action -> lift action >>= norm1insert queue
            -- No input is ever fed to the pipe; the @()@-taking continuation
            -- is used instead.
            CI.NeedInput _ onClose -> norm1insert queue (onClose ())
            CI.Leftover next () -> norm1insert queue next

-- | Merge two sorted sources into a single (sorted) source.
--
-- Both inputs are expected to already be sorted; this is not checked. When
-- the two pending elements compare equal, the one from the first source is
-- output first.
--
-- See 'mergeC'
mergeC2 :: (Ord a, Monad m) => C.ConduitT () a m () -> C.ConduitT () a m () -> C.ConduitT () a m ()
mergeC2 (CI.ConduitT runFst) (CI.ConduitT runSnd) = CI.ConduitT $ \rest ->
    let
        -- Both sides have an output ready: emit the smaller one and advance
        -- only the side it came from.
        go p1@(CI.HaveOutput more1 x1) p2@(CI.HaveOutput more2 x2) =
            if x1 <= x2
                then CI.HaveOutput (go more1 p2) x1
                else CI.HaveOutput (go p1 more2) x2
        -- One side is finished: drain the other.
        go p1@CI.Done{} (CI.HaveOutput more2 x2) = CI.HaveOutput (go p1 more2) x2
        go (CI.HaveOutput more1 x1) p2@CI.Done{} = CI.HaveOutput (go more1 p2) x1
        go CI.Done{} CI.Done{} = rest ()
        -- Run a pending monadic action, first source before second.
        go (CI.PipeM action) p2 = lift action >>= \p1 -> go p1 p2
        go p1 (CI.PipeM action) = lift action >>= go p1
        -- No input is ever fed to either pipe; the @()@-taking continuation
        -- is used instead.
        go (CI.NeedInput _ onClose) p2 = go (onClose ()) p2
        go p1 (CI.NeedInput _ onClose) = go p1 (onClose ())
        -- Leftovers are discarded.
        go (CI.Leftover p1 ()) p2 = go p1 p2
        go p1 (CI.Leftover p2 ()) = go p1 p2
    in go (runFst CI.Done) (runSnd CI.Done)