-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Top
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Top level IsStream module that can use all other lower level IsStream
-- modules.

module Streamly.Internal.Data.Stream.IsStream.Top
    (
    -- * Transformation
    -- ** Sampling
    -- | Value agnostic filtering.
      sampleFromThen
    , sampleIntervalStart
    , sampleIntervalEnd
    , sampleBurstStart
    , sampleBurstEnd

    -- ** Reordering
    , sortBy

    -- * Nesting
    -- ** Set like operations
    -- | These are not exactly set operations because streams are not
    -- necessarily sets, they may have duplicated elements.
    , intersectBy
    , intersectBySorted
    , differenceBy
    , mergeDifferenceBy
    , unionBy
    , mergeUnionBy

    -- ** Join operations
    , crossJoin
    , joinInner
    , joinInnerMap
    , joinInnerMerge
    , joinLeft
    , mergeLeftJoin
    , joinLeftMap
    , joinOuter
    , mergeOuterJoin
    , joinOuterMap
    )
where

#include "inline.hs"

import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (get, put)
-- import Data.Hashable (Hashable)
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)

import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as StreamD

import Prelude hiding (filter, zipWith, concatMap, concat)

-- $setup
-- >>> :m
-- >>> import Prelude hiding (filter, zipWith, concatMap, concat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream

------------------------------------------------------------------------------
-- Sampling
------------------------------------------------------------------------------

-- XXX We can implement this using addition instead of "mod" to make it more
-- efficient.
--
-- | @sampleFromThen offset stride@ samples the element at @offset@ index and
-- then every element at strides of @stride@.
--
-- Indices are zero based. Elements before index @offset@ are dropped. An
-- element at index @i >= offset@ is kept when @mod (i - offset) stride == 0@.
-- Consequently a negative @stride@ behaves like its absolute value, and a
-- zero @stride@ makes 'mod' raise a divide-by-zero exception once the index
-- reaches @offset@.
--
-- >>> Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
-- [2,5,8]
--
-- /Pre-release/
--
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen offset stride =
    -- Pair every element with its index, filter on the index, then drop the
    -- index again.
    Stream.with Stream.indexed Stream.filter
        (\(i, _) -> i >= offset && (i - offset) `mod` stride == 0)

-- | Continuously evaluate the input stream and sample the last event in each
-- time window of @n@ seconds.
--
-- This is also known as @throttle@ in some libraries.
--
-- Windows for which 'Fold.last' yields 'Nothing' (no event in the window) are
-- dropped by 'Stream.catMaybes', so they produce no output.
--
-- @
-- sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
-- @
--
-- /Pre-release/
--
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last

-- | Like 'sampleIntervalEnd' but samples the first event in each time window
-- of @n@ seconds instead of the last one.
--
-- Windows for which 'Fold.head' yields 'Nothing' (no event in the window) are
-- dropped by 'Stream.catMaybes', so they produce no output.
--
-- @
-- sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head
-- @
--
-- /Pre-release/
--
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.head

-- | Sample one event at the end of each burst of events.  A burst is a group
-- of events close together in time, it ends when an event is spaced by more
-- than the specified time interval (in seconds) from the previous event.
--
-- This is known as @debounce@ in some libraries.
--
-- The clock granularity is 10 ms.
--
-- NOTE: the end of a burst can only be detected when the event starting the
-- next burst arrives (or the stream ends), so each sample is emitted at that
-- point rather than as soon as the gap elapses.
--
-- /Pre-release/
--
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd gap =
    -- 'Stream.groupsByRolling' keeps the current event in the same group as
    -- the previous one while @predicate previous current@ is 'True' (see its
    -- documentation). The predicate must therefore hold for events that are
    -- close together, i.e. spaced by at most @gap@. The previous version used
    -- @>=@ here, which grouped widely spaced events and split every burst.
    let maxGap = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))
        sameBurst (t1, _) (t2, _) = t2 - t1 <= maxGap
     in Stream.map snd
            . Stream.catMaybes
            . Stream.groupsByRolling sameBurst Fold.last
            . Stream.timeIndexed

-- | Like 'sampleBurstEnd' but samples the event at the beginning of the burst
-- instead of at the end of it.
--
-- A burst is a group of events close together in time; it ends when an event
-- is spaced by more than the specified time interval (in seconds) from the
-- previous event.
--
-- NOTE: a burst's first event is emitted only once the burst is known to have
-- ended, i.e. when the event starting the next burst arrives (or the stream
-- ends).
--
-- /Pre-release/
--
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart gap =
    -- 'Stream.groupsByRolling' keeps the current event in the same group as
    -- the previous one while @predicate previous current@ is 'True' (see its
    -- documentation). The predicate must therefore hold for events that are
    -- close together, i.e. spaced by at most @gap@. The previous version used
    -- @>=@ here, which grouped widely spaced events and split every burst.
    let maxGap = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))
        sameBurst (t1, _) (t2, _) = t2 - t1 <= maxGap
     in Stream.map snd
            . Stream.catMaybes
            . Stream.groupsByRolling sameBurst Fold.head
            . Stream.timeIndexed

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------
--
-- We could possibly choose different algorithms depending on whether the
-- input stream is almost sorted (ascending/descending) or random. We could
-- serialize the stream to an array and use quicksort.
--
-- | Sort the input stream using a supplied comparison function.
--
-- The input is split into monotonic runs with
-- 'Parser.groupByRollingEither'. Runs for which the predicate holds are
-- collected in reverse ('Fold.toStreamRev') and the others in order
-- ('Fold.toStream'). The runs are then merged pairwise with 'Stream.mergeBy'.
--
-- NOTE(review): reversing one kind of run presumably makes every run
-- ascending before merging; that depends on the argument order in which
-- 'Parser.groupByRollingEither' applies its predicate. Confirm against it.
--
-- /O(n) space/
--
-- Note: this is not the fastest possible implementation as of now.
--
-- /Pre-release/
--
{-# INLINE sortBy #-}
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
-- sortBy f = Stream.concatPairsWith (Stream.mergeBy f) Stream.fromPure
sortBy cmp =
    let p =
            Parser.groupByRollingEither
                (\x -> (< GT) . cmp x)
                Fold.toStreamRev
                Fold.toStream
     in   Stream.concatPairsWith (Stream.mergeBy cmp) id
        . Stream.parseMany (fmap (either id id) p)

------------------------------------------------------------------------------
-- SQL Joins
------------------------------------------------------------------------------
--
-- Some references:
-- * https://en.wikipedia.org/wiki/Relational_algebra
-- * https://en.wikipedia.org/wiki/Join_(SQL)

-- TODO: OrdSet/IntSet/hashmap based versions of these. With Eq only
-- constraint, the best would be to use an Array with linear search. If the
-- second stream is sorted we can also use a binary search, using Ord
-- constraint or an ordering function.
--
-- For Storables we can cache the second stream into an unboxed array for
-- possibly faster access/compact representation?
--
-- If we do not want to keep the stream in memory but always read it from the
-- source (disk/network) every time we iterate through it then we can do that
-- too by reading the stream every time; the stream must have immutable state
-- in that case and the user is responsible for the behavior if the stream
-- source changes during iterations. We can also use an Unfold instead of
-- stream. We probably need a way to distinguish streams that can be read
-- multiple times without any interference (e.g. unfolding a stream using an
-- immutable handle would work i.e. using pread/pwrite instead of maintaining
-- an offset in the handle).

-- XXX We can do this concurrently.
--
-- | This is the same as 'Streamly.Internal.Data.Unfold.outerProduct' but less
-- efficient.
--
-- For every element of the first stream, every element of the second stream
-- is paired with it, in the order defined by the monadic bind of @t m@.
--
-- The second stream is evaluated multiple times. If the second stream is a
-- consume-once stream then it can be cached in an 'Data.Array.Array' before
-- calling this function. Caching may also improve performance if the stream is
-- expensive to evaluate.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin s1 s2 = do
    -- XXX use concatMap instead?
    a <- s1
    b <- s2
    return (a, b)

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array we could use
-- binary search if we have an Ord instance or Ordering returning function. The
-- time complexity would then become (m x log n).
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given equality predicate then return the tuple (a, b).
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it (e.g. in a
-- 'Data.Array.Array') before calling this function. Caching may also improve
-- performance if the stream is expensive to evaluate.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- You should almost always use joinInnerMap instead of joinInner. joinInnerMap
-- is an order of magnitude faster. joinInner may be used when the second
-- stream is generated from a seed, therefore, need not be stored in memory and
-- the amount of memory it takes is a concern.
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinInner #-}
joinInner ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad m) =>
        (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner eq s1 s2 =
    -- concatMap works faster than bind
    Stream.concatMap (\a ->
        Stream.concatMap (\b ->
            if a `eq` b
            then Stream.fromPure (a, b)
            else Stream.nil
            ) s2
        ) s1

-- XXX Generate error if a duplicate insertion is attempted?
--
-- | Collect a stream of key-value pairs into a 'Map.Map'.
--
-- When a key occurs more than once, the value from the later occurrence
-- replaces the earlier one ('Map.insert' overwrites).
toMap ::  (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
toMap = Stream.foldl' (\acc (k, v) -> Map.insert k v acc) Map.empty

-- If the second stream is too big it can be partitioned based on hashes and
-- then we can process one partition at a time.
--
-- XXX An IntMap may be faster when the keys are Int.
-- XXX Use hashmap instead of map?
--
-- | Like 'joinInner' but uses a 'Map' for efficiency.
--
-- The second stream is fully loaded into a 'Map' before the first stream is
-- processed. Duplicate keys are not supported. Currently, for a key repeated
-- in the second stream only its last value is used, while every element of
-- the first stream is joined independently. Do not rely on this.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O((m + n) log n)
--
-- /Pre-release/
{-# INLINE joinInnerMap #-}
joinInnerMap :: (IsStream t, Monad m, Ord k) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap s1 s2 =
    Stream.concatM $ do
        km <- toMap $ IsStream.adapt s2
        pure $ Stream.mapMaybe (joinAB km) s1

    where

    -- Emit (k, a, b) only when the key is present in the second stream.
    joinAB kvm (k, a) = (,,) k a <$> Map.lookup k kvm

-- | Like 'joinInner' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge _cmp _s1 _s2 =
    -- Not implemented yet; fail with an explicit message instead of a bare
    -- 'undefined'.
    error
        "Streamly.Internal.Data.Stream.IsStream.Top.joinInnerMerge: \
        \not implemented"

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array or a seek capable
-- stream then we could use binary search if we have an Ord instance or
-- Ordering returning function. The time complexity would then become (m x log
-- n).
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given predicate then return the tuple @(a, Just b)@.  If
-- no element of @t m b@ matches @a@ then return @(a, Nothing)@.
--
-- The second stream is evaluated multiple times. If the stream is a
-- consume-once stream then the caller should cache it in an 'Data.Array.Array'
-- before calling this function. Caching may also improve performance if the
-- stream is expensive to evaluate.
--
-- A right join can be expressed by swapping the streams and flipping the
-- predicate:
--
-- @
-- rightJoin eq s1 s2 = joinLeft (flip eq) s2 s1
-- @
--
-- Space: O(n) assuming the second stream is cached in memory.
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinLeft #-}
joinLeft :: Monad m =>
    (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do
    a <- Stream.liftInner s1
    -- The Bool state records whether the current @a@ has matched any @b@;
    -- it is reset for every @a@.
    -- XXX should we use StreamD monad here?
    -- XXX Is there a better way to perform some action at the end of a loop
    -- iteration?
    lift $ put False
    -- Appended after the @b@ stream: yields a single 'Nothing' only when no
    -- @b@ matched the current @a@.
    let final = do
            r <- lift get
            if r
            then Stream.nil
            else Stream.fromPure Nothing
    b <- fmap Just (Stream.liftInner s2) <> final
    case b of
        Just b1 ->
            if a `eq` b1
            then do
                lift $ put True
                return (a, Just b1)
            else Stream.nil
        Nothing -> return (a, Nothing)

-- | Like 'joinLeft' but uses a 'Map' for efficiency.
--
-- The second stream is fully loaded into a 'Map' before the first stream is
-- processed. For a key repeated in the second stream only its last value is
-- used.
--
-- Space: O(n)
--
-- Time: O((m + n) log n)
--
-- /Pre-release/
{-# INLINE joinLeftMap #-}
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap s1 s2 =
    Stream.concatM $ do
        km <- toMap $ IsStream.adapt s2
        return $ Stream.map (joinAB km) s1

    where

    -- The lookup result is already the @Maybe b@ we want to emit.
    joinAB km (k, a) = (k, a, Map.lookup k km)

-- | Like 'joinLeft' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin :: -- Monad m =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin _eq _s1 _s2 =
    -- Not implemented yet; fail with an explicit message instead of a bare
    -- 'undefined'.
    error
        "Streamly.Internal.Data.Stream.IsStream.Top.mergeLeftJoin: \
        \not implemented"

-- XXX We can do this concurrently.
--
-- | For all elements in @t m a@, for all elements in @t m b@ if @a@ and @b@
-- are equal by the given equality predicate then return the tuple
-- @(Just a, Just b)@.  If @a@ matches no element of @t m b@ then return
-- @(Just a, Nothing)@. Every @b@ that matches no element of @t m a@ is
-- returned as @(Nothing, Just b)@. These unmatched @b@ results come after all
-- the results produced for @t m a@.
--
-- The second stream is buffered in an array, so it is consumed only once.
--
-- For space efficiency use the smaller stream as the second stream.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE joinOuter #-}
joinOuter :: MonadIO m =>
       (a -> b -> Bool)
    -> SerialT m a
    -> SerialT m b
    -> SerialT m (Maybe a, Maybe b)
joinOuter eq s1 s =
    Stream.concatM $ do
        arr <- Array.fromStream s
        -- Indices (into arr) of the b's that matched at least one a, used as
        -- a set. 'go' records matches in it. 'leftOver' reads it only after
        -- 'go' has been fully drained, because the two streams are appended
        -- serially.
        matchedRef <- liftIO $ newIORef Map.empty
        return $ go arr matchedRef <> leftOver arr matchedRef

    where

    -- Emit (Nothing, Just b) for every b whose index was never recorded as
    -- matched. The IORef is read when this stream starts to be evaluated.
    leftOver arr matchedRef =
        Stream.concatM $ do
            matched <- liftIO $ readIORef matchedRef
            return
                $ fmap (\(_, x) -> (Nothing, Just x))
                $ Stream.filter (\(i, _) -> not (Map.member i matched))
                $ Stream.indexed
                $ Array.toStream arr

    go arr matchedRef = Stream.evalStateT (return False) $ do
        a <- Stream.liftInner s1
        -- The Bool state records whether the current @a@ has matched any
        -- @b@; it is reset for every @a@.
        -- XXX should we use StreamD monad here?
        -- XXX Is there a better way to perform some action at the end of a loop
        -- iteration?
        lift $ put False
        -- Appended after the b's: yields a single 'Nothing' only when no b
        -- matched the current a.
        let final = do
                r <- lift get
                if r
                then Stream.nil
                else Stream.fromPure Nothing
        b <- fmap Just (Stream.liftInner (Stream.indexed (Array.toStream arr)))
                <> final
        case b of
            Just (i, b1) ->
                if a `eq` b1
                then do
                    lift $ put True
                    -- Remember that this b was paired so that 'leftOver'
                    -- does not emit it again.
                    lift $ liftIO $ modifyIORef' matchedRef (Map.insert i ())
                    return (Just a, Just b1)
                else Stream.nil
            Nothing -> return (Just a, Nothing)

-- Put the b's that have been paired, in another hash or mutate the hash to set
-- a flag. At the end go through @t m b@ and find those that are not in that
-- hash to return (Nothing, b).
--
-- | Like 'joinOuter' but uses a 'Map' for efficiency.
--
-- The output contains every key of the first stream, in ascending key order,
-- paired with the value for that key from the second stream when there is
-- one. It is followed by the keys that occur only in the second stream, also
-- in ascending key order.
--
-- Both input streams are fully evaluated into 'Map's before any output is
-- produced. If a key occurs more than once in an input stream, only the last
-- value for that key is retained.
--
-- Space: O(m + n)
--
-- Time: O((m + n) log (m + n))
--
-- /Pre-release/
{-# INLINE joinOuterMap #-}
joinOuterMap ::
    (IsStream t, Ord k, MonadIO m) =>
    t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap s1 s2 =
    Stream.concatM $ do
        km1 <- kvFold $ IsStream.adapt s1
        km2 <- kvFold $ IsStream.adapt s2

        -- XXX Not sure if toList/fromList would fuse optimally. We may have to
        -- create a fused Map.toStream function.

        -- Every key of the first stream, with the matching value from the
        -- second stream if the key is present there.
        let res1 =
                Stream.map (\(k, a) -> (k, Just a, Map.lookup k km2))
                    $ Stream.fromList
                    $ Map.toList km1

        -- Keys present only in the second stream. 'Map.difference' selects
        -- them directly instead of looking up every key of the second Map in
        -- the first one.
        let res2 =
                Stream.map (\(k, b) -> (k, Nothing, Just b))
                    $ Stream.fromList
                    $ Map.toList
                    $ Map.difference km2 km1

        return $ Stream.serial res1 res2

    where

    -- Build a Map from a key-value stream; a later value for a key replaces
    -- an earlier one.
    -- XXX Generate error if a duplicate insertion is attempted?
    kvFold = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty

-- | Like 'joinOuter' but works only on sorted streams.
--
-- Space: O(1)
--
-- Time: O(m + n)
--
-- /Unimplemented/: currently raises an error naming this function when the
-- result is evaluated.
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin :: -- Monad m =>
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin _eq _s1 _s2 =
    error "mergeOuterJoin: not implemented"

------------------------------------------------------------------------------
-- Set operations (special joins)
------------------------------------------------------------------------------
--
-- TODO: OrdSet/IntSet/HashMap based versions of these. With only an Eq
-- constraint the best we can do is an Array with linear search. If the second
-- stream is sorted we can also use binary search, which needs an Ord
-- constraint.

-- | 'intersectBy' is essentially a filtering operation that retains only those
-- elements in the first stream that are present in the second stream.
--
-- >>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [1,2,2]
--
-- >>> Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])
-- [2,1,1]
--
-- 'intersectBy' is similar to but not the same as 'joinInner':
--
-- >>> Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])
-- [1,1,2,2]
--
-- The second stream is collected into a list before the first stream is
-- consumed.
--
-- Space: O(n) where @n@ is the number of elements in the second stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy eq s1 s2 = concatM (keepMatching <$> collectSecond)

    where

    -- Materialize the second stream (after 'Stream.uniqBy'); this may work
    -- well when s2 is small.
    collectSecond = Stream.toListRev $ Stream.uniqBy eq $ adapt s2

    -- Keep an element of s1 when @eq@ relates it to any collected element.
    keepMatching ys = Stream.filter (\x -> List.any (eq x) ys) s1

-- | Like 'intersectBy' but works only on streams sorted in ascending order.
--
-- Both streams are converted with 'IsStream.toStreamD' and the work is
-- delegated to 'StreamD.intersectBySorted'.
--
-- Space: O(1)
--
-- Time: O(m+n)
--
-- /Pre-release/
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted cmp sortedLeft = \sortedRight ->
    let left = IsStream.toStreamD sortedLeft
        right = IsStream.toStreamD sortedRight
     in IsStream.fromStreamD (StreamD.intersectBySorted cmp left right)

-- Roughly joinLeft s1 s2 = s1 `difference` s2 + s1 `intersection` s2

-- | Delete first occurrences of those elements from the first stream that are
-- present in the second stream. If an element occurs multiple times in the
-- second stream as many occurrences of it are deleted from the first stream.
--
-- >>> Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])
-- [2]
--
-- The following law holds when @eq@ is an equivalence relation:
--
-- @
-- (s1 `serial` s2) `differenceBy eq` s1 === s2
-- @
--
-- If 'wSerial' is used instead of 'serial', the result has the same elements
-- as @s2@ but not necessarily in the same order. For example, deleting
-- @[1,2,3]@ from @[1,2,3]@ interleaved with @[3,4,5]@ (i.e. from
-- @[1,3,2,4,3,5]@) yields @[4,3,5]@.
--
-- Same as the list difference operator @(\\\\)@ from "Data.List".
--
-- The first stream is collected into a list before the second stream is
-- consumed.
--
-- Space: O(m) where @m@ is the number of elements in the first stream.
--
-- Time: O(m x n) where @m@ is the number of elements in the first stream and
-- @n@ is the number of elements in the second stream.
--
-- /Pre-release/
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy eq s1 s2 =
    concatM
        $ do
            -- This may work well if s1 is small
            -- If s1 is big we can go through s1, deleting elements from s2 and
            -- not emitting an element if it was successfully deleted from s2.
            -- we will need a deleteBy that can return whether the element was
            -- deleted or not.
            xs <- Stream.toList $ adapt s1
            -- For each element of s2, in order, remove the first matching
            -- element still left in xs.
            fmap fromList $ foldl' (flip (List.deleteBy eq)) xs s2

-- | Like 'differenceBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/: currently raises an error naming this function when the
-- result is evaluated.
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 =
    error "mergeDifferenceBy: not implemented"

-- | This is essentially an append operation.
--
-- The first stream is emitted as is. It is followed by those elements of the
-- second stream that are not present in the first stream. Duplicates within
-- the second stream are removed first, so each appended element is emitted
-- only once, in order of its first occurrence in the second stream.
--
-- >>> Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3])
-- [1,2,2,4,3]
--
-- The result is the same as that of 'Data.List.unionBy' applied to the
-- corresponding lists. It is equivalent to the following, except that @s1@ is
-- evaluated only once. Here @nubBy eq@ stands for dropping every element that
-- matches an earlier one, as 'Data.List.nubBy' does for lists:
--
-- @
-- unionBy eq s1 s2 = s1 \`serial` (nubBy eq s2 \`differenceBy eq` s1)
-- @
--
-- The second stream is collected into a list before the first stream is
-- consumed.
--
-- Similar to 'joinOuter' but not the same.
--
-- Space: O(n)
--
-- Time: O(m x n)
--
-- /Pre-release/
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
-- NOTE: The 'Semigroup' constraint is kept only for API compatibility. The
-- parts are combined with 'Stream.serial' rather than '<>' because '<>' is
-- not sequential for every stream type (e.g. 'wSerial' interleaves). The tail
-- must be read only after the whole of s1 has been processed.
unionBy eq s1 s2 =
    concatM
        $ do
            xs <- Stream.toList $ adapt s2
            -- XXX we can use postscanlMAfter' instead of IORef
            ref <- liftIO $ newIORef $! List.nubBy eq xs
            let f x = do
                    liftIO $ modifyIORef' ref (List.deleteBy eq x)
                    return x
                -- Apply f serially even when t is a concurrent stream type:
                -- modifyIORef' is not atomic, so concurrent calls could lose
                -- updates.
                s1' = adapt $ Stream.mapM f $ asSerial $ adapt s1
                -- The leftover elements are read when this stream is first
                -- evaluated, i.e. after s1' has been exhausted.
                s3 = concatM
                        $ do
                            xs1 <- liftIO $ readIORef ref
                            return $ fromList xs1
            return $ Stream.serial s1' s3

    where

    -- Fixes the stream type to 'SerialT' without altering the stream.
    asSerial :: SerialT n b -> SerialT n b
    asSerial = id

-- | Like 'unionBy' but works only on sorted streams.
--
-- Space: O(1)
--
-- /Unimplemented/: currently raises an error naming this function when the
-- result is evaluated.
{-# INLINE mergeUnionBy #-}
mergeUnionBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 =
    error "mergeUnionBy: not implemented"