module Streamly.Internal.Data.Stream.IsStream.Top
(
sampleFromThen
, sampleIntervalStart
, sampleIntervalEnd
, sampleBurstStart
, sampleBurstEnd
, sortBy
, intersectBy
, mergeIntersectBy
, differenceBy
, mergeDifferenceBy
, unionBy
, mergeUnionBy
, crossJoin
, innerJoin
, mergeInnerJoin
, hashInnerJoin
, leftJoin
, mergeLeftJoin
, hashLeftJoin
, outerJoin
, mergeOuterJoin
, hashOuterJoin
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (get, put)
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Data.SVar (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.Prelude (foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.StreamK as StreamK
import Prelude hiding (filter, zipWith, concatMap, concat)
-- | Keep only the elements whose position lies on an arithmetic progression
-- of indices. Every element is paired with an index by @Stream.indexed@ and
-- is retained when that index @i@ satisfies @i >= offset@ and
-- @(i - offset) \`mod\` stride == 0@.
--
-- The stride test is evaluated only for indices at or beyond @offset@, so a
-- @stride@ of zero raises a divide-by-zero error as soon as such an index is
-- reached.
--
-- NOTE(review): indices presumably start at 0 — confirm against
-- @Stream.indexed@.
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen offset stride = Stream.with Stream.indexed Stream.filter onGrid

    where

    -- True for indices offset, offset + stride, offset + 2 * stride, ...
    onGrid (ix, _) = ix >= offset && (ix - offset) `mod` stride == 0
-- | Emit at most one element per time window, taking the last one seen.
-- The input is split into windows of @n@ by @Stream.intervalsOf@, every
-- window is reduced with 'Fold.last', and windows that yielded 'Nothing' are
-- dropped by @Stream.catMaybes@.
--
-- NOTE(review): @n@ is presumably expressed in seconds — confirm against
-- @Stream.intervalsOf@.
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd n = Stream.catMaybes . lastOfWindow

    where

    lastOfWindow = Stream.intervalsOf n Fold.last
-- | Emit at most one element per time window, taking the first one seen.
-- The input is split into windows of @n@ by @Stream.intervalsOf@, every
-- window is reduced with 'Fold.head', and windows that yielded 'Nothing' are
-- dropped by @Stream.catMaybes@.
--
-- NOTE(review): @n@ is presumably expressed in seconds — confirm against
-- @Stream.intervalsOf@.
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart n = Stream.catMaybes . firstOfWindow

    where

    firstOfWindow = Stream.intervalsOf n Fold.head
-- | Sample the last element of each burst of elements.
--
-- Elements are time-stamped with @Stream.timeIndexed@ and split into groups
-- by @Stream.groupsByRolling@, which compares each element with the one
-- immediately before it. An element belongs to the current burst when it
-- arrives less than @gap@ after its predecessor; otherwise it starts a new
-- burst. Each burst is reduced with 'Fold.last' and the time stamps are
-- discarded again.
--
-- @gap@ is given in seconds: it is multiplied by @10^9@ and rounded to a
-- whole number of nanoseconds before being compared with the time stamps.
--
-- NOTE(review): because grouping is driven by arriving elements, the sample
-- of a burst is presumably emitted only once the next burst starts or the
-- input ends — confirm against @Stream.groupsByRolling@.
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd gap =
    Stream.map snd
        . Stream.catMaybes
        . Stream.groupsByRolling sameBurst Fold.last
        . Stream.timeIndexed

    where

    -- The gap converted once into the unit used by the time stamps.
    gapRel = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- The rolling predicate must hold for elements of the SAME group, so two
    -- successive elements stay together only when they are closer than the
    -- gap. Using @>=@ here would group the widely spaced elements and split
    -- the closely spaced ones.
    sameBurst (t1, _) (t2, _) = t2 - t1 < gapRel
-- | Sample the first element of each burst of elements.
--
-- Elements are time-stamped with @Stream.timeIndexed@ and split into groups
-- by @Stream.groupsByRolling@, which compares each element with the one
-- immediately before it. An element belongs to the current burst when it
-- arrives less than @gap@ after its predecessor; otherwise it starts a new
-- burst. Each burst is reduced with 'Fold.head' and the time stamps are
-- discarded again.
--
-- @gap@ is given in seconds: it is multiplied by @10^9@ and rounded to a
-- whole number of nanoseconds before being compared with the time stamps.
--
-- NOTE(review): whether the sample is emitted as soon as the burst starts or
-- only when the burst is complete depends on @Stream.groupsByRolling@ —
-- confirm.
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart gap =
    Stream.map snd
        . Stream.catMaybes
        . Stream.groupsByRolling sameBurst Fold.head
        . Stream.timeIndexed

    where

    -- The gap converted once into the unit used by the time stamps.
    gapRel = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- The rolling predicate must hold for elements of the SAME group, so two
    -- successive elements stay together only when they are closer than the
    -- gap. Using @>=@ here would group the widely spaced elements and split
    -- the closely spaced ones.
    sameBurst (t1, _) (t2, _) = t2 - t1 < gapRel
-- | Sort a stream using the supplied comparison function.
--
-- Every element is turned into a singleton stream with @Stream.fromPure@ and
-- the singletons are combined by @Stream.concatPairsWith@ using an ordered
-- merge (@Stream.mergeBy@ with the same comparison function).
--
-- NOTE(review): the whole input presumably has to be consumed before the
-- first output element is known — confirm before using on unbounded streams.
{-# INLINE sortBy #-}
sortBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a
sortBy cmp = Stream.concatPairsWith orderedMerge Stream.fromPure

    where

    orderedMerge = Stream.mergeBy cmp
-- | Pair every element of the first stream with every element of the second
-- one, using nothing but the 'Monad' instance of the stream type: the first
-- stream is bound first, the second stream is bound inside it, and the two
-- bound values are returned as a tuple.
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin s1 s2 =
    s1 >>= \a ->
        s2 >>= \b ->
            return (a, b)
-- | Inner join of two streams on an arbitrary equality predicate.
--
-- Every element of the first stream is combined with every element of the
-- second stream through the stream monad; a pair @(a, b)@ is produced when
-- @a \`eq\` b@ holds and nothing (@StreamK.nil@) is produced otherwise.
{-# INLINE innerJoin #-}
innerJoin ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad (t m)) =>
        (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
innerJoin eq s1 s2 =
    s1 >>= \a -> s2 >>= \b -> pairWhenEqual a b

    where

    pairWhenEqual a b
        | a `eq` b = return (a, b)
        | otherwise = StreamK.nil
-- | Placeholder for an inner join that, judging by its name, is meant to use
-- hashing to match elements. It is not implemented: evaluating it raises an
-- error that names this function.
{-# INLINE hashInnerJoin #-}
hashInnerJoin ::
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
hashInnerJoin = error "hashInnerJoin: not implemented"
-- | Placeholder for an inner join that, judging by its name and its
-- 'Ordering' based comparison, is meant to merge two ordered streams. It is
-- not implemented: evaluating it raises an error that names this function.
{-# INLINE mergeInnerJoin #-}
mergeInnerJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
mergeInnerJoin = error "mergeInnerJoin: not implemented"
-- | Left join of two serial streams on an arbitrary equality predicate.
--
-- For every element @a@ of the first stream the second stream is traversed
-- and a pair @(a, Just b)@ is produced for each @b@ with @a \`eq\` b@. A
-- 'Bool' kept in a 'StateT' layer records whether the current @a@ has matched
-- anything: it is reset to 'False' when @a@ is picked up and set to 'True' on
-- every match. A marker stream appended after the second stream inspects the
-- flag once the traversal is over and, when no match happened, makes the
-- join produce @(a, Nothing)@.
{-# INLINE leftJoin #-}
leftJoin :: Monad m =>
    (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
leftJoin eq s1 s2 =
    Stream.evalStateT (return False) $ do
        a <- Stream.liftInner s1
        -- A fresh left element has not matched anything yet.
        lift (put False)
        candidate <- fmap Just (Stream.liftInner s2) <> unmatchedMarker
        case candidate of
            Nothing -> return (a, Nothing)
            Just b
                | a `eq` b -> lift (put True) >> return (a, Just b)
                | otherwise -> StreamK.nil

    where

    -- Evaluated after the right stream: yields a single 'Nothing' when the
    -- current left element found no partner, and no element otherwise.
    unmatchedMarker = do
        matched <- lift get
        if matched
        then StreamK.nil
        else StreamK.fromPure Nothing
-- | Placeholder for a left join that, judging by its name, is meant to use
-- hashing to match elements. It is not implemented: evaluating it raises an
-- error that names this function.
{-# INLINE hashLeftJoin #-}
hashLeftJoin ::
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, Maybe b)
hashLeftJoin = error "hashLeftJoin: not implemented"
-- | Placeholder for a left join that, judging by its name and its 'Ordering'
-- based comparison, is meant to merge two ordered streams. It is not
-- implemented: applying it to its arguments and evaluating the result raises
-- an error that names this function.
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin _eq _s1 _s2 = error "mergeLeftJoin: not implemented"
-- | Full outer join of two serial streams on an arbitrary equality predicate.
--
-- The second stream is first buffered into an array in which every element
-- is paired with a mutable \"matched\" flag. The result then consists of two
-- parts, in this order:
--
-- 1. For every element @a@ of the first stream the buffered elements are
--    traversed; each @b@ with @a \`eq\` b@ yields @(Just a, Just b)@ and marks
--    @b@ as matched. When @a@ matched nothing, @(Just a, Nothing)@ is
--    produced. A 'Bool' in a 'StateT' layer tracks whether the current @a@
--    has matched.
--
-- 2. Every buffered element whose flag is still unset yields
--    @(Nothing, Just b)@.
--
-- The flags are what keeps a matched right element from being reported a
-- second time as unmatched in part 2.
--
-- NOTE(review): part 2 relies on the two parts being evaluated one after the
-- other by '<>' on 'SerialT' — confirm.
{-# INLINE outerJoin #-}
outerJoin :: MonadIO m =>
       (a -> b -> Bool)
    -> SerialT m a
    -> SerialT m b
    -> SerialT m (Maybe a, Maybe b)
outerJoin eq s1 s =
    Stream.concatM $ do
        arr <- Array.fromStream tagged
        return $ go arr <> leftOver arr

    where

    -- The right stream with a fresh, unset "matched" flag per element.
    tagged = do
        x <- s
        ref <- lift $ liftIO $ newIORef False
        return (x, ref)

    -- Right elements that no left element matched. The flags are read while
    -- this stream is being consumed, i.e. after 'go' has set them.
    leftOver arr = do
        (x, ref) <- Array.toStream arr
        used <- lift $ liftIO $ readIORef ref
        if used
        then StreamK.nil
        else return (Nothing, Just x)

    go arr = Stream.evalStateT (return False) $ do
        a <- Stream.liftInner s1
        -- A fresh left element has not matched anything yet.
        lift $ put False
        -- Evaluated after the buffered elements: yields a single 'Nothing'
        -- when the current left element found no partner.
        let final = do
                r <- lift get
                if r
                then StreamK.nil
                else StreamK.fromPure Nothing
        b <- fmap Just (Stream.liftInner (Array.toStream arr)) <> final
        case b of
            Just (b1, ref) ->
                if a `eq` b1
                then do
                    lift $ put True
                    -- Remember that this right element has a partner.
                    lift $ liftIO $ modifyIORef' ref (const True)
                    return (Just a, Just b1)
                else StreamK.nil
            Nothing -> return (Just a, Nothing)
-- | Placeholder for an outer join that, judging by its name, is meant to use
-- hashing to match elements. It is not implemented: applying it to its
-- arguments and evaluating the result raises an error that names this
-- function.
--
-- NOTE(review): the comparison function returns 'Ordering' here while
-- 'hashInnerJoin' and 'hashLeftJoin' take a 'Bool' predicate — confirm which
-- one is intended before implementing.
{-# INLINE hashOuterJoin #-}
hashOuterJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
hashOuterJoin _eq _s1 _s2 = error "hashOuterJoin: not implemented"
-- | Placeholder for an outer join that, judging by its name and its
-- 'Ordering' based comparison, is meant to merge two ordered streams. It is
-- not implemented: applying it to its arguments and evaluating the result
-- raises an error that names this function.
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin _eq _s1 _s2 = error "mergeOuterJoin: not implemented"
-- | Keep the elements of the first stream that have an equal element in the
-- second stream, according to the supplied equality predicate.
--
-- The second stream is consumed completely and collected into a list (after
-- passing through @Stream.uniqBy@) before any output is produced; every
-- element of the first stream is then checked against that list with a
-- linear search ('List.any'). The predicate is applied with the element of
-- the first stream as its first argument.
--
-- NOTE(review): @Stream.uniqBy@ presumably collapses only adjacent
-- duplicates — confirm.
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy eq s1 s2 =
    concatM (collectRight >>= \ys -> return (keepCommon ys))

    where

    -- The buffered second stream; this is cheap only when s2 is small.
    collectRight = Stream.toListRev (Stream.uniqBy eq (StreamK.adapt s2))

    keepCommon ys = Stream.filter (\x -> List.any (eq x) ys) s1
-- | Placeholder for an intersection that, judging by its name and its
-- 'Ordering' based comparison, is meant to merge two ordered streams. It is
-- not implemented: applying it to its arguments and evaluating the result
-- raises an error that names this function.
{-# INLINE mergeIntersectBy #-}
mergeIntersectBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeIntersectBy _eq _s1 _s2 = error "mergeIntersectBy: not implemented"
-- | Remove from the first stream the elements that occur in the second
-- stream, according to the supplied equality predicate.
--
-- The first stream is consumed completely into a list. The second stream is
-- then folded over that list: each of its elements deletes at most one
-- element from the list via 'List.deleteBy'. What remains is turned back
-- into a stream with @fromList@, so no output is produced before both inputs
-- have been consumed.
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy eq s1 s2 =
    concatM $ do
        -- Buffering the first stream is cheap only when s1 is small.
        survivors <- Stream.toList (StreamK.adapt s1)
        fmap fromList (foldl' dropFirstMatch survivors s2)

    where

    dropFirstMatch remaining y = List.deleteBy eq y remaining
-- | Placeholder for a difference that, judging by its name and its
-- 'Ordering' based comparison, is meant to merge two ordered streams. It is
-- not implemented: applying it to its arguments and evaluating the result
-- raises an error that names this function.
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 = error "mergeDifferenceBy: not implemented"
-- | Union of two streams according to the supplied equality predicate.
--
-- The second stream is consumed completely into a list, which is
-- de-duplicated with 'List.nubBy' and stored in an 'IORef'. The elements of
-- the first stream are then passed through unchanged; as a side effect each
-- of them deletes at most one equal element from the stored list
-- ('List.deleteBy'). Once the first stream is exhausted, whatever is left in
-- the stored list is appended to the output.
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy eq s1 s2 =
    concatM $ do
        rightElems <- Stream.toList (StreamK.adapt s2)
        -- Force the de-duplicated list before storing it in the reference.
        pending <- liftIO (newIORef $! List.nubBy eq rightElems)
        let -- Pass x through and strike its equal from the pending list.
            strike x =
                liftIO (modifyIORef' pending (List.deleteBy eq x)) >> return x
            -- Read only when this stream is consumed, i.e. after s1.
            leftovers =
                concatM (liftIO (readIORef pending) >>= return . fromList)
        return (Stream.mapM strike s1 <> leftovers)
-- | Placeholder for a union that, judging by its name and its 'Ordering'
-- based comparison, is meant to merge two ordered streams. It is not
-- implemented: applying it to its arguments and evaluating the result raises
-- an error that names this function.
{-# INLINE mergeUnionBy #-}
mergeUnionBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 = error "mergeUnionBy: not implemented"