{-# OPTIONS_GHC -Wno-orphans #-}
module Streamly.Internal.Data.Stream.IsStream.Common
(
fromPure
, fromEffect
, repeatM
, timesWith
, absTimesWith
, relTimesWith
, foldOn
, fold
, fold_
, map
, scanlMAfter'
, postscanlM'
, smapM
, take
, takeWhile
, takeEndBy
, drop
, findIndices
, intersperseM
, interjectSuffix
, reverse
, reverse'
, mkAsync
, mkParallel
, parallelFst
, concatM
, concatMapM
, concatMap
, splitOnSeq
, zipWithM
, zipWith
, yield
, yieldM
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad.IO.Class (MonadIO(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Array.Foreign.Type (Array)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators (maxYields)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamD, toStreamD, fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64, addToAbsTime64)
import qualified Streamly.Internal.Data.Array.Foreign.Type as A
import qualified Streamly.Internal.Data.Stream.Async as Async
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD as D
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import Streamly.Internal.System.IO (defaultChunkSize)
import Prelude hiding (take, takeWhile, drop, reverse, concatMap, map, zipWith)
{-# INLINE fromPure #-}
fromPure :: IsStream t => a -> t m a
fromPure :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a (m :: * -> *). a -> Stream m a
K.fromPure
{-# DEPRECATED yield "Please use fromPure instead." #-}
{-# INLINE yield #-}
yield :: IsStream t => a -> t m a
yield :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
yield = forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure
{-# INLINE fromEffect #-}
fromEffect :: (Monad m, IsStream t) => m a -> t m a
fromEffect :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> Stream m a
K.fromEffect
{-# DEPRECATED yieldM "Please use fromEffect instead." #-}
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
yieldM = forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
fromEffect
{-# INLINE_EARLY repeatM #-}
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
repeatM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM = forall (m :: * -> *) a (t :: (* -> *) -> * -> *).
(m a -> t m a -> t m a) -> m a -> t m a
K.repeatMWith forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
IsStream.consM
{-# RULES "repeatM serial" repeatM = repeatMSerial #-}
{-# INLINE repeatMSerial #-}
repeatMSerial :: MonadAsync m => m a -> SerialT m a
repeatMSerial :: forall (m :: * -> *) a. MonadAsync m => m a -> SerialT m a
repeatMSerial = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> Stream m a
S.repeatM
{-# INLINE timesWith #-}
timesWith :: (IsStream t, MonadAsync m) => Double -> t m (AbsTime, RelTime64)
timesWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith Double
g = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *).
MonadIO m =>
Double -> Stream m (AbsTime, RelTime64)
D.times Double
g
{-# INLINE absTimesWith #-}
absTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
absTimesWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m AbsTime
absTimesWith = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry AbsTime -> RelTime64 -> AbsTime
addToAbsTime64) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith
{-# INLINE relTimesWith #-}
relTimesWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m RelTime64
relTimesWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m RelTime64
relTimesWith = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith
{-# INLINE foldOn #-}
foldOn :: Monad m => Fold m a b -> SerialT m a -> Fold m a b
foldOn :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> Fold m a b
foldOn Fold m a b
f SerialT m a
s = forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Fold m a b
D.foldOn Fold m a b
f forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD SerialT m a
s
{-# INLINE fold #-}
fold :: Monad m => Fold m a b -> SerialT m a -> m b
fold :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a b
fl SerialT m a
strm = do
(b
b, SerialT m a
_) <- forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m (b, SerialT m a)
fold_ Fold m a b
fl SerialT m a
strm
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$! b
b
{-# INLINE fold_ #-}
fold_ :: Monad m => Fold m a b -> SerialT m a -> m (b, SerialT m a)
fold_ :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m (b, SerialT m a)
fold_ Fold m a b
fl SerialT m a
strm = do
(b
b, Stream m a
str) <- forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m (b, Stream m a)
D.fold_ Fold m a b
fl forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD SerialT m a
strm
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$! (b
b, forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD Stream m a
str)
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map a -> b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> Stream m a -> Stream m b
D.map a -> b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: (IsStream t, Monad m)
=> (b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' b -> a -> m b
step m b
initial b -> m b
done t m a
stream =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
D.scanlMAfter' b -> a -> m b
step m b
initial b -> m b
done forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
stream
{-# INLINE postscanlM' #-}
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
postscanlM' b -> a -> m b
step m b
z t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.postscanlM' b -> a -> m b
step m b
z forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE smapM #-}
smapM :: (IsStream t, Monad m) =>
(s -> a -> m (s, b))
-> m s
-> t m a
-> t m b
smapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) s a b.
(IsStream t, Monad m) =>
(s -> a -> m (s, b)) -> m s -> t m a -> t m b
smapM s -> a -> m (s, b)
step m s
initial t m a
stream =
let r :: t m (s, b)
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
postscanlM'
(\(s
s, b
_) a
a -> s -> a -> m (s, b)
step s
s a
a)
(forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (,forall a. HasCallStack => a
undefined) m s
initial)
t m a
stream
in forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map forall a b. (a, b) -> b
snd t m (s, b)
r
{-# INLINE take #-}
take :: (IsStream t, Monad m) => Int -> t m a -> t m a
take :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m a
take Int
n t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Applicative m =>
Int -> Stream m a -> Stream m a
S.take Int
n forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Int64 -> t m a -> t m a
maxYields (forall a. a -> Maybe a
Just (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)) t m a
m)
{-# INLINE takeWhile #-}
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeWhile :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
takeWhile a -> Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.takeWhile a -> Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE takeEndBy #-}
takeEndBy :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
takeEndBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
takeEndBy a -> Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
D.takeEndBy a -> Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE drop #-}
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
drop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m a
drop Int
n t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Int -> Stream m a -> Stream m a
S.drop Int
n forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE findIndices #-}
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
findIndices :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices a -> Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m Int
S.findIndices a -> Bool
p (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE intersperseM #-}
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
intersperseM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
intersperseM m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
S.intersperseM m a
m forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE interjectSuffix #-}
interjectSuffix
:: (IsStream t, MonadAsync m)
=> Double -> m a -> t m a -> t m a
interjectSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
n m a
f t m a
xs = t m a
xs forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`parallelFst` forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a
repeatM m a
timed
where timed :: m a
timed = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Int -> IO ()
threadDelay (forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000)) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
f
{-# INLINE reverse #-}
reverse :: (IsStream t, Monad m) => t m a -> t m a
reverse :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m a
reverse t m a
s = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
S.reverse forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
s
{-# INLINE reverse' #-}
reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
reverse' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Storable a) =>
t m a -> t m a
reverse' =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Stream m (Array a) -> Stream m a
A.flattenArraysRev
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Stream m a -> Stream m a
K.reverse
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadIO m, Storable a) =>
Int -> Stream m a -> Stream m (Array a)
A.arraysOf Int
defaultChunkSize
forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_NORMAL mkAsync #-}
mkAsync :: (IsStream t, MonadAsync m) => t m a -> t m a
mkAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkAsync = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
Async.mkAsyncD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
mkParallel :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkParallel = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
Par.mkParallelD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE parallelFst #-}
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelFst :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
parallelFst t m a
m1 t m a
m2 = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
Par.parallelFstK (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m2)
{-# INLINE concatMapM #-}
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
concatMapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
concatMapM a -> m (t m b)
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> m (Stream m b)) -> Stream m a -> Stream m b
D.concatMapM (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m (t m b)
f) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatMap #-}
concatMap ::(IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
concatMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> t m b) -> t m a -> t m b
concatMap a -> t m b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> Stream m b) -> Stream m a -> Stream m b
D.concatMap (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> t m b
f) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
{-# INLINE concatM #-}
concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
concatM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM m (t m a)
generator = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m (t m b)) -> t m a -> t m b
concatMapM (\() -> m (t m a)
generator) (forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure ())
{-# INLINE splitOnSeq #-}
splitOnSeq
:: (IsStream t, MonadIO m, Storable a, Enum a, Eq a)
=> Array a -> Fold m a b -> t m a -> t m b
splitOnSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
(MonadIO m, Storable a, Enum a, Eq a) =>
Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSeq Array a
patt Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamD t m a
m)
{-# INLINE zipWithM #-}
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> m c) -> t m a -> t m b -> t m c
zipWithM a -> b -> m c
f t m a
m1 t m b
m2 =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamS
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> m c) -> Stream m a -> Stream m b -> Stream m c
S.zipWithM a -> b -> m c
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m b
m2)
{-# INLINE zipWith #-}
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
zipWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
zipWith a -> b -> c
f t m a
m1 t m b
m2 =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
IsStream.fromStreamS
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
S.zipWith a -> b -> c
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m a
m1) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
IsStream.toStreamS t m b
m2)