-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Top
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Top level IsStream module that can use all other lower level IsStream
-- modules.

module Streamly.Internal.Data.Stream.IsStream.Top
    (
    -- * Transformation
    -- ** Sampling
    -- | Value agnostic filtering.
      sampleFromThen
    , sampleIntervalStart
    , sampleIntervalEnd
    , sampleBurstStart
    , sampleBurstEnd

    -- ** Reordering
    , sortBy

    -- * Nesting
    -- ** Set like operations
    -- | These are not exactly set operations because streams are not
    -- necessarily sets, they may have duplicated elements.
    , intersectBy
    , mergeIntersectBy
    , differenceBy
    , mergeDifferenceBy
    , unionBy
    , mergeUnionBy

    -- ** Join operations
    , crossJoin
    , innerJoin
    , mergeInnerJoin
    , hashInnerJoin
    , leftJoin
    , mergeLeftJoin
    , hashLeftJoin
    , outerJoin
    , mergeOuterJoin
    , hashOuterJoin
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (get, put)
-- import Data.Hashable (Hashable)
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.Prelude (foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)

import qualified Data.List as List
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.StreamK as StreamK

import Prelude hiding (filter, zipWith, concatMap, concat)

-- $setup
-- >>> :m
-- >>> import Prelude hiding (filter, zipWith, concatMap, concat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream

------------------------------------------------------------------------------
-- Sampling
------------------------------------------------------------------------------

-- XXX We can implement this using addition instead of "mod" to make it more
-- efficient.
--
-- | @sampleFromthen offset stride@ samples the element at @offset@ index and
-- then every element at strides of @stride@.
--
-- >>> Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
-- [2,5,8]
--
-- /Pre-release/
--
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen :: Int -> Int -> t m a -> t m a
sampleFromThen Int
offset Int
stride =
    (t m a -> t m (Int, a))
-> (((Int, a) -> Bool) -> t m (Int, a) -> t m (Int, a))
-> ((Int, a) -> Bool)
-> t m a
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b s.
Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> ((s, a) -> b)
-> t m a
-> t m a
Stream.with t m a -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m (Int, a)
Stream.indexed ((Int, a) -> Bool) -> t m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
Stream.filter
        (\(Int
i, a
_) -> Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
offset Bool -> Bool -> Bool
&& (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
offset) Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
stride Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0)

-- | Continuously evaluate the input stream and sample the last event in time
-- window of @n@ seconds.
--
-- This is also known as @throttle@ in some libraries.
--
-- @
-- sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
-- @
--
-- /Pre-release/
--
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd :: Double -> t m a -> t m a
sampleIntervalEnd Double
n = t m (Maybe a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
Stream.catMaybes (t m (Maybe a) -> t m a)
-> (t m a -> t m (Maybe a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Fold m a (Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Double -> Fold m a b -> t m a -> t m b
Stream.intervalsOf Double
n Fold m a (Maybe a)
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.last

-- | Like 'sampleInterval' but samples at the beginning of the time window.
--
-- @
-- sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head
-- @
--
-- /Pre-release/
--
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart :: Double -> t m a -> t m a
sampleIntervalStart Double
n = t m (Maybe a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
Stream.catMaybes (t m (Maybe a) -> t m a)
-> (t m a -> t m (Maybe a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Double -> Fold m a (Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Double -> Fold m a b -> t m a -> t m b
Stream.intervalsOf Double
n Fold m a (Maybe a)
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.head

-- | Sample one event at the end of each burst of events.  A burst is a group
-- of events close together in time, it ends when an event is spaced by more
-- than the specified time interval from the previous event.
--
-- This is known as @debounce@ in some libraries.
--
-- The clock granularity is 10 ms.
--
-- /Pre-release/
--
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd :: Double -> t m a -> t m a
sampleBurstEnd Double
gap =
    let f :: (RelTime64, b) -> (RelTime64, b) -> Bool
f (RelTime64
t1, b
_) (RelTime64
t2, b
_) =
            RelTime64
t2 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t1 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= NanoSecond64 -> RelTime64
forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 (Int64 -> NanoSecond64
NanoSecond64 (Double -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
gap Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
10Double -> Int -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^(Int
9::Int))))
    in ((RelTime64, a) -> a) -> t m (RelTime64, a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Stream.map (RelTime64, a) -> a
forall a b. (a, b) -> b
snd
        (t m (RelTime64, a) -> t m a)
-> (t m a -> t m (RelTime64, a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m (Maybe (RelTime64, a)) -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
Stream.catMaybes
        (t m (Maybe (RelTime64, a)) -> t m (RelTime64, a))
-> (t m a -> t m (Maybe (RelTime64, a)))
-> t m a
-> t m (RelTime64, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((RelTime64, a) -> (RelTime64, a) -> Bool)
-> Fold m (RelTime64, a) (Maybe (RelTime64, a))
-> t m (RelTime64, a)
-> t m (Maybe (RelTime64, a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
Stream.groupsByRolling (RelTime64, a) -> (RelTime64, a) -> Bool
forall b b. (RelTime64, b) -> (RelTime64, b) -> Bool
f Fold m (RelTime64, a) (Maybe (RelTime64, a))
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.last
        (t m (RelTime64, a) -> t m (Maybe (RelTime64, a)))
-> (t m a -> t m (RelTime64, a))
-> t m a
-> t m (Maybe (RelTime64, a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (RelTime64, a)
Stream.timeIndexed

-- | Like 'sampleBurstEnd' but samples the event at the beginning of the burst
-- instead of at the end of it.
--
-- /Pre-release/
--
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart :: Double -> t m a -> t m a
sampleBurstStart Double
gap =
    let f :: (RelTime64, b) -> (RelTime64, b) -> Bool
f (RelTime64
t1, b
_) (RelTime64
t2, b
_) =
            RelTime64
t2 RelTime64 -> RelTime64 -> RelTime64
forall a. Num a => a -> a -> a
- RelTime64
t1 RelTime64 -> RelTime64 -> Bool
forall a. Ord a => a -> a -> Bool
>= NanoSecond64 -> RelTime64
forall a. TimeUnit64 a => a -> RelTime64
toRelTime64 (Int64 -> NanoSecond64
NanoSecond64 (Double -> Int64
forall a b. (RealFrac a, Integral b) => a -> b
round (Double
gap Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
10Double -> Int -> Double
forall a b. (Num a, Integral b) => a -> b -> a
^(Int
9::Int))))
    in ((RelTime64, a) -> a) -> t m (RelTime64, a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
Stream.map (RelTime64, a) -> a
forall a b. (a, b) -> b
snd
        (t m (RelTime64, a) -> t m a)
-> (t m a -> t m (RelTime64, a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m (Maybe (RelTime64, a)) -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
Stream.catMaybes
        (t m (Maybe (RelTime64, a)) -> t m (RelTime64, a))
-> (t m a -> t m (Maybe (RelTime64, a)))
-> t m a
-> t m (RelTime64, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((RelTime64, a) -> (RelTime64, a) -> Bool)
-> Fold m (RelTime64, a) (Maybe (RelTime64, a))
-> t m (RelTime64, a)
-> t m (Maybe (RelTime64, a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
Stream.groupsByRolling (RelTime64, a) -> (RelTime64, a) -> Bool
forall b b. (RelTime64, b) -> (RelTime64, b) -> Bool
f Fold m (RelTime64, a) (Maybe (RelTime64, a))
forall (m :: * -> *) a. Monad m => Fold m a (Maybe a)
Fold.head
        (t m (RelTime64, a) -> t m (Maybe (RelTime64, a)))
-> (t m a -> t m (RelTime64, a))
-> t m a
-> t m (Maybe (RelTime64, a))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (RelTime64, a)
Stream.timeIndexed

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------
--
-- We could possibly choose different algorithms depending on whether the
-- input stream is almost sorted (ascending/descending) or random. We could
-- serialize the stream to an array and use quicksort.
--
-- | Sort the input stream using a supplied comparison function.
--
-- /O(n) space/
--
-- Note: this is not the fastest possible implementation as of now.
--
-- /Pre-release/
--
{-# INLINE sortBy #-}
sortBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a
-- XXX creating StreamD and using D.mergeBy may be more efficient due to fusion
sortBy :: (a -> a -> Ordering) -> t m a -> t m a
sortBy a -> a -> Ordering
f = (t m a -> t m a -> t m a) -> (a -> t m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
Stream.concatPairsWith ((a -> a -> Ordering) -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
Stream.mergeBy a -> a -> Ordering
f) a -> t m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
Stream.fromPure

------------------------------------------------------------------------------
-- SQL Joins
------------------------------------------------------------------------------
--
-- Some references:
-- * https://en.wikipedia.org/wiki/Relational_algebra
-- * https://en.wikipedia.org/wiki/Join_(SQL)

-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only
-- constraint, the best would be to use an Array with linear search. If the
-- second stream is sorted we can also use a binary search, using Ord
-- constraint or an ordering function.
--
-- For Storables we can cache the second stream into an unboxed array for
-- possibly faster access/compact representation?
--
-- If we do not want to keep the stream in memory but always read it from the
-- source (disk/network) every time we iterate through it then we can do that
-- too by reading the stream every time, the stream must have immutable state
-- in that case and the user is responsible for the behavior if the stream
-- source changes during iterations. We can also use an Unfold instead of
-- stream. We probably need a way to distinguish streams that can be read
-- mutliple times without any interference (e.g. unfolding a stream using an
-- immutable handle would work i.e. using pread/pwrite instead of maintianing
-- an offset in the handle).

-- XXX We can do this concurrently.
--
-- | This is the same as 'Streamly.Internal.Data.Unfold.outerProduct' but less
-- efficient.
--
-- The second stream is evaluated multiple times. If the second stream is
-- consume-once stream then it can be cached in an 'Data.Array.Array' before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin :: t m a -> t m b -> t m (a, b)
crossJoin t m a
s1 t m b
s2 = do
    -- XXX use concatMap instead?
    a
a <- t m a
s1
    b
b <- t m b
s2
    (a, b) -> t m (a, b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, b
b)

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array we could use
-- binary search if we have an Ord instance or Ordering returning function. The
-- time complexity would then become (m x log n).
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given equality pedicate then return the tuple (a, b).
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it in an 'Data.Array.Array'
-- before calling this function. Caching may also improve performance if the
-- stream is expensive to evaluate.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE innerJoin #-}
innerJoin ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad (t m)) =>
        (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
innerJoin :: (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
innerJoin a -> b -> Bool
eq t m a
s1 t m b
s2 = do
    -- XXX use concatMap instead?
    a
a <- t m a
s1
    b
b <- t m b
s2
    if a
a a -> b -> Bool
`eq` b
b
    then (a, b) -> t m (a, b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, b
b)
    else t m (a, b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
StreamK.nil

-- If the second stream is too big it can be partitioned based on hashes and
-- then we can process one parition at a time.
--
-- | Like 'innerJoin' but uses a hashmap for efficiency.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE hashInnerJoin #-}
hashInnerJoin :: -- Hashable b =>
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
hashInnerJoin :: (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
hashInnerJoin = (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
forall a. HasCallStack => a
undefined

-- | Like 'innerJoin' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeInnerJoin #-}
mergeInnerJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
mergeInnerJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
mergeInnerJoin = (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
forall a. HasCallStack => a
undefined

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array or a seek capable
-- stream then we could use binary search if we have an Ord instance or
-- Ordering returning function. The time complexity would then become (m x log
-- n).
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal then return the tuple @(a, Just b)@.  If @a@ is not present in @t
-- m b@ then return @(a, Nothing)@.
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it in an 'Data.Array.Array'
-- before calling this function. Caching may also improve performance if the
-- stream is expensive to evaluate.
--
-- @
-- rightJoin = flip leftJoin
-- @
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Unimplemented/
{-# INLINE leftJoin #-}
leftJoin :: Monad m =>
    (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
leftJoin :: (a -> b -> Bool)
-> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
leftJoin a -> b -> Bool
eq SerialT m a
s1 SerialT m b
s2 = m Bool
-> SerialT (StateT Bool m) (a, Maybe b) -> SerialT m (a, Maybe b)
forall (m :: * -> *) s a.
Monad m =>
m s -> SerialT (StateT s m) a -> SerialT m a
Stream.evalStateT (Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False) (SerialT (StateT Bool m) (a, Maybe b) -> SerialT m (a, Maybe b))
-> SerialT (StateT Bool m) (a, Maybe b) -> SerialT m (a, Maybe b)
forall a b. (a -> b) -> a -> b
$ do
    a
a <- SerialT m a -> SerialT (StateT Bool m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
       (tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
Stream.liftInner SerialT m a
s1
    -- XXX should we use StreamD monad here?
    -- XXX Is there a better way to perform some action at the end of a loop
    -- iteration?
    StateT Bool m () -> SerialT (StateT Bool m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (StateT Bool m () -> SerialT (StateT Bool m) ())
-> StateT Bool m () -> SerialT (StateT Bool m) ()
forall a b. (a -> b) -> a -> b
$ Bool -> StateT Bool m ()
forall (m :: * -> *) s. Monad m => s -> StateT s m ()
put Bool
False
    let final :: SerialT (StateT Bool m) (Maybe a)
final = do
            Bool
r <- StateT Bool m Bool -> SerialT (StateT Bool m) Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift StateT Bool m Bool
forall (m :: * -> *) s. Monad m => StateT s m s
get
            if Bool
r
            then SerialT (StateT Bool m) (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
StreamK.nil
            else Maybe a -> SerialT (StateT Bool m) (Maybe a)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
StreamK.fromPure Maybe a
forall a. Maybe a
Nothing
    Maybe b
b <- (b -> Maybe b)
-> SerialT (StateT Bool m) b -> SerialT (StateT Bool m) (Maybe b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap b -> Maybe b
forall a. a -> Maybe a
Just (SerialT m b -> SerialT (StateT Bool m) b
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
       (tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
Stream.liftInner SerialT m b
s2) SerialT (StateT Bool m) (Maybe b)
-> SerialT (StateT Bool m) (Maybe b)
-> SerialT (StateT Bool m) (Maybe b)
forall a. Semigroup a => a -> a -> a
<> SerialT (StateT Bool m) (Maybe b)
forall a. SerialT (StateT Bool m) (Maybe a)
final
    case Maybe b
b of
        Just b
b1 ->
            if a
a a -> b -> Bool
`eq` b
b1
            then do
                StateT Bool m () -> SerialT (StateT Bool m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (StateT Bool m () -> SerialT (StateT Bool m) ())
-> StateT Bool m () -> SerialT (StateT Bool m) ()
forall a b. (a -> b) -> a -> b
$ Bool -> StateT Bool m ()
forall (m :: * -> *) s. Monad m => s -> StateT s m ()
put Bool
True
                (a, Maybe b) -> SerialT (StateT Bool m) (a, Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, b -> Maybe b
forall a. a -> Maybe a
Just b
b1)
            else SerialT (StateT Bool m) (a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
StreamK.nil
        Maybe b
Nothing -> (a, Maybe b) -> SerialT (StateT Bool m) (a, Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Maybe b
forall a. Maybe a
Nothing)

-- | Like 'outerJoin' but uses a hashmap for efficiency.
--
-- Space: O(n)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE hashLeftJoin #-}
hashLeftJoin :: -- Hashable b =>
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b)
hashLeftJoin :: (a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b)
hashLeftJoin = (a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b)
forall a. HasCallStack => a
undefined

-- | Like 'leftJoin' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin :: -- Monad m =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin a -> b -> Ordering
_eq t m a
_s1 t m b
_s2 = t m (a, Maybe b)
forall a. HasCallStack => a
undefined

-- XXX We can do this concurrently.
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given equality pedicate then return the tuple (Just a, Just
-- b).  If @a@ is not found in @t m b@ then return (a, Nothing), return
-- (Nothing, b) for vice-versa.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Unimplemented/
{-# INLINE outerJoin #-}
outerJoin :: MonadIO m =>
       (a -> b -> Bool)
    -> SerialT m a
    -> SerialT m b
    -> SerialT m (Maybe a, Maybe b)
outerJoin :: (a -> b -> Bool)
-> SerialT m a -> SerialT m b -> SerialT m (Maybe a, Maybe b)
outerJoin a -> b -> Bool
eq SerialT m a
s1 SerialT m b
s =
    m (SerialT m (Maybe a, Maybe b)) -> SerialT m (Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
Stream.concatM (m (SerialT m (Maybe a, Maybe b)) -> SerialT m (Maybe a, Maybe b))
-> m (SerialT m (Maybe a, Maybe b)) -> SerialT m (Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ do
        Array (b, Bool)
arr <- SerialT m (b, Bool) -> m (Array (b, Bool))
forall (m :: * -> *) a. MonadIO m => SerialT m a -> m (Array a)
Array.fromStream (SerialT m (b, Bool) -> m (Array (b, Bool)))
-> SerialT m (b, Bool) -> m (Array (b, Bool))
forall a b. (a -> b) -> a -> b
$ (b -> (b, Bool)) -> SerialT m b -> SerialT m (b, Bool)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,Bool
False) SerialT m b
s
        SerialT m (Maybe a, Maybe b) -> m (SerialT m (Maybe a, Maybe b))
forall (m :: * -> *) a. Monad m => a -> m a
return (SerialT m (Maybe a, Maybe b) -> m (SerialT m (Maybe a, Maybe b)))
-> SerialT m (Maybe a, Maybe b) -> m (SerialT m (Maybe a, Maybe b))
forall a b. (a -> b) -> a -> b
$ Array (b, Bool) -> SerialT m (Maybe a, Maybe b)
forall b. Array (b, b) -> SerialT m (Maybe a, Maybe b)
go Array (b, Bool)
arr SerialT m (Maybe a, Maybe b)
-> SerialT m (Maybe a, Maybe b) -> SerialT m (Maybe a, Maybe b)
forall a. Semigroup a => a -> a -> a
<> Array (b, Bool) -> SerialT m (Maybe a, Maybe b)
forall a a. Array (a, Bool) -> SerialT m (Maybe a, Maybe a)
leftOver Array (b, Bool)
arr

    where

    leftOver :: Array (a, Bool) -> SerialT m (Maybe a, Maybe a)
leftOver =
            ((a, Bool) -> (Maybe a, Maybe a))
-> SerialT m (a, Bool) -> SerialT m (Maybe a, Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (\(a
x, Bool
_) -> (Maybe a
forall a. Maybe a
Nothing, a -> Maybe a
forall a. a -> Maybe a
Just a
x))
                (SerialT m (a, Bool) -> SerialT m (Maybe a, Maybe a))
-> (Array (a, Bool) -> SerialT m (a, Bool))
-> Array (a, Bool)
-> SerialT m (Maybe a, Maybe a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((a, Bool) -> Bool) -> SerialT m (a, Bool) -> SerialT m (a, Bool)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
Stream.filter (Bool -> Bool
not (Bool -> Bool) -> ((a, Bool) -> Bool) -> (a, Bool) -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a, Bool) -> Bool
forall a b. (a, b) -> b
snd)
                (SerialT m (a, Bool) -> SerialT m (a, Bool))
-> (Array (a, Bool) -> SerialT m (a, Bool))
-> Array (a, Bool)
-> SerialT m (a, Bool)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Array (a, Bool) -> SerialT m (a, Bool)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
Array a -> t m a
Array.toStream

    go :: Array (b, b) -> SerialT m (Maybe a, Maybe b)
go Array (b, b)
arr = m Bool
-> SerialT (StateT Bool m) (Maybe a, Maybe b)
-> SerialT m (Maybe a, Maybe b)
forall (m :: * -> *) s a.
Monad m =>
m s -> SerialT (StateT s m) a -> SerialT m a
Stream.evalStateT (Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False) (SerialT (StateT Bool m) (Maybe a, Maybe b)
 -> SerialT m (Maybe a, Maybe b))
-> SerialT (StateT Bool m) (Maybe a, Maybe b)
-> SerialT m (Maybe a, Maybe b)
forall a b. (a -> b) -> a -> b
$ do
        a
a <- SerialT m a -> SerialT (StateT Bool m) a
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
       (tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
Stream.liftInner SerialT m a
s1
        -- XXX should we use StreamD monad here?
        -- XXX Is there a better way to perform some action at the end of a loop
        -- iteration?
        StateT Bool m () -> SerialT (StateT Bool m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (StateT Bool m () -> SerialT (StateT Bool m) ())
-> StateT Bool m () -> SerialT (StateT Bool m) ()
forall a b. (a -> b) -> a -> b
$ Bool -> StateT Bool m ()
forall (m :: * -> *) s. Monad m => s -> StateT s m ()
put Bool
False
        let final :: SerialT (StateT Bool m) (Maybe a)
final = do
                Bool
r <- StateT Bool m Bool -> SerialT (StateT Bool m) Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift StateT Bool m Bool
forall (m :: * -> *) s. Monad m => StateT s m s
get
                if Bool
r
                then SerialT (StateT Bool m) (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
StreamK.nil
                else Maybe a -> SerialT (StateT Bool m) (Maybe a)
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
StreamK.fromPure Maybe a
forall a. Maybe a
Nothing
        (Int
_i, Maybe (b, b)
b) <-
            SerialT (StateT Bool m) (Maybe (b, b))
-> SerialT (StateT Bool m) (Int, Maybe (b, b))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m (Int, a)
Stream.indexed
                (SerialT (StateT Bool m) (Maybe (b, b))
 -> SerialT (StateT Bool m) (Int, Maybe (b, b)))
-> SerialT (StateT Bool m) (Maybe (b, b))
-> SerialT (StateT Bool m) (Int, Maybe (b, b))
forall a b. (a -> b) -> a -> b
$ ((b, b) -> Maybe (b, b))
-> SerialT (StateT Bool m) (b, b)
-> SerialT (StateT Bool m) (Maybe (b, b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (b, b) -> Maybe (b, b)
forall a. a -> Maybe a
Just (SerialT m (b, b) -> SerialT (StateT Bool m) (b, b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *)
       (tr :: (* -> *) -> * -> *) a.
(Monad m, IsStream t, MonadTrans tr, Monad (tr m)) =>
t m a -> t (tr m) a
Stream.liftInner (Array (b, b) -> SerialT m (b, b)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
Array a -> t m a
Array.toStream Array (b, b)
arr)) SerialT (StateT Bool m) (Maybe (b, b))
-> SerialT (StateT Bool m) (Maybe (b, b))
-> SerialT (StateT Bool m) (Maybe (b, b))
forall a. Semigroup a => a -> a -> a
<> SerialT (StateT Bool m) (Maybe (b, b))
forall a. SerialT (StateT Bool m) (Maybe a)
final
        case Maybe (b, b)
b of
            Just (b
b1, b
_used) ->
                if a
a a -> b -> Bool
`eq` b
b1
                then do
                    StateT Bool m () -> SerialT (StateT Bool m) ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (StateT Bool m () -> SerialT (StateT Bool m) ())
-> StateT Bool m () -> SerialT (StateT Bool m) ()
forall a b. (a -> b) -> a -> b
$ Bool -> StateT Bool m ()
forall (m :: * -> *) s. Monad m => s -> StateT s m ()
put Bool
True
                    -- XXX Need to use a mutable array
                    -- when (not used) $ Array.writeIndex i True
                    (Maybe a, Maybe b) -> SerialT (StateT Bool m) (Maybe a, Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
a, b -> Maybe b
forall a. a -> Maybe a
Just b
b1)
                else SerialT (StateT Bool m) (Maybe a, Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
StreamK.nil
            Maybe (b, b)
Nothing -> (Maybe a, Maybe b) -> SerialT (StateT Bool m) (Maybe a, Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
a, Maybe b
forall a. Maybe a
Nothing)

-- Put the b's that have been paired, in another hash or mutate the hash to set
-- a flag. At the end go through @t m b@ and find those that are not in that
-- hash to return (Nothing, b).
--
-- | Like 'outerJoin' but uses a hashmap for efficiency.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE hashOuterJoin #-}
hashOuterJoin :: -- (Monad m, Hashable b) =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
hashOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
hashOuterJoin a -> b -> Ordering
_eq t m a
_s1 t m b
_s2 = t m (Maybe a, Maybe b)
forall a. HasCallStack => a
undefined

-- | Like 'outerJoin' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin :: -- Monad m =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin a -> b -> Ordering
_eq t m a
_s1 t m b
_s2 = t m (Maybe a, Maybe b)
forall a. HasCallStack => a
undefined

------------------------------------------------------------------------------
-- Set operations (special joins)
------------------------------------------------------------------------------
--
-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only constraint
-- the best would be to use an Array with linear search. If the second stream
-- is sorted we can also use a binary search, using Ord constraint.

-- | 'intersectBy' is essentially a filtering operation that retains only those
-- elements in the first stream that are present in the second stream.
--
-- >>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [1,2,2]
--
-- >>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
-- [2,1,1]
--
-- 'intersectBy' is similar to but not the same as 'innerJoin':
--
-- >>> Stream.toList $ fmap fst $ Stream.innerJoin (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [1,1,2,2]
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy :: (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy a -> a -> Bool
eq t m a
s1 t m a
s2 =
    m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
            -- This may work well when s2 is small
            [a]
xs <- SerialT m a -> m [a]
forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
Stream.toListRev (SerialT m a -> m [a]) -> SerialT m a -> m [a]
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> SerialT m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
Stream.uniqBy a -> a -> Bool
eq (SerialT m a -> SerialT m a) -> SerialT m a -> SerialT m a
forall a b. (a -> b) -> a -> b
$ t m a -> SerialT m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
StreamK.adapt t m a
s2
            t m a -> m (t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (t m a -> m (t m a)) -> t m a -> m (t m a)
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
Stream.filter (\a
x -> (a -> Bool) -> [a] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
List.any (a -> a -> Bool
eq a
x) [a]
xs) t m a
s1

-- | Like 'intersectBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m+n)
--
-- /Unimplemented/
{-# INLINE mergeIntersectBy #-}
mergeIntersectBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeIntersectBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeIntersectBy a -> a -> Ordering
_eq t m a
_s1 t m a
_s2 = t m a
forall a. HasCallStack => a
undefined

-- Roughly leftJoin s1 s2 = s1 `difference` s2 + s1 `intersection` s2

-- | Delete first occurrences of those elements from the first stream that are
-- present in the second stream. If an element occurs multiple times in the
-- second stream as many occurrences of it are deleted from the first stream.
--
-- >>> Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
-- [2]
--
-- The following laws hold:
--
-- @
-- (s1 `serial` s2) `differenceBy eq` s1 === s2
-- (s1 `wSerial` s2) `differenceBy eq` s1 === s2
-- @
--
-- Same as the list 'Data.List.//' operation.
--
-- Space: O(m) where @m@ is the number of elements in the first stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy :: (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy a -> a -> Bool
eq t m a
s1 t m a
s2 =
    m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
            -- This may work well if s1 is small
            -- If s1 is big we can go through s1, deleting elements from s2 and
            -- not emitting an element if it was successfully deleted from s2.
            -- we will need a deleteBy that can return whether the element was
            -- deleted or not.
            [a]
xs <- SerialT m a -> m [a]
forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
Stream.toList (SerialT m a -> m [a]) -> SerialT m a -> m [a]
forall a b. (a -> b) -> a -> b
$ t m a -> SerialT m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
StreamK.adapt t m a
s1
            ([a] -> t m a) -> m [a] -> m (t m a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap [a] -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
fromList (m [a] -> m (t m a)) -> m [a] -> m (t m a)
forall a b. (a -> b) -> a -> b
$ ([a] -> a -> [a]) -> [a] -> t m a -> m [a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) b a.
(Monad m, IsStream t) =>
(b -> a -> b) -> b -> t m a -> m b
foldl' ((a -> [a] -> [a]) -> [a] -> a -> [a]
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy a -> a -> Bool
eq)) [a]
xs t m a
s2

-- | Like 'differenceBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy a -> a -> Ordering
_eq t m a
_s1 t m a
_s2 = t m a
forall a. HasCallStack => a
undefined

-- | This is essentially an append operation that appends all the extra
-- occurrences of elements from the second stream that are not already present
-- in the first stream.
--
-- >>> Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
-- [1,2,2,4,3]
--
-- Equivalent to the following except that @s1@ is evaluated only once:
--
-- @
-- unionBy eq s1 s2 = s1 \`serial` (s2 `differenceBy eq` s1)
-- @
--
-- Similar to 'outerJoin' but not the same.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy :: (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy a -> a -> Bool
eq t m a
s1 t m a
s2 =
    m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
            [a]
xs <- SerialT m a -> m [a]
forall (m :: * -> *) a. Monad m => SerialT m a -> m [a]
Stream.toList (SerialT m a -> m [a]) -> SerialT m a -> m [a]
forall a b. (a -> b) -> a -> b
$ t m a -> SerialT m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
       (m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
StreamK.adapt t m a
s2
            -- XXX we can use postscanlMAfter' instead of IORef
            IORef [a]
ref <- IO (IORef [a]) -> m (IORef [a])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef [a]) -> m (IORef [a]))
-> IO (IORef [a]) -> m (IORef [a])
forall a b. (a -> b) -> a -> b
$ [a] -> IO (IORef [a])
forall a. a -> IO (IORef a)
newIORef ([a] -> IO (IORef [a])) -> [a] -> IO (IORef [a])
forall a b. (a -> b) -> a -> b
$! (a -> a -> Bool) -> [a] -> [a]
forall a. (a -> a -> Bool) -> [a] -> [a]
List.nubBy a -> a -> Bool
eq [a]
xs
            let f :: a -> m a
f a
x = do
                    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef [a] -> ([a] -> [a]) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
modifyIORef' IORef [a]
ref ((a -> a -> Bool) -> a -> [a] -> [a]
forall a. (a -> a -> Bool) -> a -> [a] -> [a]
List.deleteBy a -> a -> Bool
eq a
x)
                    a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
                s3 :: t m a
s3 = m (t m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM
                        (m (t m a) -> t m a) -> m (t m a) -> t m a
forall a b. (a -> b) -> a -> b
$ do
                            [a]
xs1 <- IO [a] -> m [a]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [a] -> m [a]) -> IO [a] -> m [a]
forall a b. (a -> b) -> a -> b
$ IORef [a] -> IO [a]
forall a. IORef a -> IO a
readIORef IORef [a]
ref
                            t m a -> m (t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (t m a -> m (t m a)) -> t m a -> m (t m a)
forall a b. (a -> b) -> a -> b
$ [a] -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
[a] -> t m a
fromList [a]
xs1
            t m a -> m (t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (t m a -> m (t m a)) -> t m a -> m (t m a)
forall a b. (a -> b) -> a -> b
$ (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
Stream.mapM a -> m a
forall (m :: * -> *). MonadIO m => a -> m a
f t m a
s1 t m a -> t m a -> t m a
forall a. Semigroup a => a -> a -> a
<> t m a
s3

-- | Like 'unionBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeUnionBy #-}
mergeUnionBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy :: (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy a -> a -> Ordering
_eq t m a
_s1 t m a
_s2 = t m a
forall a. HasCallStack => a
undefined