{-# LANGUAGE CPP #-}
module Streamly.Internal.Data.Stream.Top
(
strideFromThen
, filterInStreamGenericBy
, deleteInStreamGenericBy
, unionWithStreamGenericBy
, filterInStreamAscBy
, deleteInStreamAscBy
, unionWithStreamAscBy
, joinInnerGeneric
, joinInnerAscBy
, joinLeftAscBy
, joinOuterAscBy
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, modifyIORef')
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Type (Stream, cross)
import qualified Data.List as List
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream.Type as Stream
import qualified Streamly.Internal.Data.Stream.Nesting as Stream
import qualified Streamly.Internal.Data.Stream.Transform as Stream
import Prelude hiding (filter, zipWith, concatMap, concat)
#include "DocTestDataStream.hs"
-- | @strideFromThen offset stride@ keeps only those elements of the input
-- stream whose index @i@ (as assigned by @Stream.indexed@, presumably
-- zero-based) satisfies @i >= offset@ and @(i - offset) `mod` stride == 0@.
--
-- Note: the @mod@ is only evaluated for elements with @i >= offset@; a
-- @stride@ of 0 makes that evaluation fail with a divide-by-zero error.
{-# INLINE strideFromThen #-}
strideFromThen :: Monad m => Int -> Int -> Stream m a -> Stream m a
strideFromThen offset stride =
    Stream.with Stream.indexed Stream.filter
        (\(i, _) -> i >= offset && (i - offset) `mod` stride == 0)
-- | Inner join of two streams using a user supplied equality predicate.
--
-- Every pair produced by 'cross' on the two streams is tested with @eq@ and
-- only the pairs for which it returns 'True' are kept. No ordering or
-- uniqueness of the inputs is required by this implementation, but all
-- pairings generated by 'cross' are examined.
{-# INLINE joinInnerGeneric #-}
joinInnerGeneric :: Monad m =>
    (a -> b -> Bool) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerGeneric eq s1 s2 =
    Stream.filter (\(a, b) -> a `eq` b) $ cross s1 s2
-- | Inner join driven by an 'Ordering' based comparison; going by the name it
-- is intended for inputs sorted in ascending order.
--
-- /Unimplemented/: the definition is 'undefined', so using it fails at
-- runtime.
{-# INLINE joinInnerAscBy #-}
joinInnerAscBy ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
joinInnerAscBy = undefined
-- | Left join driven by an 'Ordering' based comparison; going by the name it
-- is intended for inputs sorted in ascending order. The result type pairs
-- each left element with a 'Maybe' right element.
--
-- /Unimplemented/: the body is 'undefined', so applying it to its arguments
-- and forcing the result fails at runtime.
{-# INLINE joinLeftAscBy #-}
joinLeftAscBy ::
    (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, Maybe b)
joinLeftAscBy _eq _s1 _s2 = undefined
-- | Outer join driven by an 'Ordering' based comparison; going by the name it
-- is intended for inputs sorted in ascending order. The result type allows
-- either side of a pair to be absent.
--
-- /Unimplemented/: the body is 'undefined', so applying it to its arguments
-- and forcing the result fails at runtime.
{-# INLINE joinOuterAscBy #-}
joinOuterAscBy ::
       (a -> b -> Ordering)
    -> Stream m a
    -> Stream m b
    -> Stream m (Maybe a, Maybe b)
joinOuterAscBy _eq _s1 _s2 = undefined
-- | Internal helper: filter the second stream by membership in a container
-- built from the first stream.
--
-- @filterStreamWith fld member s1 s2@ first folds all of @s1@ with @fld@ to
-- obtain a container @xs@, then returns the elements @x@ of @s2@ for which
-- @member x xs@ is 'True'. The whole of @s1@ is consumed (and held in the
-- container) before any element of @s2@ is examined.
{-# INLINE filterStreamWith #-}
filterStreamWith :: Monad m =>
       Fold m a (f a)
    -> (a -> f a -> Bool)
    -> Stream m a
    -> Stream m a
    -> Stream m a
filterStreamWith fld member s1 s2 =
    Stream.concatEffect
        $ do
            xs <- Stream.fold fld s1
            return $ Stream.filter (`member` xs) s2
-- | @filterInStreamGenericBy eq s1 s2@ keeps those elements @x@ of @s2@ for
-- which some element @y@ collected from @s1@ satisfies @eq x y@.
--
-- The first stream is collected into a list (in reverse order) after being
-- passed through @Fold.uniqBy eq@ (presumably dropping adjacent duplicates),
-- and each element of the second stream is looked up with a linear
-- @List.any@ scan. The cost is therefore proportional to the product of the
-- two stream lengths, and memory use is proportional to the collected list.
{-# INLINE filterInStreamGenericBy #-}
filterInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
filterInStreamGenericBy eq =
    filterStreamWith
        (Fold.scanMaybe (Fold.uniqBy eq) Fold.toListRev)
        (List.any . eq)
-- | Variant of the stream membership filter that takes an 'Ordering' based
-- comparison; going by the name, both streams are expected to be sorted in
-- ascending order.
--
-- This delegates to @Stream.intersectBySorted@. Note that the two streams are
-- passed to it in swapped order: @filterInStreamAscBy eq s1 s2@ is
-- @Stream.intersectBySorted eq s2 s1@.
{-# INLINE filterInStreamAscBy #-}
filterInStreamAscBy :: Monad m =>
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
filterInStreamAscBy eq s1 s2 = Stream.intersectBySorted eq s2 s1
-- | @deleteInStreamGenericBy eq s1 s2@ collects all of @s2@ into a list and
-- then, for each element @x@ of @s1@ in order, removes the first list element
-- @y@ for which @eq x y@ holds (via @List.deleteBy@). Whatever remains of the
-- list is returned as a stream.
--
-- Both streams are fully consumed before the first output element is
-- produced, and @s2@ is held in memory as a list. Each deletion is a linear
-- scan of that list.
{-# INLINE deleteInStreamGenericBy #-}
deleteInStreamGenericBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamGenericBy eq s1 s2 =
    Stream.concatEffect
        $ do
            -- Buffer the stream we are deleting from.
            xs <- Stream.fold Fold.toList s2
            -- Left fold over s1, deleting one match per element from the
            -- buffered list.
            let f = Fold.foldl' (flip (List.deleteBy eq)) xs
            fmap Stream.fromList $ Stream.fold f s1
-- | Variant of the stream deletion operation that takes an 'Ordering' based
-- comparison; going by the name it is intended for inputs sorted in ascending
-- order.
--
-- /Unimplemented/: the body is 'undefined', so applying it to its arguments
-- and forcing the result fails at runtime.
{-# INLINE deleteInStreamAscBy #-}
deleteInStreamAscBy ::
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
deleteInStreamAscBy _eq _s1 _s2 = undefined
-- | @unionWithStreamGenericBy eq s1 s2@ produces all elements of @s2@,
-- followed by the leftover elements of @s1@.
--
-- The first stream is collected into a list and de-duplicated with
-- @List.nubBy eq@; that list is stored in an 'Data.IORef.IORef'. As each
-- element @x@ of @s2@ passes through, the first matching entry is removed
-- from the stored list with @List.deleteBy eq x@ and @x@ itself is emitted
-- unchanged. Once @s2@ is exhausted, the appended stream reads whatever is
-- left in the reference and emits it.
--
-- The first stream is fully buffered in memory, and every element of @s2@
-- costs a linear scan of the remaining list. The mutable reference is the
-- reason for the 'MonadIO' constraint.
{-# INLINE unionWithStreamGenericBy #-}
unionWithStreamGenericBy :: MonadIO m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamGenericBy eq s1 s2 =
    Stream.concatEffect
        $ do
            xs <- Stream.fold Fold.toList s1
            -- Force the de-duplicated list to WHNF before storing it.
            ref <- liftIO $ newIORef $! List.nubBy eq xs
            let -- Pass an element of s2 through, striking its first match
                -- off the pending list.
                f x = do
                    liftIO $ modifyIORef' ref (List.deleteBy eq x)
                    return x
                -- The reference is read only when this stream is run, which
                -- is after s2 because of the append below.
                s3 = Stream.concatEffect
                        $ do
                            xs1 <- liftIO $ readIORef ref
                            return $ Stream.fromList xs1
            return $ Stream.mapM f s2 `Stream.append` s3
-- | Variant of the stream union operation that takes an 'Ordering' based
-- comparison; going by the name it is intended for inputs sorted in ascending
-- order.
--
-- /Unimplemented/: the body is 'undefined', so applying it to its arguments
-- and forcing the result fails at runtime.
{-# INLINE unionWithStreamAscBy #-}
unionWithStreamAscBy ::
    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
unionWithStreamAscBy _eq _s1 _s2 = undefined