{-# OPTIONS_GHC -Wno-deprecations #-}
module Streamly.Internal.Data.Stream.IsStream.Top {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
(
sampleFromThen
, sampleIntervalStart
, sampleIntervalEnd
, sampleBurstStart
, sampleBurstEnd
, sortBy
, intersectBy
, intersectBySorted
, differenceBy
, mergeDifferenceBy
, unionBy
, mergeUnionBy
, crossJoin
, joinInner
, joinInnerMap
, joinInnerMerge
, mergeLeftJoin
, joinLeftMap
, mergeOuterJoin
, joinOuterMap
)
where
#include "inline.hs"
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Data.Kind (Type)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Parser as Parser
(groupByRollingEither)
import qualified Streamly.Internal.Data.Stream.IsStream.Eliminate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Generate as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream as StreamD
import Prelude hiding (Foldable(..), filter, zipWith, concatMap, concat)
-- | @sampleFromThen offset stride@ keeps only the elements whose index @i@
-- (as paired up by 'Stream.indexed') satisfies @i >= offset@ and
-- @(i - offset) `mod` stride == 0@; every other element is dropped.
--
-- NOTE(review): indices are presumably zero-based -- confirm against
-- 'Stream.indexed'. A @stride@ of 0 makes the @mod@ fail with a
-- divide-by-zero error once an element at or past @offset@ is examined.
{-# INLINE sampleFromThen #-}
sampleFromThen :: (IsStream t, Monad m, Functor (t m)) =>
    Int -> Int -> t m a -> t m a
sampleFromThen offset stride =
    Stream.with Stream.indexed Stream.filter keep

    where

    -- Decide, from the index alone, whether an element is sampled. The
    -- bounds check comes first so that @mod@ is only evaluated for indices
    -- at or past the offset.
    keep (i, _) = i >= offset && (i - offset) `mod` stride == 0
-- | @sampleIntervalEnd n@ cuts the stream into windows with
-- @Stream.intervalsOf n@, reduces every window with 'Fold.last' and discards
-- the 'Nothing' results, so each window contributes at most one element.
--
-- NOTE(review): @n@ is handed unchanged to 'Stream.intervalsOf'; presumably
-- it is the window length in seconds -- confirm against that function.
{-# INLINE sampleIntervalEnd #-}
sampleIntervalEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
-- | @sampleIntervalStart n@ cuts the stream into windows with
-- @Stream.intervalsOf n@, reduces every window with 'Fold.one' and discards
-- the 'Nothing' results, so each window contributes at most one element.
--
-- NOTE(review): @n@ is handed unchanged to 'Stream.intervalsOf'; presumably
-- it is the window length in seconds -- confirm against that function.
{-# INLINE sampleIntervalStart #-}
sampleIntervalStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
-- | Scan state used by 'sampleBurst'. @t@ is the timestamp type and @x@ the
-- element type; every field is strict.
data BurstState t x =
      BurstNone
      -- ^ No element is currently remembered.
    | BurstWait !t !x
      -- ^ An element is remembered together with a reference timestamp.
    | BurstDone !x
      -- ^ The carried element is emitted at this step.
    | BurstDoneNext !x !t !x
      -- ^ The first element is emitted at this step; the timestamp and the
      -- second element are what is remembered next.
-- | Shared implementation of 'sampleBurstEnd' and 'sampleBurstStart'.
--
-- The input elements are wrapped in 'Just', 'Nothing' ticks are interjected
-- with @Stream.interjectSuffix 0.01@, every item is paired with a timestamp
-- by 'Stream.timeIndexed', and the result is scanned with @step@. An element
-- is emitted whenever the scan state is 'BurstDone' or 'BurstDoneNext'.
--
-- * @sampleAtEnd@: when 'True', an element arriving within the gap replaces
--   the remembered one; when 'False', the remembered element is kept. In
--   both cases the reference timestamp moves to the new arrival.
-- * @gap@: the quiet period in seconds (it is multiplied by @10^9@ and
--   rounded to whole nanoseconds).
--
-- NOTE(review): the @0.01@ tick is presumably a period in seconds and bounds
-- how precisely the gap can be detected -- confirm against
-- 'Stream.interjectSuffix'.
{-# INLINE sampleBurst #-}
sampleBurst :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Bool -> Double -> t m a -> t m a
sampleBurst sampleAtEnd gap xs =
    Stream.mapMaybe extract
        $ Stream.scanl' step BurstNone
        $ Stream.timeIndexed
        $ Stream.interjectSuffix 0.01 (return Nothing) (Stream.map Just xs)

    where

    -- The gap expressed in the relative-time type carried by the timestamps.
    gap1 = toRelTime64 (NanoSecond64 (round (gap * 10 ^ (9 :: Int))))

    -- State transition. The second argument is a timestamped item: @Just@
    -- for a real element, @Nothing@ for an interjected tick.
    {-# INLINE step #-}
    -- Nothing remembered (initially, or right after an emission): a real
    -- element starts a new wait, a tick leaves/returns the state to idle.
    step BurstNone (t1, Just x1) = BurstWait t1 x1
    step BurstNone _ = BurstNone

    step (BurstDone _) (t1, Just x1) = BurstWait t1 x1
    step (BurstDone _) _ = BurstNone

    -- Waiting and a tick arrives: emit once the gap has elapsed.
    step old@(BurstWait t0 x0) (t1, Nothing)
        | t1 - t0 >= gap1 = BurstDone x0
        | otherwise = old
    -- Waiting and a real element arrives. If the gap had already elapsed
    -- (no tick was seen in between) the remembered element is emitted and
    -- the new one becomes the remembered element.
    step (BurstWait t0 x0) (t1, Just x1)
        | t1 - t0 >= gap1 = BurstDoneNext x0 t1 x1
        | sampleAtEnd = BurstWait t1 x1
        | otherwise = BurstWait t1 x0

    -- 'BurstDoneNext' carries a remembered timestamp/element pair and is
    -- advanced exactly like 'BurstWait' on that pair.
    step (BurstDoneNext _ t0 x0) (t1, Nothing)
        | t1 - t0 >= gap1 = BurstDone x0
        | otherwise = BurstWait t0 x0
    step (BurstDoneNext _ t0 x0) (t1, Just x1)
        | t1 - t0 >= gap1 = BurstDoneNext x0 t1 x1
        | sampleAtEnd = BurstWait t1 x1
        | otherwise = BurstWait t1 x0

    -- Only the two "done" states produce an output element.
    {-# INLINE extract #-}
    extract (BurstDoneNext x _ _) = Just x
    extract (BurstDone x) = Just x
    extract _ = Nothing
-- | @sampleBurstEnd gap@ is 'sampleBurst' with its flag set to 'True': an
-- element arriving within the gap replaces the remembered one, so the
-- element emitted for a burst is the most recent one seen.
--
-- The 'Double' argument is the gap forwarded to 'sampleBurst'.
{-# INLINE sampleBurstEnd #-}
sampleBurstEnd :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstEnd = sampleBurst True
-- | @sampleBurstStart gap@ is 'sampleBurst' with its flag set to 'False': an
-- element arriving within the gap does not replace the remembered one, so
-- the element emitted for a burst is the first one seen.
--
-- The 'Double' argument is the gap forwarded to 'sampleBurst'.
{-# INLINE sampleBurstStart #-}
sampleBurstStart :: (IsStream t, MonadAsync m, Functor (t m)) =>
    Double -> t m a -> t m a
sampleBurstStart = sampleBurst False
-- | Sort a serial stream using the supplied comparison function.
--
-- The input is first split into groups with 'Parser.groupByRollingEither',
-- using the predicate @\\x y -> cmp x y < GT@; one kind of group is collected
-- with 'Fold.toStreamKRev' and the other with 'Fold.toStreamK'. The groups
-- are then combined pairwise with @Stream.mergeBy cmp@ through
-- 'Stream.concatPairsWith'.
--
-- 'Stream.rights' discards any 'Left' (parse error) produced by
-- 'Stream.parseMany'.
--
-- NOTE(review): presumably the reversed fold handles descending runs so that
-- every group handed to the merge is ascending -- confirm against
-- 'Parser.groupByRollingEither'.
{-# INLINE sortBy #-}
sortBy :: MonadCatch m => (a -> a -> Ordering) -> SerialT m a -> SerialT m a
sortBy cmp =
      Stream.concatPairsWith (Stream.mergeBy cmp) id
    . Stream.rights
    . Stream.parseMany (fmap (either id id) runs)

    where

    -- Parser yielding one group of the input per invocation, as a stream.
    runs =
        Parser.groupByRollingEither
            (\x y -> cmp x y < GT)
            (fmap fromStream Fold.toStreamKRev)
            (fmap fromStream Fold.toStreamK)
-- | Pair every element produced by the first computation with every element
-- produced by the second, using the 'Monad' instance of @t m@: the first
-- argument is bound first, then the second, and the pair is returned.
{-# INLINE crossJoin #-}
crossJoin :: Monad (t m) => t m a -> t m b -> t m (a, b)
crossJoin s1 s2 = do
    a <- s1
    b <- s2
    return (a, b)
-- | Inner join of two streams on an arbitrary predicate: emits @(a, b)@ for
-- every @a@ from the first stream and @b@ from the second for which
-- @a `eq` b@ holds.
--
-- The second stream is traversed once for every element of the first
-- stream (nested 'Stream.concatMap').
{-# INLINE joinInner #-}
joinInner ::
    forall (t :: (Type -> Type) -> Type -> Type) m a b.
    (IsStream t, Monad m) =>
    (a -> b -> Bool) -> t m a -> t m b -> t m (a, b)
joinInner eq s1 s2 =
    Stream.concatMap
        (\a ->
            Stream.concatMap
                (\b ->
                    if a `eq` b
                    then Stream.fromPure (a, b)
                    else Stream.nil
                )
                s2
        )
        s1
-- | Fold a serial stream of key-value pairs into a strict 'Map.Map'.
--
-- Pairs are inserted in stream order with 'Map.insert', so when a key occurs
-- more than once the value seen last is the one that remains.
toMap :: (Monad m, Ord k) => IsStream.SerialT m (k, v) -> m (Map.Map k v)
toMap = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
-- | Inner join of two key-value streams on the key.
--
-- The second stream is consumed completely into a map with 'toMap' before
-- any output is produced (so for a repeated key only its last value is
-- used). The first stream is then passed through, emitting @(k, a, b)@ for
-- each @(k, a)@ whose key is present in the map and dropping the rest.
{-# INLINE joinInnerMap #-}
joinInnerMap :: (IsStream t, Monad m, Ord k) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, b)
joinInnerMap s1 s2 =
    Stream.concatM $ do
        km <- toMap $ IsStream.adapt s2
        pure $ Stream.mapMaybe (joinAB km) s1

    where

    -- Attach the matching right-hand value, if the key has one.
    joinAB kvm (k, a) = fmap (\b -> (k, a, b)) (Map.lookup k kvm)
-- | Not implemented: any use fails with an error.
--
-- NOTE(review): going by the name and signature this is presumably meant to
-- be a merge-based inner join of two sorted streams -- confirm the intended
-- contract before implementing.
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge = error "joinInnerMerge: not implemented"
-- | Left join of two key-value streams on the key.
--
-- The second stream is consumed completely into a map with 'toMap' before
-- any output is produced (so for a repeated key only its last value is
-- used). Every @(k, a)@ of the first stream is then emitted as
-- @(k, a, Just b)@ when the key is present in the map and as
-- @(k, a, Nothing)@ otherwise.
{-# INLINE joinLeftMap #-}
joinLeftMap :: (IsStream t, Ord k, Monad m) =>
    t m (k, a) -> t m (k, b) -> t m (k, a, Maybe b)
joinLeftMap s1 s2 =
    Stream.concatM $ do
        km <- toMap $ IsStream.adapt s2
        return $ Stream.map (joinAB km) s1

    where

    -- Look the key up before building the row, so the row does not hold on
    -- to an unevaluated lookup.
    joinAB km (k, a) =
        case k `Map.lookup` km of
            Just b -> (k, a, Just b)
            Nothing -> (k, a, Nothing)
-- | Not implemented: any use fails with an error.
--
-- NOTE(review): going by the name and signature this is presumably meant to
-- be a merge-based left join of two sorted streams -- confirm the intended
-- contract before implementing.
{-# INLINE mergeLeftJoin #-}
mergeLeftJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, Maybe b)
mergeLeftJoin _eq _s1 _s2 = error "mergeLeftJoin: not implemented"
-- | Full outer join of two key-value streams on the key.
--
-- Both streams are consumed completely into maps before any output is
-- produced, so for a key repeated within one stream only its last value is
-- used. The output consists of two parts, in this order:
--
-- 1. one row @(k, Just a, mb)@ per key of the first map, where @mb@ is the
--    result of looking the key up in the second map;
-- 2. one row @(k, Nothing, Just b)@ per key of the second map that is absent
--    from the first map.
--
-- Each part enumerates its map with 'Map.toList', i.e. in ascending key
-- order.
{-# INLINE joinOuterMap #-}
joinOuterMap ::
    (IsStream t, Ord k, MonadIO m) =>
    t m (k, a) -> t m (k, b) -> t m (k, Maybe a, Maybe b)
joinOuterMap s1 s2 =
    Stream.concatM $ do
        km1 <- kvFold $ IsStream.adapt s1
        km2 <- kvFold $ IsStream.adapt s2

        -- Rows for every key of the left map, matched or not.
        let res1 = Stream.map (joinAB km2)
                        $ Stream.fromList $ Map.toList km1
                    where
                    joinAB km (k, a) =
                        case k `Map.lookup` km of
                            Just b -> (k, Just a, Just b)
                            Nothing -> (k, Just a, Nothing)

        -- Rows for the keys that exist only in the right map; matched keys
        -- were already covered above and are skipped here.
        let res2 = Stream.mapMaybe (joinAB km1)
                        $ Stream.fromList $ Map.toList km2
                    where
                    joinAB km (k, b) =
                        case k `Map.lookup` km of
                            Just _ -> Nothing
                            Nothing -> Just (k, Nothing, Just b)

        return $ Stream.serial res1 res2

        where

        -- Collect a key-value stream into a map; later values win.
        kvFold = Stream.foldl' (\kv (k, b) -> Map.insert k b kv) Map.empty
-- | Not implemented: any use fails with an error.
--
-- NOTE(review): going by the name and signature this is presumably meant to
-- be a merge-based outer join of two sorted streams -- confirm the intended
-- contract before implementing.
{-# INLINE mergeOuterJoin #-}
mergeOuterJoin ::
    (a -> b -> Ordering) -> t m a -> t m b -> t m (Maybe a, Maybe b)
mergeOuterJoin _eq _s1 _s2 = error "mergeOuterJoin: not implemented"
-- | @intersectBy eq s1 s2@ emits the elements of @s1@ that are @eq@ to at
-- least one element of @s2@.
--
-- The second stream is consumed completely first: it is passed through
-- @Stream.uniqBy eq@ and collected into a list with 'Stream.toListRev'.
-- Every element of @s1@ is then tested against that list with 'List.any',
-- so the cost per element grows with the size of the list.
{-# INLINE intersectBy #-}
intersectBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
intersectBy eq s1 s2 =
    concatM
        $ do
            xs <- Stream.toListRev $ Stream.uniqBy eq $ adapt s2
            return $ Stream.filter (\x -> List.any (eq x) xs) s1
-- | Intersection of two streams computed by 'StreamD.intersectBySorted':
-- both arguments are converted with 'IsStream.toStreamD', intersected using
-- the supplied comparison, and the result is converted back with
-- 'IsStream.fromStreamD'.
--
-- NOTE(review): as the name says, both inputs are presumably required to be
-- sorted consistently with the comparison function -- confirm against
-- 'StreamD.intersectBySorted'.
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, Monad m) =>
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
intersectBySorted eq s1 =
      IsStream.fromStreamD
    . StreamD.intersectBySorted eq (IsStream.toStreamD s1)
    . IsStream.toStreamD
-- | @differenceBy eq s1 s2@ removes from @s1@, for every element of @s2@,
-- the first remaining element that 'List.deleteBy' matches using @eq@.
--
-- The first stream is collected completely into a list; the second stream
-- is then folded over that list, deleting one match per element. Nothing is
-- emitted until the second stream has been consumed completely.
{-# INLINE differenceBy #-}
differenceBy :: (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
differenceBy eq s1 s2 =
    concatM
        $ do
            xs <- Stream.toList $ adapt s1
            fmap fromList $ foldl' (flip (List.deleteBy eq)) xs s2
-- | Not implemented: any use fails with an error.
--
-- NOTE(review): going by the name and signature this is presumably meant to
-- be a merge-based difference of two sorted streams -- confirm the intended
-- contract before implementing.
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 = error "mergeDifferenceBy: not implemented"
-- | @unionBy eq s1 s2@ emits every element of @s1@ unchanged, followed by
-- the elements of @s2@ that were not matched by any element of @s1@.
--
-- The second stream is consumed completely first, de-duplicated with
-- @List.nubBy eq@ and stored in an 'Data.IORef.IORef'. As each element of
-- @s1@ passes through, the first entry that 'List.deleteBy' matches using
-- @eq@ is removed from the stored list. Whatever remains in the reference
-- when it is finally read is appended with '<>'.
--
-- NOTE(review): the remainder is only correct if the 'Semigroup' instance of
-- @t m a@ runs the left operand to completion before starting the right one
-- -- confirm for non-serial stream types.
{-# INLINE unionBy #-}
unionBy :: (IsStream t, MonadAsync m, Semigroup (t m a)) =>
    (a -> a -> Bool) -> t m a -> t m a -> t m a
unionBy eq s1 s2 =
    concatM
        $ do
            xs <- Stream.toList $ adapt s2
            -- Force the de-duplicated list before storing it.
            ref <- liftIO $ newIORef $! List.nubBy eq xs
            let -- Strike the element off the pending list, then pass it on.
                f x = do
                    liftIO $ modifyIORef' ref (List.deleteBy eq x)
                    return x
                -- Reads the reference only when this part of the stream is
                -- run, i.e. after the deletions made by the part before it.
                s3 = concatM
                        $ do
                            xs1 <- liftIO $ readIORef ref
                            return $ fromList xs1
            return $ Stream.mapM f s1 <> s3
-- | Not implemented: any use fails with an error.
--
-- NOTE(review): going by the name and signature this is presumably meant to
-- be a merge-based union of two sorted streams -- confirm the intended
-- contract before implementing.
{-# INLINE mergeUnionBy #-}
mergeUnionBy ::
    (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 = error "mergeUnionBy: not implemented"