module Streamly.Internal.Data.Stream.IsStream.Transform
(
transform
, foldrS
, foldrT
, Serial.map
, sequence
, mapM
, smapM
, trace
, trace_
, tap
, tapOffsetEvery
, tapAsync
, tapRate
, pollCounts
, scan
, postscan
, scanl'
, scanlM'
, scanlMAfter'
, postscanl'
, postscanlM'
, prescanl'
, prescanlM'
, scanl1'
, scanl1M'
, with
, deleteBy
, filter
, filterM
, uniq
, uniqBy
, nubBy
, nubWindowBy
, prune
, repeated
, take
, takeInterval
, takeLast
, takeLastInterval
, takeWhile
, takeWhileM
, takeWhileLast
, takeWhileAround
, drop
, dropInterval
, dropLast
, dropLastInterval
, dropWhile
, dropWhileM
, dropWhileLast
, dropWhileAround
, intersperse
, intersperseM
, intersperseBySpan
, intersperseSuffix
, intersperseSuffixBySpan
, interjectSuffix
, intersperseM_
, delay
, intersperseSuffix_
, delayPost
, interspersePrefix_
, delayPre
, insertBy
, reverse
, reverse'
, reassembleBy
, indexed
, indexedR
, timestamped
, timestampWith
, timeIndexed
, timeIndexWith
, findIndices
, elemIndices
, rollingMapM
, rollingMap
, catMaybes
, mapMaybe
, mapMaybeM
, lefts
, rights
, Par.mkParallel
, applyAsync
, (|$)
, (|&)
, maxThreads
, maxBuffer
, sampleOld
, sampleNew
, sampleRate
, Rate (..)
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspectMode
, scanx
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Either (isLeft, isRight)
import Data.Kind (Type)
import Data.Maybe (isJust, fromJust)
import Streamly.Internal.BaseCompat (fromLeft, fromRight)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Pipe.Type (Pipe (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators
( inspectMode, maxBuffer, maxThreads, rate, avgRate, minRate
, maxRate, constRate)
import Streamly.Internal.Data.Stream.IsStream.Common
( absTimesWith
, drop
, findIndices
, postscanlM'
, relTimesWith
, reverse
, reverse'
, scanlMAfter'
, smapM
, take
, takeWhile
, interjectSuffix
, intersperseM
)
import Streamly.Internal.Data.Stream.Prelude (fromStreamS, toStreamS)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream)
import Streamly.Internal.Data.SVar (MonadAsync, Rate(..))
import Streamly.Internal.Data.Time.Units (TimeUnit64, AbsTime, RelTime64)
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
import qualified Streamly.Internal.Data.Stream.Zip as Z
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, foldr, map, mapM, sequence
, reverse, foldr1 , scanl, scanl1)
{-# INLINE transform #-}
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
transform :: Pipe m a b -> t m a -> t m b
transform Pipe m a b
pipe t m a
xs = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Pipe m a b -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Pipe m a b -> Stream m a -> Stream m b
D.transform Pipe m a b
pipe (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE foldrS #-}
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS :: (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS = (a -> t m b -> t m b) -> t m b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) a (m :: * -> *) b.
IsStream t =>
(a -> t m b -> t m b) -> t m b -> t m a -> t m b
K.foldrS
{-# INLINE foldrT #-}
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s)
=> (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT :: (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT a -> s m b -> s m b
f s m b
z t m a
s = (a -> s m b -> s m b) -> s m b -> Stream m a -> s m b
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, Monad (t m), MonadTrans t) =>
(a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
S.foldrT a -> s m b -> s m b
f s m b
z (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
s)
{-# INLINE_EARLY mapM #-}
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM :: (a -> m b) -> t m a -> t m b
mapM = (a -> m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
K.mapM
{-# RULES "mapM serial" mapM = mapMSerial #-}
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial :: (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial = (a -> m b) -> SerialT m a -> SerialT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m b) -> t m a -> t m b
Serial.mapM
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence :: t m (m a) -> t m a
sequence t m (m a)
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Stream m (m a) -> Stream m a
forall (m :: * -> *) a. Monad m => Stream m (m a) -> Stream m a
S.sequence (t m (m a) -> Stream m (m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m (m a)
m)
{-# INLINE tap #-}
tap :: (IsStream t, Monad m) => FL.Fold m a b -> t m a -> t m a
tap :: Fold m a b -> t m a -> t m a
tap Fold m a b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m a
D.tap Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE tapOffsetEvery #-}
tapOffsetEvery :: (IsStream t, Monad m)
=> Int -> Int -> FL.Fold m a b -> t m a -> t m a
tapOffsetEvery :: Int -> Int -> Fold m a b -> t m a -> t m a
tapOffsetEvery Int
offset Int
n Fold m a b
f t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Int -> Int -> Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
Monad m =>
Int -> Int -> Fold m a b -> Stream m a -> Stream m a
D.tapOffsetEvery Int
offset Int
n Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => FL.Fold m a b -> t m a -> t m a
tapAsync :: Fold m a b -> t m a -> t m a
tapAsync Fold m a b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
Par.tapAsyncF Fold m a b
f (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs)
{-# INLINE pollCounts #-}
pollCounts ::
(IsStream t, MonadAsync m)
=> (a -> Bool)
-> (t m Int -> t m Int)
-> Fold m Int b
-> t m a
-> t m a
pollCounts :: (a -> Bool)
-> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
pollCounts a -> Bool
predicate t m Int -> t m Int
transf Fold m Int b
f t m a
xs =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD
(Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool)
-> (Stream m Int -> Stream m Int)
-> Fold m Int b
-> Stream m a
-> Stream m a
forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool)
-> (Stream m Int -> Stream m Int)
-> Fold m Int b
-> Stream m a
-> Stream m a
D.pollCounts a -> Bool
predicate (t m Int -> Stream m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD (t m Int -> Stream m Int)
-> (Stream m Int -> t m Int) -> Stream m Int -> Stream m Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m Int -> t m Int
transf (t m Int -> t m Int)
-> (Stream m Int -> t m Int) -> Stream m Int -> t m Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m Int -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD) Fold m Int b
f
(Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE tapRate #-}
tapRate ::
(IsStream t, MonadAsync m, MonadCatch m)
=> Double
-> (Int -> m b)
-> t m a
-> t m a
tapRate :: Double -> (Int -> m b) -> t m a -> t m a
tapRate Double
n Int -> m b
f t m a
xs = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ Double -> (Int -> m b) -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
Double -> (Int -> m b) -> Stream m a -> Stream m a
D.tapRate Double
n Int -> m b
f (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
xs
{-# INLINE trace #-}
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
trace :: (a -> m b) -> t m a -> t m a
trace a -> m b
f = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (a -> m b
f a
x) m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE trace_ #-}
trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
trace_ :: m b -> t m a -> t m a
trace_ m b
eff = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> m b) -> t m a -> t m b
Serial.mapM (\a
x -> m b
eff m b -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE scan #-}
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scan :: Fold m a b -> t m a -> t m b
scan = Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
P.scanOnce
{-# INLINE postscan #-}
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
postscan :: Fold m a b -> t m a -> t m b
postscan = Fold m a b -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
P.postscanOnce
{-# DEPRECATED scanx "Please use scanl followed by map instead." #-}
{-# INLINE scanx #-}
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx :: (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx = (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
P.scanlx'
{-# INLINE scanlM' #-}
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
scanlM' :: (b -> a -> m b) -> m b -> t m a -> t m b
scanlM' b -> a -> m b
step m b
begin t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.scanlM' b -> a -> m b
step m b
begin (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl' #-}
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
scanl' :: (b -> a -> b) -> b -> t m a -> t m b
scanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
S.scanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE postscanl' #-}
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
postscanl' :: (b -> a -> b) -> b -> t m a -> t m b
postscanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.postscanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanl' #-}
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
prescanl' :: (b -> a -> b) -> b -> t m a -> t m b
prescanl' b -> a -> b
step b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> b) -> b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.prescanl' b -> a -> b
step b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanlM' #-}
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' :: (b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' b -> a -> m b
step m b
z t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (b -> a -> m b) -> m b -> Stream m a -> Stream m b
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.prescanlM' b -> a -> m b
step m b
z (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1M' #-}
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
scanl1M' :: (a -> a -> m a) -> t m a -> t m a
scanl1M' a -> a -> m a
step t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> m a) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
D.scanl1M' a -> a -> m a
step (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1' #-}
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
scanl1' :: (a -> a -> a) -> t m a -> t m a
scanl1' a -> a -> a
step t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> a) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> Stream m a
D.scanl1' a -> a -> a
step (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE with #-}
with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> (((s, a) -> b) -> t m a -> t m a)
with :: (t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> ((s, a) -> b)
-> t m a
-> t m a
with t m a -> t m (s, a)
f ((s, a) -> b) -> t m (s, a) -> t m (s, a)
comb (s, a) -> b
g = ((s, a) -> a) -> t m (s, a) -> t m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (s, a) -> a
forall a b. (a, b) -> b
snd (t m (s, a) -> t m a) -> (t m a -> t m (s, a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((s, a) -> b) -> t m (s, a) -> t m (s, a)
comb (s, a) -> b
g (t m (s, a) -> t m (s, a))
-> (t m a -> t m (s, a)) -> t m a -> t m (s, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m (s, a)
f
{-# INLINE filter #-}
#if __GLASGOW_HASKELL__ != 802
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
filter :: (a -> Bool) -> t m a -> t m a
filter a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.filter a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
#else
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter = K.filter
#endif
{-# INLINE filterM #-}
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
filterM :: (a -> m Bool) -> t m a -> t m a
filterM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.filterM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE uniqBy #-}
uniqBy :: (IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
uniqBy :: (a -> a -> Bool) -> t m a -> t m a
uniqBy a -> a -> Bool
eq =
t m (Maybe a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
catMaybes (t m (Maybe a) -> t m a)
-> (t m a -> t m (Maybe a)) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> a -> Maybe a) -> t m a -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> b) -> t m a -> t m b
rollingMap (\a
x a
y -> if a
x a -> a -> Bool
`eq` a
y then Maybe a
forall a. Maybe a
Nothing else a -> Maybe a
forall a. a -> Maybe a
Just a
y)
{-# INLINE uniq #-}
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
uniq :: t m a -> t m a
uniq = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall a (m :: * -> *). (Eq a, Monad m) => Stream m a -> Stream m a
D.uniq (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE prune #-}
prune ::
(a -> Bool) -> t m a -> t m a
prune :: (a -> Bool) -> t m a -> t m a
prune = [Char] -> (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"
repeated ::
t m a -> t m a
repeated :: t m a -> t m a
repeated = t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE nubBy #-}
nubBy ::
(a -> a -> Bool) -> t m a -> t m a
nubBy :: (a -> a -> Bool) -> t m a -> t m a
nubBy = (a -> a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE nubWindowBy #-}
nubWindowBy ::
Int -> (a -> a -> Bool) -> t m a -> t m a
nubWindowBy :: Int -> (a -> a -> Bool) -> t m a -> t m a
nubWindowBy = Int -> (a -> a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE deleteBy #-}
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy :: (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy a -> a -> Bool
cmp a
x t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Bool) -> a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> a -> Stream m a -> Stream m a
S.deleteBy a -> a -> Bool
cmp a
x (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE sampleOld #-}
sampleOld ::
Int -> t m a -> t m a
sampleOld :: Int -> t m a -> t m a
sampleOld = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE sampleNew #-}
sampleNew ::
Int -> t m a -> t m a
sampleNew :: Int -> t m a -> t m a
sampleNew = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE sampleRate #-}
sampleRate ::
Double -> t m a -> t m a
sampleRate :: Double -> t m a -> t m a
sampleRate = Double -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeWhileM #-}
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
takeWhileM :: (a -> m Bool) -> t m a -> t m a
takeWhileM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.takeWhileM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE takeLast #-}
takeLast ::
Int -> t m a -> t m a
takeLast :: Int -> t m a -> t m a
takeLast = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeLastInterval #-}
takeLastInterval ::
Double -> t m a -> t m a
takeLastInterval :: Double -> t m a -> t m a
takeLastInterval = Double -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeWhileLast #-}
takeWhileLast ::
(a -> Bool) -> t m a -> t m a
takeWhileLast :: (a -> Bool) -> t m a -> t m a
takeWhileLast = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeWhileAround #-}
takeWhileAround ::
(a -> Bool) -> t m a -> t m a
takeWhileAround :: (a -> Bool) -> t m a -> t m a
takeWhileAround = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE takeInterval #-}
takeInterval ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
takeInterval :: d -> t m a -> t m a
takeInterval d
d = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. d -> Stream m a -> Stream m a
forall (m :: * -> *) t a.
(MonadIO m, TimeUnit64 t) =>
t -> Stream m a -> Stream m a
D.takeByTime d
d (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE dropWhile #-}
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
dropWhile :: (a -> Bool) -> t m a -> t m a
dropWhile a -> Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.dropWhile a -> Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE dropWhileM #-}
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
dropWhileM :: (a -> m Bool) -> t m a -> t m a
dropWhileM a -> m Bool
p t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.dropWhileM a -> m Bool
p (Stream m a -> Stream m a) -> Stream m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE dropInterval #-}
dropInterval ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
dropInterval :: d -> t m a -> t m a
dropInterval d
d = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. d -> Stream m a -> Stream m a
forall (m :: * -> *) t a.
(MonadIO m, TimeUnit64 t) =>
t -> Stream m a -> Stream m a
D.dropByTime d
d (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE dropLast #-}
dropLast ::
Int -> t m a -> t m a
dropLast :: Int -> t m a -> t m a
dropLast = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropLastInterval #-}
dropLastInterval ::
Int -> t m a -> t m a
dropLastInterval :: Int -> t m a -> t m a
dropLastInterval = Int -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropWhileLast #-}
dropWhileLast ::
(a -> Bool) -> t m a -> t m a
dropWhileLast :: (a -> Bool) -> t m a -> t m a
dropWhileLast = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE dropWhileAround #-}
dropWhileAround ::
(a -> Bool) -> t m a -> t m a
dropWhileAround :: (a -> Bool) -> t m a -> t m a
dropWhileAround = (a -> Bool) -> t m a -> t m a
forall a. HasCallStack => a
undefined
{-# INLINE insertBy #-}
insertBy ::
(IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy :: (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy a -> a -> Ordering
cmp a
x t m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> a -> Stream m a -> Stream m a
S.insertBy a -> a -> Ordering
cmp a
x (t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE intersperse #-}
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
intersperse :: a -> t m a -> t m a
intersperse a
a = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => a -> Stream m a -> Stream m a
S.intersperse a
a (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE intersperseM_ #-}
intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseM_ :: m b -> t m a -> t m a
intersperseM_ m b
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.intersperseM_ m b
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseBySpan #-}
intersperseBySpan ::
Int -> m a -> t m a -> t m a
intersperseBySpan :: Int -> m a -> t m a -> t m a
intersperseBySpan Int
_n m a
_f t m a
_xs = t m a
forall a. HasCallStack => a
undefined
{-# INLINE intersperseSuffix #-}
intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
intersperseSuffix :: m a -> t m a -> t m a
intersperseSuffix m a
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m a -> Stream m a -> Stream m a
forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
D.intersperseSuffix m a
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseSuffix_ #-}
intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseSuffix_ :: m b -> t m a -> t m a
intersperseSuffix_ m b
m = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.intersperseSuffix_ m b
m (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: (IsStream t, Monad m)
=> Int -> m a -> t m a -> t m a
intersperseSuffixBySpan :: Int -> m a -> t m a -> t m a
intersperseSuffixBySpan Int
n m a
eff =
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
Monad m =>
Int -> m a -> Stream m a -> Stream m a
D.intersperseSuffixBySpan Int
n m a
eff (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE interspersePrefix_ #-}
interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a
interspersePrefix_ :: m b -> t m a -> t m a
interspersePrefix_ m b
m = (a -> m a) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void m b
m m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE delay #-}
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delay :: Double -> t m a -> t m a
delay Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseM_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE delayPost #-}
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPost :: Double -> t m a -> t m a
delayPost Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseSuffix_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE delayPre #-}
delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPre :: Double -> t m a -> t m a
delayPre Double
n = m () -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
trace_ (m () -> t m a -> t m a) -> m () -> t m a -> t m a
forall a b. (a -> b) -> a -> b
$ IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE reassembleBy #-}
reassembleBy
::
Fold m a b
-> (a -> a -> Int)
-> t m a
-> t m b
reassembleBy :: Fold m a b -> (a -> a -> Int) -> t m a -> t m b
reassembleBy = Fold m a b -> (a -> a -> Int) -> t m a -> t m b
forall a. HasCallStack => a
undefined
{-# INLINE indexed #-}
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
indexed :: t m a -> t m (Int, a)
indexed = Stream m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (Int, a) -> t m (Int, a))
-> (t m a -> Stream m (Int, a)) -> t m a -> t m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m (Int, a)
forall (m :: * -> *) a. Monad m => Stream m a -> Stream m (Int, a)
D.indexed (Stream m a -> Stream m (Int, a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE indexedR #-}
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
indexedR :: Int -> t m a -> t m (Int, a)
indexedR Int
n = Stream m (Int, a) -> t m (Int, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m (Int, a) -> t m (Int, a))
-> (t m a -> Stream m (Int, a)) -> t m a -> t m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Stream m a -> Stream m (Int, a)
forall (m :: * -> *) a.
Monad m =>
Int -> Stream m a -> Stream m (Int, a)
D.indexedR Int
n (Stream m a -> Stream m (Int, a))
-> (t m a -> Stream m a) -> t m a -> Stream m (Int, a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE timestampWith #-}
timestampWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (AbsTime, a)
timestampWith :: Double -> t m a -> t m (AbsTime, a)
timestampWith Double
g t m a
stream = (a -> AbsTime -> (AbsTime, a))
-> t m a -> t m AbsTime -> t m (AbsTime, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
Z.zipWith ((AbsTime -> a -> (AbsTime, a)) -> a -> AbsTime -> (AbsTime, a)
forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) t m a
stream (Double -> t m AbsTime
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m AbsTime
absTimesWith Double
g)
{-# INLINE timestamped #-}
timestamped :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (AbsTime, a)
timestamped :: t m a -> t m (AbsTime, a)
timestamped = Double -> t m a -> t m (AbsTime, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (AbsTime, a)
timestampWith Double
0.01
{-# INLINE timeIndexWith #-}
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (RelTime64, a)
timeIndexWith :: Double -> t m a -> t m (RelTime64, a)
timeIndexWith Double
g t m a
stream = (a -> RelTime64 -> (RelTime64, a))
-> t m a -> t m RelTime64 -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
Z.zipWith ((RelTime64 -> a -> (RelTime64, a))
-> a -> RelTime64 -> (RelTime64, a)
forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) t m a
stream (Double -> t m RelTime64
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m RelTime64
relTimesWith Double
g)
{-# INLINE timeIndexed #-}
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (RelTime64, a)
timeIndexed :: t m a -> t m (RelTime64, a)
timeIndexed = Double -> t m a -> t m (RelTime64, a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (RelTime64, a)
timeIndexWith Double
0.01
{-# INLINE elemIndices #-}
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
elemIndices :: a -> t m a -> t m Int
elemIndices a
a = (a -> Bool) -> t m a -> t m Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices (a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
a)
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap :: (a -> a -> b) -> t m a -> t m b
rollingMap a -> a -> b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> b) -> Stream m a -> Stream m b
D.rollingMap a -> a -> b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
rollingMapM :: (a -> a -> m b) -> t m a -> t m b
rollingMapM a -> a -> m b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> a -> m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> a -> m b) -> Stream m a -> Stream m b
D.rollingMapM a -> a -> m b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE mapMaybe #-}
mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
mapMaybe :: (a -> Maybe b) -> t m a -> t m b
mapMaybe a -> Maybe b
f t m a
m = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ (a -> Maybe b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Stream m a -> Stream m b
S.mapMaybe a -> Maybe b
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE_EARLY mapMaybeM #-}
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m))
=> (a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM :: (a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM a -> m (Maybe b)
f = (Maybe b -> b) -> t m (Maybe b) -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe b -> b
forall a. HasCallStack => Maybe a -> a
fromJust (t m (Maybe b) -> t m b)
-> (t m a -> t m (Maybe b)) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe b -> Bool) -> t m (Maybe b) -> t m (Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Maybe b -> Bool
forall a. Maybe a -> Bool
isJust (t m (Maybe b) -> t m (Maybe b))
-> (t m a -> t m (Maybe b)) -> t m a -> t m (Maybe b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> m (Maybe b)) -> t m a -> t m (Maybe b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
K.mapM a -> m (Maybe b)
f
{-# RULES "mapMaybeM serial" mapMaybeM = mapMaybeMSerial #-}
{-# INLINE mapMaybeMSerial #-}
mapMaybeMSerial :: Monad m => (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial :: (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial a -> m (Maybe b)
f SerialT m a
m = Stream m b -> SerialT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> SerialT m b) -> Stream m b -> SerialT m b
forall a b. (a -> b) -> a -> b
$ (a -> m (Maybe b)) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> Stream m a -> Stream m b
D.mapMaybeM a -> m (Maybe b)
f (Stream m a -> Stream m b) -> Stream m a -> Stream m b
forall a b. (a -> b) -> a -> b
$ SerialT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT m a
m
catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a
catMaybes :: t m (Maybe a) -> t m a
catMaybes = (Maybe a -> a) -> t m (Maybe a) -> t m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe a -> a
forall a. HasCallStack => Maybe a -> a
fromJust (t m (Maybe a) -> t m a)
-> (t m (Maybe a) -> t m (Maybe a)) -> t m (Maybe a) -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe a -> Bool) -> t m (Maybe a) -> t m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Maybe a -> Bool
forall a. Maybe a -> Bool
isJust
lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a
lefts :: t m (Either a b) -> t m a
lefts = (Either a b -> a) -> t m (Either a b) -> t m a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (a -> Either a b -> a
forall a b. a -> Either a b -> a
fromLeft a
forall a. HasCallStack => a
undefined) (t m (Either a b) -> t m a)
-> (t m (Either a b) -> t m (Either a b))
-> t m (Either a b)
-> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either a b -> Bool) -> t m (Either a b) -> t m (Either a b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Either a b -> Bool
forall a b. Either a b -> Bool
isLeft
rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b
rights :: t m (Either a b) -> t m b
rights = (Either a b -> b) -> t m (Either a b) -> t m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (b -> Either a b -> b
forall b a. b -> Either a b -> b
fromRight b
forall a. HasCallStack => a
undefined) (t m (Either a b) -> t m b)
-> (t m (Either a b) -> t m (Either a b))
-> t m (Either a b)
-> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Either a b -> Bool) -> t m (Either a b) -> t m (Either a b)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter Either a b -> Bool
forall a b. Either a b -> Bool
isRight
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> (t m a -> t m b)
|$ :: (t m a -> t m b) -> t m a -> t m b
(|$) t m a -> t m b
f = t m a -> t m b
f (t m a -> t m b) -> (t m a -> t m a) -> t m a -> t m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
Par.mkParallel
infixr 0 |$
{-# INLINE applyAsync #-}
applyAsync :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> (t m a -> t m b)
applyAsync :: (t m a -> t m b) -> t m a -> t m b
applyAsync = (t m a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(|$)
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
t m a
x |& :: t m a -> (t m a -> t m b) -> t m b
|& t m a -> t m b
f = t m a -> t m b
f (t m a -> t m b) -> t m a -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
|$ t m a
x
infixl 1 |&