module Streamly.Internal.Data.Stream.IsStream.Top
(
sampleFromThen
, sampleIntervalStart
, sampleIntervalEnd
, sampleBurstStart
, sampleBurstEnd
, sortBy
, intersectBy
, intersectBySorted
, differenceBy
, mergeDifferenceBy
, unionBy
, mergeUnionBy
, crossJoin
, joinInner
, joinInnerMap
, joinInnerMerge
, joinLeft
, mergeLeftJoin
, joinLeftMap
, joinOuter
, mergeOuterJoin
, joinOuterMap
)
where
#include "inline.hs"
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (get, put)
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
#if !(MIN_VERSION_base(4,11,0))
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Array as Array
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
import qualified Streamly.Internal.Data.Stream.IsStream.Lift as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as StreamD
import Prelude hiding (filter, zipWith, concatMap, concat)
-- | Keep only the elements whose index is @offset@, @offset + stride@,
-- @offset + 2 * stride@ and so on; every other element is dropped.
--
-- Each element is paired with its position by 'Stream.indexed' and the
-- selection is made on that position alone. Elements positioned before
-- @offset@ are rejected without consulting @stride@. Because the test uses
-- 'mod', a @stride@ of zero raises a divide-by-zero error as soon as an
-- element at or after @offset@ is examined.
--
-- NOTE(review): indices are presumably zero-based; confirm against
-- 'Stream.indexed'.
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen offset stride =
    Stream.with Stream.indexed Stream.filter onStride

    where

    -- Decide from the index alone; the element itself is never inspected.
    onStride (ix, _)
        | ix < offset = False
        | otherwise = (ix - offset) `mod` stride == 0
-- | Emit at most one element per time window: the result of 'Fold.last' over
-- the elements that 'Stream.intervalsOf' assigned to that window.
--
-- A window whose fold yields 'Nothing' contributes no output, because the
-- results are passed through 'Stream.catMaybes'.
--
-- NOTE(review): the window length @n@ is presumably in seconds; confirm
-- against 'Stream.intervalsOf'.
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd n =
    let lastOfWindow = Stream.intervalsOf n Fold.last
     in Stream.catMaybes . lastOfWindow
-- | Emit at most one element per time window: the result of 'Fold.head' over
-- the elements that 'Stream.intervalsOf' assigned to that window.
--
-- A window whose fold yields 'Nothing' contributes no output, because the
-- results are passed through 'Stream.catMaybes'.
--
-- NOTE(review): the window length @n@ is presumably in seconds; confirm
-- against 'Stream.intervalsOf'.
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart n =
    let headOfWindow = Stream.intervalsOf n Fold.head
     in Stream.catMaybes . headOfWindow
-- | Sample one element from each group of time-stamped events, taking the
-- last element of the group.
--
-- Every element is stamped by 'Stream.timeIndexed', successive stamped
-- elements are grouped by 'Stream.groupsByRolling' using a comparison of
-- their time difference with @gap@, each group is reduced with 'Fold.last',
-- and finally the time stamps are dropped again.
--
-- @gap@ is given in seconds: it is multiplied by @10^9@ and rounded to obtain
-- a 'NanoSecond64' value before being compared.
--
-- NOTE(review): as written, the predicate holds when two successive events
-- are /at least/ @gap@ apart. If 'Stream.groupsByRolling' continues a group
-- while its predicate holds, this groups together events that are far apart
-- rather than close together; confirm that the comparison direction is the
-- intended one.
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd gap =
    Stream.map snd
        . Stream.catMaybes
        . Stream.groupsByRolling spacedApart Fold.last
        . Stream.timeIndexed

    where

    -- The gap converted from (fractional) seconds to nanoseconds.
    threshold = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- Compares only the time stamps of two successive events.
    spacedApart (earlier, _) (later, _) = later - earlier >= threshold
-- | Sample one element from each group of time-stamped events, taking the
-- first element of the group.
--
-- Every element is stamped by 'Stream.timeIndexed', successive stamped
-- elements are grouped by 'Stream.groupsByRolling' using a comparison of
-- their time difference with @gap@, each group is reduced with 'Fold.head',
-- and finally the time stamps are dropped again.
--
-- @gap@ is given in seconds: it is multiplied by @10^9@ and rounded to obtain
-- a 'NanoSecond64' value before being compared.
--
-- NOTE(review): as written, the predicate holds when two successive events
-- are /at least/ @gap@ apart. If 'Stream.groupsByRolling' continues a group
-- while its predicate holds, this groups together events that are far apart
-- rather than close together; confirm that the comparison direction is the
-- intended one.
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart gap =
    Stream.map snd
        . Stream.catMaybes
        . Stream.groupsByRolling spacedApart Fold.head
        . Stream.timeIndexed

    where

    -- The gap converted from (fractional) seconds to nanoseconds.
    threshold = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- Compares only the time stamps of two successive events.
    spacedApart (earlier, _) (later, _) = later - earlier >= threshold
-- | Sort a stream with the supplied comparison function by merging runs.
--
-- The input is first split into runs by a rolling comparison of successive
-- elements (@cmp x y /= GT@). 'Parser.groupByRollingEither' collects each run
-- as a stream, using 'Fold.toStreamRev' for the 'Left' case and
-- 'Fold.toStream' for the 'Right' case. The runs are then combined pairwise
-- with 'Stream.mergeBy' via 'Stream.concatPairsWith'.
--
-- NOTE(review): presumably the 'Left' runs are the descending ones, collected
-- in reverse so that every run is ascending before merging; confirm against
-- 'Parser.groupByRollingEither'.
{-# INLINE sortBy #-}
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
sortBy cmp =
    Stream.concatPairsWith (Stream.mergeBy cmp) id
        . Stream.parseMany (fmap (either id id) runs)

    where

    -- True when @y@ may follow @x@ without breaking the order.
    inOrder x y = cmp x y /= GT

    -- Parses one run; either side of the result is already a stream.
    runs =
        Parser.groupByRollingEither
            inOrder
            Fold.toStreamRev
            Fold.toStream
-- | Pair every element of the first stream with every element of the second
-- one, i.e. the cartesian product expressed with the monad of @t m@.
--
-- The first stream drives the outer loop: for each of its elements the second
-- stream is run in full.
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin s1 s2 =
    s1 >>= \outer ->
        s2 >>= \inner ->
            return (outer, inner)
-- | Inner join of two streams using a nested loop.
--
-- For every element @a@ of the first stream the whole second stream is
-- traversed, and a pair @(a, b)@ is emitted for each @b@ satisfying
-- @a \`eq\` b@. Elements of either stream that match nothing produce no
-- output.
--
-- The loops are expressed with 'Stream.concatMap' rather than monadic bind.
{-# INLINE joinInner #-}
joinInner ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad m) =>
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner eq s1 s2 = Stream.concatMap matchesFor s1

    where

    -- All pairs that one element of the first stream forms with the second.
    matchesFor a = Stream.concatMap (pairIfEqual a) s2

    -- A singleton stream for a matching pair, an empty stream otherwise.
    pairIfEqual a b
        | a `eq` b = Stream.fromPure (a, b)
        | otherwise = Stream.nil
-- | Fold a stream of key-value pairs into a strict 'Map.Map'.
--
-- Pairs are inserted in stream order with 'Map.insert', so when a key occurs
-- more than once the value that arrives last is the one retained.
toMap :: (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
toMap = Stream.foldl' insertPair Map.empty

    where

    insertPair acc (key, val) = Map.insert key val acc
-- | Inner join of two key-value streams using a 'Map.Map' built from the
-- second stream.
--
-- The second stream is consumed completely into a map (via 'toMap') before
-- any output is produced. The first stream is then traversed once; an element
-- @(k, a)@ is emitted as @(k, a, b)@ when the map holds @b@ under @k@, and is
-- dropped otherwise.
--
-- Since the lookup table is a map, duplicate keys in the second stream
-- collapse to a single entry.
{-# INLINE joinInnerMap #-}
joinInnerMap :: (IsStream t, Monad m, Ord k) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap s1 s2 =
    Stream.concatM $
        toMap (IsStream.adapt s2) >>= \rightRows ->
            pure (Stream.mapMaybe (matchRow rightRows) s1)

    where

    -- 'Just' the joined row when the key is present, 'Nothing' otherwise.
    matchRow rightRows (k, a) =
        fmap (\b -> (k, a, b)) (Map.lookup k rightRows)
-- | Placeholder for an inner join driven by an 'Ordering' comparison.
--
-- NOTE(review): presumably meant for inputs that are already sorted with
-- respect to the comparison, so that the join can be done by merging; confirm
-- the intended contract before implementing.
--
-- /Unimplemented/: evaluating this function raises an 'ErrorCall' naming the
-- function, instead of the anonymous @Prelude.undefined@ failure.
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge = error "joinInnerMerge: not implemented"
-- | Left join of two streams using a nested loop.
--
-- For every element @a@ of the first stream the whole second stream is
-- traversed. Each @b@ with @a \`eq\` b@ yields @(a, Just b)@; if no element of
-- the second stream matched, a single @(a, Nothing)@ is emitted instead.
--
-- Whether a match has been seen for the current @a@ is tracked in a 'Bool'
-- held in a state monad layered under the stream ('Stream.evalStateT'). A
-- sentinel stream appended after the second stream reads that flag once the
-- traversal is over and produces the @Nothing@ marker only when it is still
-- 'False'.
{-# INLINE joinLeft #-}
joinLeft :: Monad m =>
    (a -> b -> Bool) -> SerialT m a -> SerialT m b -> SerialT m (a, Maybe b)
joinLeft eq s1 s2 = Stream.evalStateT (return False) $ do
    a <- Stream.liftInner s1
    -- A new left row starts here: forget matches recorded for the last one.
    lift (put False)
    candidate <- fmap Just (Stream.liftInner s2) <> noMatchMarker
    case candidate of
        Nothing -> return (a, Nothing)
        Just b
            | a `eq` b -> do
                lift (put True)
                return (a, Just b)
            | otherwise -> Stream.nil

    where

    -- Evaluated only after the second stream has been exhausted for the
    -- current left row; yields 'Nothing' when nothing matched.
    noMatchMarker = do
        matched <- lift get
        if matched
        then Stream.nil
        else Stream.fromPure Nothing
-- | Left join of two key-value streams using a 'Map.Map' built from the
-- second stream.
--
-- The second stream is consumed completely into a map (via 'toMap') before
-- any output is produced. Every element @(k, a)@ of the first stream is then
-- emitted exactly once, as @(k, a, Just b)@ when the map holds @b@ under @k@
-- and as @(k, a, Nothing)@ otherwise.
--
-- Since the lookup table is a map, duplicate keys in the second stream
-- collapse to a single entry.
{-# INLINE joinLeftMap #-}
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap s1 s2 =
    Stream.concatM $
        toMap (IsStream.adapt s2) >>= \rightRows ->
            return (Stream.map (attach rightRows) s1)

    where

    -- The lookup is resolved before the row is built, as in a @case@.
    attach rightRows (k, a) =
        maybe
            (k, a, Nothing)
            (\b -> (k, a, Just b))
            (Map.lookup k rightRows)
-- | Placeholder for a left join driven by an 'Ordering' comparison.
--
-- NOTE(review): presumably meant for inputs that are already sorted with
-- respect to the comparison, so that the join can be done by merging; confirm
-- the intended contract before implementing.
--
-- /Unimplemented/: calling this function raises an 'ErrorCall' naming the
-- function, instead of the anonymous @Prelude.undefined@ failure.
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin _eq _s1 _s2 = error "mergeLeftJoin: not implemented"
-- | Outer join of two streams using a nested loop.
--
-- The second stream is buffered into an array. For every element @a@ of the
-- first stream the buffered rows are traversed: each @b@ with @a \`eq\` b@
-- yields @(Just a, Just b)@, and if nothing matched a single
-- @(Just a, Nothing)@ is emitted. After the first stream is exhausted, every
-- row of the second stream that never matched any @a@ is emitted as
-- @(Nothing, Just b)@, in its original order.
--
-- Rows of the second stream that are still unmatched are kept in an 'IORef'
-- holding a map keyed by the row's position in the buffer; a row is removed
-- from the map the first time it matches. Previously a per-row flag was
-- stored in the (immutable) array and never updated, so every row of the
-- second stream was emitted again as unmatched, including the rows that had
-- already been joined.
--
-- Whether a match has been seen for the current @a@ is tracked in a 'Bool'
-- held in a state monad layered under the stream ('Stream.evalStateT').
{-# INLINE joinOuter #-}
joinOuter :: MonadIO m =>
       (a -> b -> Bool)
    -> SerialT m a
    -> SerialT m b
    -> SerialT m (Maybe a, Maybe b)
joinOuter eq s1 s =
    Stream.concatM $ do
        arr <- Array.fromStream s
        -- Initially every buffered row is unmatched.
        pending <- toMap $ Stream.indexed $ Array.toStream arr
        ref <- liftIO $ newIORef $! pending
        return $ go ref arr <> leftOver ref

    where

    -- Runs only after 'go' has finished, because it is appended serially; by
    -- then the reference holds exactly the rows that never matched.
    leftOver ref =
        Stream.concatM $ do
            rest <- liftIO $ readIORef ref
            return
                $ fmap (\x -> (Nothing, Just x))
                $ Stream.fromList
                $ Map.elems rest

    go ref arr = Stream.evalStateT (return False) $ do
        a <- Stream.liftInner s1
        -- A new left row starts here: forget matches recorded for the last
        -- one.
        lift $ put False
        -- Evaluated after the buffered rows are exhausted for the current
        -- left row; yields 'Nothing' when nothing matched.
        let final = do
                r <- lift get
                if r
                then Stream.nil
                else Stream.fromPure Nothing
        (i, b) <-
            Stream.indexed
                $ fmap Just (Stream.liftInner (Array.toStream arr)) <> final
        case b of
            Just b1 ->
                if a `eq` b1
                then do
                    lift $ put True
                    -- This right row has now been joined at least once.
                    lift $ liftIO $ modifyIORef' ref (Map.delete i)
                    return (Just a, Just b1)
                else Stream.nil
            Nothing -> return (Just a, Nothing)
-- | Outer join of two key-value streams using 'Map.Map's built from both
-- streams.
--
-- Both streams are consumed completely into maps (via 'toMap') before any
-- output is produced, so each key appears at most once per side. The output
-- is produced in two serial passes:
--
-- 1. every entry @(k, a)@ of the first map, as @(k, Just a, Just b)@ when the
--    second map holds @b@ under @k@ and as @(k, Just a, Nothing)@ otherwise;
-- 2. every entry @(k, b)@ of the second map whose key is absent from the
--    first map, as @(k, Nothing, Just b)@.
--
-- Within each pass the rows follow the key order of 'Map.toList'.
{-# INLINE joinOuterMap #-}
joinOuterMap ::
    (IsStream t, Ord k, MonadIO m) =>
    t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap s1 s2 =
    Stream.concatM $ do
        leftRows <- toMap (IsStream.adapt s1)
        rightRows <- toMap (IsStream.adapt s2)
        let leftPass =
                Stream.map (withRight rightRows)
                    $ Stream.fromList (Map.toList leftRows)
            rightPass =
                Stream.mapMaybe (unlessInLeft leftRows)
                    $ Stream.fromList (Map.toList rightRows)
        return (Stream.serial leftPass rightPass)

    where

    -- A left row, completed with the right value when the key has one.
    withRight rightRows (k, a) =
        maybe
            (k, Just a, Nothing)
            (\b -> (k, Just a, Just b))
            (Map.lookup k rightRows)

    -- A right row is reported here only if the first pass did not cover it.
    unlessInLeft leftRows (k, b)
        | Map.member k leftRows = Nothing
        | otherwise = Just (k, Nothing, Just b)
-- | Placeholder for an outer join driven by an 'Ordering' comparison.
--
-- NOTE(review): presumably meant for inputs that are already sorted with
-- respect to the comparison, so that the join can be done by merging; confirm
-- the intended contract before implementing.
--
-- /Unimplemented/: calling this function raises an 'ErrorCall' naming the
-- function, instead of the anonymous @Prelude.undefined@ failure.
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin _eq _s1 _s2 = error "mergeOuterJoin: not implemented"
-- | Keep the elements of the first stream that are equal, according to @eq@,
-- to some element of the second stream.
--
-- The second stream is passed through 'Stream.uniqBy' and buffered into a
-- list before any output is produced; each element of the first stream is
-- then checked against that list with a linear scan. The first stream is
-- only filtered, so its order and its duplicates are preserved.
--
-- NOTE(review): 'Stream.uniqBy' presumably drops only /consecutive/
-- duplicates, so the buffered list may still contain repeats; confirm.
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy eq s1 s2 =
    concatM $
        Stream.toListRev (Stream.uniqBy eq (adapt s2)) >>= \candidates ->
            return (Stream.filter (occursIn candidates) s1)

    where

    occursIn candidates x = List.any (eq x) candidates
-- | Intersection of two streams driven by an 'Ordering' comparison.
--
-- Both streams are converted to their direct-style representation with
-- 'IsStream.toStreamD', handed to 'StreamD.intersectBySorted' (first stream
-- first), and the result is converted back with 'IsStream.fromStreamD'.
--
-- NOTE(review): as the name suggests, both inputs are presumably required to
-- be sorted with respect to the comparison; confirm against
-- 'StreamD.intersectBySorted'.
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted cmp s1 =
    IsStream.fromStreamD . againstFirst . IsStream.toStreamD

    where

    -- The intersection with the first stream already supplied.
    againstFirst = StreamD.intersectBySorted cmp (IsStream.toStreamD s1)
-- | Remove from the first stream one occurrence for every element of the
-- second stream that equals it according to @eq@.
--
-- The first stream is buffered into a list. The second stream is then folded
-- over that list, each of its elements deleting the first matching entry
-- (if any) with 'List.deleteBy'. What remains of the list is emitted as the
-- result, so no output is produced until both inputs have been consumed.
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy eq s1 s2 =
    concatM $
        Stream.toList (adapt s1) >>= \kept ->
            fmap fromList (foldl' dropOne kept s2)

    where

    -- Delete the first buffered element that matches @y@.
    dropOne remaining y = List.deleteBy eq y remaining
-- | Placeholder for a stream difference driven by an 'Ordering' comparison.
--
-- NOTE(review): presumably meant for inputs that are already sorted with
-- respect to the comparison, so that the difference can be computed by
-- merging; confirm the intended contract before implementing.
--
-- /Unimplemented/: calling this function raises an 'ErrorCall' naming the
-- function, instead of the anonymous @Prelude.undefined@ failure.
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 = error "mergeDifferenceBy: not implemented"
-- | Union of two streams according to @eq@: the whole first stream, followed
-- by those elements of the second stream that did not occur in the first.
--
-- The second stream is buffered into a list, de-duplicated with
-- 'List.nubBy', and stored in an 'IORef'. The first stream is passed through
-- unchanged, except that every element flowing by deletes its first match
-- (if any) from the stored list. A trailing stream, appended with '<>', reads
-- whatever is left in the 'IORef' and emits it.
--
-- NOTE(review): the stored list is updated with 'modifyIORef'', which is not
-- atomic, and the trailing stream must read the reference only after the
-- first stream is done. Both hold for serial evaluation; if @t@ evaluates
-- 'Stream.mapM' or '<>' concurrently this presumably needs revisiting.
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy eq s1 s2 =
    concatM $ do
        ys <- Stream.toList (adapt s2)
        pending <- liftIO (newIORef $! List.nubBy eq ys)
        let -- Pass the element through, striking it off the pending list.
            strike x =
                liftIO (modifyIORef' pending (List.deleteBy eq x))
                    >> return x
            -- Elements of the second stream never seen in the first.
            remainder =
                concatM $
                    liftIO (readIORef pending) >>= \rest ->
                        return (fromList rest)
        return (Stream.mapM strike s1 <> remainder)
-- | Placeholder for a stream union driven by an 'Ordering' comparison.
--
-- NOTE(review): presumably meant for inputs that are already sorted with
-- respect to the comparison, so that the union can be computed by merging;
-- confirm the intended contract before implementing.
--
-- /Unimplemented/: calling this function raises an 'ErrorCall' naming the
-- function, instead of the anonymous @Prelude.undefined@ failure.
{-# INLINE mergeUnionBy #-}
mergeUnionBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 = error "mergeUnionBy: not implemented"