{-# LANGUAGE CPP #-}
module Streamly.Internal.Data.Stream.StreamD.Top
(
strideFromThen
, filterInStreamGenericBy
, deleteInStreamGenericBy
, unionWithStreamGenericBy
, filterInStreamAscBy
, deleteInStreamAscBy
, unionWithStreamAscBy
, joinInnerGeneric
, joinInnerAscBy
, joinLeftAscBy
, joinOuterAscBy
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Common ()
import Streamly.Internal.Data.Stream.StreamD.Type (Stream, cross)
import qualified Data.List as List
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.StreamD.Type as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Transform as Stream
import Prelude hiding (filter, zipWith, concatMap, concat)
#include "DocTestDataStream.hs"
-- | @strideFromThen offset stride@ keeps the element whose index is @offset@
-- and, after it, every @stride@-th element; all other elements are dropped.
--
-- Indices are the ones attached by 'Stream.indexed' (presumably counting from
-- 0; confirm against its definition). An element at index @i@ is kept when
-- @i >= offset@ and @(i - offset) `mod` stride == 0@.
--
-- @stride@ must be non-zero: 'mod' by zero raises a divide-by-zero exception
-- as soon as an element at or past @offset@ is tested.
--
{-# INLINE strideFromThen #-}
strideFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a
strideFromThen offset stride =
    -- 'Stream.with' is handed the indexing transformation, the filter and the
    -- predicate on (index, element) pairs; the result is a plain element
    -- stream again, as the type signature shows.
    Stream.with Stream.indexed Stream.filter
        (\(i, _) -> i >= offset && (i - offset) `mod` stride == 0)
-- | Inner join of two streams using an arbitrary equality predicate.
--
-- The two streams are combined with 'cross' and only the pairs @(a, b)@ for
-- which @a `eq` b@ holds are kept. Every pair produced by 'cross' is tested,
-- so the work grows with the number of pairs, not with the number of matches.
--
{-# INLINE joinInnerGeneric #-}
joinInnerGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerGeneric eq s1 s2 =
    Stream.filter (\(a, b) -> a `eq` b) $ cross s1 s2
-- | Placeholder for an inner join driven by an 'Ordering' comparison.
--
-- /Unimplemented/: evaluating this function raises an error.
-- NOTE(review): the name suggests both inputs are expected to be sorted in
-- ascending order; confirm once an implementation exists.
--
{-# INLINE joinInnerAscBy #-}
joinInnerAscBy ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerAscBy = error "joinInnerAscBy: not implemented"
-- | Placeholder for a left join driven by an 'Ordering' comparison.
--
-- /Unimplemented/: all three arguments are ignored and forcing the result
-- raises an error.
-- NOTE(review): the name suggests both inputs are expected to be sorted in
-- ascending order; confirm once an implementation exists.
--
{-# INLINE joinLeftAscBy #-}
joinLeftAscBy ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftAscBy _eq _s1 _s2 = error "joinLeftAscBy: not implemented"
-- | Placeholder for an outer join driven by an 'Ordering' comparison.
--
-- /Unimplemented/: all three arguments are ignored and forcing the result
-- raises an error.
-- NOTE(review): the name suggests both inputs are expected to be sorted in
-- ascending order; confirm once an implementation exists.
--
{-# INLINE joinOuterAscBy #-}
joinOuterAscBy ::
       (a -> b -> Ordering)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterAscBy _eq _s1 _s2 = error "joinOuterAscBy: not implemented"
-- | @filterStreamWith fld member s1 s2@ keeps the elements of @s2@ that are
-- members of a container built from @s1@.
--
-- The first stream is folded to completion with @fld@ before anything is
-- produced; the resulting container @xs@ is then used to filter the second
-- stream, keeping each @x@ for which @x `member` xs@ is 'True'.
--
{-# INLINE filterStreamWith #-}
filterStreamWith :: Monad m =>
       Fold m a (f a)
    -> (a -> f a -> Bool)
    -> Stream m a
    -> Stream m a
    -> Stream m a
filterStreamWith fld member s1 s2 =
    Stream.concatEffect
        $ do
            -- Run the whole of s1 through the fold to obtain the container.
            xs <- Stream.fold fld s1
            return $ Stream.filter (`member` xs) s2
-- | @filterInStreamGenericBy eq s1 s2@ retains only those elements of the
-- second stream that are equal, according to @eq@, to some element of the
-- first stream.
--
-- The first stream is collected into a list by passing it through
-- 'Fold.uniqBy' and 'Fold.toListRev'; each element @x@ of the second stream
-- is then kept when @List.any (eq x)@ holds on that list. The membership test
-- is a linear scan of the collected list for every element of @s2@.
--
{-# INLINE filterInStreamGenericBy #-}
filterInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
filterInStreamGenericBy eq =
    filterStreamWith
        (Fold.scanMaybe (Fold.uniqBy eq) Fold.toListRev)
        (List.any . eq)
-- | Variant of 'filterInStreamGenericBy' that takes an 'Ordering' comparison
-- and delegates to 'Stream.intersectBySorted'.
--
-- Note that the streams are passed to 'Stream.intersectBySorted' in swapped
-- order (@s2@ first, then @s1@).
-- NOTE(review): presumably both streams must be sorted in ascending order
-- with respect to the comparison; confirm against 'Stream.intersectBySorted'.
--
{-# INLINE filterInStreamAscBy #-}
filterInStreamAscBy :: Monad m =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
filterInStreamAscBy eq s1 s2 = Stream.intersectBySorted eq s2 s1
-- | @deleteInStreamGenericBy eq s1 s2@ deletes the elements of the first
-- stream from the second stream.
--
-- The second stream is collected into a list. Then, for every element of the
-- first stream, the first matching element (per @eq@) is removed from that
-- list with 'List.deleteBy'. What remains is emitted as a stream.
--
-- Both streams are consumed completely before any output is produced, and
-- all of @s2@ is held in memory as a list.
--
{-# INLINE deleteInStreamGenericBy #-}
deleteInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamGenericBy eq s1 s2 =
    Stream.concatEffect
        $ do
            xs <- Stream.fold Fold.toList s2
            -- Left fold seeded with the buffered s2; each incoming s1 element
            -- removes at most one matching entry from the accumulator.
            let f = Fold.foldl' (flip (List.deleteBy eq)) xs
            fmap Stream.fromList $ Stream.fold f s1
-- | Placeholder for an 'Ordering'-driven variant of
-- 'deleteInStreamGenericBy'.
--
-- /Unimplemented/: all three arguments are ignored and forcing the result
-- raises an error.
-- NOTE(review): the name suggests both inputs are expected to be sorted in
-- ascending order; confirm once an implementation exists.
--
{-# INLINE deleteInStreamAscBy #-}
deleteInStreamAscBy ::
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamAscBy _eq _s1 _s2 = error "deleteInStreamAscBy: not implemented"
-- | @unionWithStreamGenericBy eq s1 s2@ emits every element of the second
-- stream, followed by those elements of the first stream that were not
-- matched (per @eq@) by any element of the second stream.
--
-- The first stream is collected into a list, de-duplicated with
-- 'List.nubBy', and stored in an 'IORef'. While @s2@ is streamed through
-- unchanged, each of its elements removes at most one matching entry from
-- that list via 'List.deleteBy'. Once @s2@ is exhausted, whatever is left in
-- the 'IORef' is appended.
--
-- All of @s1@ is consumed and held in memory before the first element is
-- produced. Duplicates within @s2@ are passed through as they are.
--
{-# INLINE unionWithStreamGenericBy #-}
unionWithStreamGenericBy :: MonadIO m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamGenericBy eq s1 s2 =
    Stream.concatEffect
        $ do
            xs <- Stream.fold Fold.toList s1
            -- ($!) forces the de-duplicated list to weak head normal form
            -- before it is stored in the reference.
            ref <- liftIO $ newIORef $! List.nubBy eq xs
            let -- Pass an s2 element through, striking its match (if any)
                -- from the pending s1 elements as a side effect.
                f x = do
                    liftIO $ modifyIORef' ref (List.deleteBy eq x)
                    return x
                -- Read the reference only when this stream is run, i.e.
                -- after s2 has been fully processed by the append below.
                s3 = Stream.concatEffect
                        $ do
                            xs1 <- liftIO $ readIORef ref
                            return $ Stream.fromList xs1
            return $ Stream.mapM f s2 `Stream.append` s3
-- | Placeholder for an 'Ordering'-driven variant of
-- 'unionWithStreamGenericBy'.
--
-- /Unimplemented/: all three arguments are ignored and forcing the result
-- raises an error.
-- NOTE(review): the name suggests both inputs are expected to be sorted in
-- ascending order; confirm once an implementation exists.
--
{-# INLINE unionWithStreamAscBy #-}
unionWithStreamAscBy ::
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamAscBy _eq _s1 _s2 = error "unionWithStreamAscBy: not implemented"