module Streamly.Internal.Data.Stream.IsStream.Transform
(
transform
, foldrS
, foldrSShared
, foldrT
, map
, sequence
, mapM
, smapM
, trace
, trace_
, tap
, tapOffsetEvery
, tapAsync
, tapAsyncK
, distributeAsync_
, tapRate
, pollCounts
, scan
, postscan
, scanl'
, scanlM'
, scanlMAfter'
, postscanl'
, postscanlM'
, prescanl'
, prescanlM'
, scanl1'
, scanl1M'
, with
, deleteBy
, filter
, filterM
, uniq
, uniqBy
, nubBy
, nubWindowBy
, prune
, repeated
, take
, takeInterval
, takeLast
, takeLastInterval
, takeWhile
, takeWhileM
, takeWhileLast
, takeWhileAround
, drop
, dropInterval
, dropLast
, dropLastInterval
, dropWhile
, dropWhileM
, dropWhileLast
, dropWhileAround
, intersperse
, intersperseM
, intersperseBySpan
, intersperseSuffix
, intersperseSuffixBySpan
, interjectSuffix
, intersperseM_
, delay
, intersperseSuffix_
, delayPost
, interspersePrefix_
, delayPre
, insertBy
, reverse
, reverse'
, reassembleBy
, indexed
, indexedR
, timestamped
, timestampWith
, timeIndexed
, timeIndexWith
, findIndices
, elemIndices
, rollingMapM
, rollingMap
, catMaybes
, mapMaybe
, mapMaybeM
, lefts
, rights
, both
, mkAsync
, mkParallel
, applyAsync
, (|$)
, (|&)
, maxThreads
, maxBuffer
, sampleOld
, sampleNew
, sampleRate
, Rate (..)
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspectMode
, scanx
)
where
#include "inline.hs"
import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Control.Monad.Catch (MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Either (isLeft, isRight)
import Data.Kind (Type)
import Data.Maybe (isJust, fromJust)
import Streamly.Internal.BaseCompat (fromLeft, fromRight)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Pipe.Type (Pipe (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators
( inspectMode, maxBuffer, maxThreads, rate, avgRate, minRate
, maxRate, constRate)
import Streamly.Internal.Data.Stream.IsStream.Common
( absTimesWith
, drop
, findIndices
, map
, postscanlM'
, relTimesWith
, reverse
, reverse'
, scanlMAfter'
, smapM
, take
, takeWhile
, interjectSuffix
, intersperseM
, mkAsync
, mkParallel
, zipWith
)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), fromStreamS, toStreamS, fromStreamD, toStreamD, toConsK)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.SVar (Rate(..))
import Streamly.Internal.Data.Time.Units (TimeUnit64, AbsTime, RelTime64)
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
#if __GLASGOW_HASKELL__ == 802
import qualified Streamly.Internal.Data.Stream.StreamK as K
#endif
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD as S
#endif
import qualified Prelude
import Prelude hiding
( filter, drop, dropWhile, take, takeWhile, foldr, map, mapM, sequence
, reverse, foldr1 , scanl, scanl1, zipWith)
{-# INLINE transform #-}
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
transform :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Pipe m a b -> t m a -> t m b
transform Pipe m a b
pipe t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Pipe m a b -> Stream m a -> Stream m b
D.transform Pipe m a b
pipe (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE foldrS #-}
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *) b.
IsStream t =>
(a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS a -> t m b -> t m b
f t m b
z t m a
xs =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream
forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *) b.
(a -> Stream m b -> Stream m b)
-> Stream m b -> Stream m a -> Stream m b
K.foldrS
(\a
y Stream m b
ys -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream forall a b. (a -> b) -> a -> b
$ a -> t m b -> t m b
f a
y (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m b
ys))
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m b
z)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
xs)
{-# INLINE foldrSShared #-}
foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
a -> t m b -> t m b
f t m b
z t m a
xs =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream
forall a b. (a -> b) -> a -> b
$ forall a (m :: * -> *) b.
(a -> Stream m b -> Stream m b)
-> Stream m b -> Stream m a -> Stream m b
K.foldrSShared
(\a
y Stream m b
ys -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream forall a b. (a -> b) -> a -> b
$ a -> t m b -> t m b
f a
y (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream Stream m b
ys))
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m b
z)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
xs)
{-# INLINE foldrT #-}
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s)
=> (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT :: forall (t :: (* -> *) -> * -> *) (m :: * -> *)
(s :: (* -> *) -> * -> *) a b.
(IsStream t, Monad m, Monad (s m), MonadTrans s) =>
(a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT a -> s m b -> s m b
f s m b
z t m a
s = forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
(Monad m, Monad (t m), MonadTrans t) =>
(a -> t m b -> t m b) -> t m b -> Stream m a -> t m b
S.foldrT a -> s m b -> s m b
f s m b
z (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
s)
{-# INLINE_EARLY mapM #-}
mapM :: forall t m a b. (IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM a -> m b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) b a.
(m b -> Stream m b -> Stream m b)
-> (a -> m b) -> Stream m a -> Stream m b
K.mapMWith (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(m a -> t m a -> t m a) -> m a -> Stream m a -> Stream m a
toConsK (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
consM @t)) a -> m b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream
{-# RULES "mapM serial" mapM = mapMSerial #-}
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> SerialT m b
mapMSerial = forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> SerialT m a -> SerialT m b
Serial.mapM
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m (m a) -> t m a
sequence = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM forall a. a -> a
id
{-# INLINE tap #-}
tap :: (IsStream t, Monad m) => FL.Fold m a b -> t m a -> t m a
tap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m a
tap Fold m a b
f t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m a
D.tap Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE tapOffsetEvery #-}
tapOffsetEvery :: (IsStream t, Monad m)
=> Int -> Int -> FL.Fold m a b -> t m a -> t m a
tapOffsetEvery :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Int -> Int -> Fold m a b -> t m a -> t m a
tapOffsetEvery Int
offset Int
n Fold m a b
f t m a
xs =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Int -> Int -> Fold m a b -> Stream m a -> Stream m a
D.tapOffsetEvery Int
offset Int
n Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => FL.Fold m a b -> t m a -> t m a
tapAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Fold m a b -> t m a -> t m a
tapAsync Fold m a b
f t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
Par.tapAsyncF Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs)
{-# INLINE tapAsyncK #-}
tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
tapAsyncK :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> t m a
tapAsyncK t m a -> m b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(Stream m a -> m b) -> Stream m a -> Stream m a
Par.tapAsyncK (t m a -> m b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m)
{-# INLINE distributeAsync_ #-}
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m)
=> f (t m a -> m b) -> t m a -> t m a
distributeAsync_ :: forall (f :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(Foldable f, IsStream t, MonadAsync m) =>
f (t m a -> m b) -> t m a -> t m a
distributeAsync_ = forall a b c. (a -> b -> c) -> b -> a -> c
flip (forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
Prelude.foldr forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> t m a
tapAsyncK)
{-# INLINE pollCounts #-}
pollCounts ::
(IsStream t, MonadAsync m)
=> (a -> Bool)
-> (t m Int -> t m Int)
-> Fold m Int b
-> t m a
-> t m a
pollCounts :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> Bool)
-> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
pollCounts a -> Bool
predicate t m Int -> t m Int
transf Fold m Int b
f t m a
xs =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
MonadAsync m =>
(a -> Bool)
-> (Stream m Int -> Stream m Int)
-> Fold m Int b
-> Stream m a
-> Stream m a
D.pollCounts a -> Bool
predicate (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m Int -> t m Int
transf forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD) Fold m Int b
f
forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs
{-# INLINE tapRate #-}
tapRate ::
(IsStream t, MonadAsync m, MonadCatch m)
=> Double
-> (Int -> m b)
-> t m a
-> t m a
tapRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadAsync m, MonadCatch m) =>
Double -> (Int -> m b) -> t m a -> t m a
tapRate Double
n Int -> m b
f t m a
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
Double -> (Int -> m b) -> Stream m a -> Stream m a
D.tapRate Double
n Int -> m b
f forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
xs
{-# INLINE trace #-}
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
trace :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m a
trace a -> m b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> forall (f :: * -> *) a. Functor f => f a -> f ()
void (a -> m b
f a
x) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE trace_ #-}
trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
trace_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
trace_ m b
eff = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
D.mapM (\a
x -> m b
eff forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return a
x) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE scan #-}
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scan :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
scan Fold m a b
fld t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.scanOnce Fold m a b
fld forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE postscan #-}
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
postscan :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
postscan Fold m a b
fld = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.postscanOnce Fold m a b
fld forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# DEPRECATED scanx "Please use scanl followed by map instead." #-}
{-# INLINE scanx #-}
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) x a b.
(IsStream t, Monad m) =>
(x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx x -> a -> x
step x
begin x -> b
done = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
S.scanlx' x -> a -> x
step x
begin x -> b
done forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE scanlM' #-}
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
scanlM' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
scanlM' b -> a -> m b
step m b
begin t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.scanlM' b -> a -> m b
step m b
begin forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl' #-}
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
scanl' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
scanl' b -> a -> b
step b
z t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
S.scanl' b -> a -> b
step b
z forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE postscanl' #-}
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
postscanl' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
postscanl' b -> a -> b
step b
z t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.postscanl' b -> a -> b
step b
z forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanl' #-}
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
prescanl' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> b) -> b -> t m a -> t m b
prescanl' b -> a -> b
step b
z t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
D.prescanl' b -> a -> b
step b
z forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE prescanlM' #-}
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> t m a -> t m b
prescanlM' b -> a -> m b
step m b
z t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
D.prescanlM' b -> a -> m b
step m b
z forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1M' #-}
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
scanl1M' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> m a) -> t m a -> t m a
scanl1M' a -> a -> m a
step t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
D.scanl1M' a -> a -> m a
step forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE scanl1' #-}
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
scanl1' :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> a) -> t m a -> t m a
scanl1' a -> a -> a
step t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> Stream m a
D.scanl1' a -> a -> a
step forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE with #-}
with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> (((s, a) -> b) -> t m a -> t m a)
with :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b s.
Functor (t m) =>
(t m a -> t m (s, a))
-> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
-> ((s, a) -> b)
-> t m a
-> t m a
with t m a -> t m (s, a)
f ((s, a) -> b) -> t m (s, a) -> t m (s, a)
comb (s, a) -> b
g = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((s, a) -> b) -> t m (s, a) -> t m (s, a)
comb (s, a) -> b
g forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> t m (s, a)
f
{-# INLINE filter #-}
#if __GLASGOW_HASKELL__ != 802
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
filter :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter a -> Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.filter a -> Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
#else
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter p m = fromStream $ K.filter p $ toStream m
#endif
{-# INLINE filterM #-}
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
filterM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
filterM a -> m Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.filterM a -> m Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE uniqBy #-}
uniqBy :: (IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
uniqBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
(a -> a -> Bool) -> t m a -> t m a
uniqBy a -> a -> Bool
eq =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
catMaybes forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> b) -> t m a -> t m b
rollingMap (\a
x a
y -> if a
x a -> a -> Bool
`eq` a
y then forall a. Maybe a
Nothing else forall a. a -> Maybe a
Just a
y)
{-# INLINE uniq #-}
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
uniq :: forall a (t :: (* -> *) -> * -> *) (m :: * -> *).
(Eq a, IsStream t, Monad m) =>
t m a -> t m a
uniq = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a (m :: * -> *). (Eq a, Monad m) => Stream m a -> Stream m a
D.uniq forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE prune #-}
prune ::
(a -> Bool) -> t m a -> t m a
prune :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
prune = forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"
repeated ::
t m a -> t m a
repeated :: forall (t :: * -> * -> *) m a. t m a -> t m a
repeated = forall a. HasCallStack => a
undefined
{-# INLINE nubBy #-}
nubBy ::
(a -> a -> Bool) -> t m a -> t m a
nubBy :: forall a (t :: * -> * -> *) m. (a -> a -> Bool) -> t m a -> t m a
nubBy = forall a. HasCallStack => a
undefined
{-# INLINE nubWindowBy #-}
nubWindowBy ::
Int -> (a -> a -> Bool) -> t m a -> t m a
nubWindowBy :: forall a (t :: * -> * -> *) m.
Int -> (a -> a -> Bool) -> t m a -> t m a
nubWindowBy = forall a. HasCallStack => a
undefined
{-# INLINE deleteBy #-}
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
deleteBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> a -> t m a -> t m a
deleteBy a -> a -> Bool
cmp a
x t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> a -> Stream m a -> Stream m a
S.deleteBy a -> a -> Bool
cmp a
x (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE sampleOld #-}
sampleOld ::
Int -> t m a -> t m a
sampleOld :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
sampleOld = forall a. HasCallStack => a
undefined
{-# INLINE sampleNew #-}
sampleNew ::
Int -> t m a -> t m a
sampleNew :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
sampleNew = forall a. HasCallStack => a
undefined
{-# INLINE sampleRate #-}
sampleRate ::
Double -> t m a -> t m a
sampleRate :: forall (t :: * -> * -> *) m a. Double -> t m a -> t m a
sampleRate = forall a. HasCallStack => a
undefined
{-# INLINE takeWhileM #-}
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
takeWhileM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
takeWhileM a -> m Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.takeWhileM a -> m Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE takeLast #-}
takeLast ::
Int -> t m a -> t m a
takeLast :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
takeLast = forall a. HasCallStack => a
undefined
{-# INLINE takeLastInterval #-}
takeLastInterval ::
Double -> t m a -> t m a
takeLastInterval :: forall (t :: * -> * -> *) m a. Double -> t m a -> t m a
takeLastInterval = forall a. HasCallStack => a
undefined
{-# INLINE takeWhileLast #-}
takeWhileLast ::
(a -> Bool) -> t m a -> t m a
takeWhileLast :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
takeWhileLast = forall a. HasCallStack => a
undefined
{-# INLINE takeWhileAround #-}
takeWhileAround ::
(a -> Bool) -> t m a -> t m a
takeWhileAround :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
takeWhileAround = forall a. HasCallStack => a
undefined
{-# INLINE takeInterval #-}
takeInterval ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
takeInterval :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) d a.
(MonadIO m, IsStream t, TimeUnit64 d) =>
d -> t m a -> t m a
takeInterval d
d = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) t a.
(MonadIO m, TimeUnit64 t) =>
t -> Stream m a -> Stream m a
D.takeByTime d
d forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE dropWhile #-}
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
dropWhile :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
dropWhile a -> Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
S.dropWhile a -> Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE dropWhileM #-}
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
dropWhileM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> m Bool) -> t m a -> t m a
dropWhileM a -> m Bool
p t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
D.dropWhileM a -> m Bool
p forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE dropInterval #-}
dropInterval ::(MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
dropInterval :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) d a.
(MonadIO m, IsStream t, TimeUnit64 d) =>
d -> t m a -> t m a
dropInterval d
d = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) t a.
(MonadIO m, TimeUnit64 t) =>
t -> Stream m a -> Stream m a
D.dropByTime d
d forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE dropLast #-}
dropLast ::
Int -> t m a -> t m a
dropLast :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
dropLast = forall a. HasCallStack => a
undefined
{-# INLINE dropLastInterval #-}
dropLastInterval ::
Int -> t m a -> t m a
dropLastInterval :: forall (t :: * -> * -> *) m a. Int -> t m a -> t m a
dropLastInterval = forall a. HasCallStack => a
undefined
{-# INLINE dropWhileLast #-}
dropWhileLast ::
(a -> Bool) -> t m a -> t m a
dropWhileLast :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
dropWhileLast = forall a. HasCallStack => a
undefined
{-# INLINE dropWhileAround #-}
dropWhileAround ::
(a -> Bool) -> t m a -> t m a
dropWhileAround :: forall a (t :: * -> * -> *) m. (a -> Bool) -> t m a -> t m a
dropWhileAround = forall a. HasCallStack => a
undefined
{-# INLINE insertBy #-}
insertBy ::
(IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
insertBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> a -> Ordering) -> a -> t m a -> t m a
insertBy a -> a -> Ordering
cmp a
x t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> a -> Stream m a -> Stream m a
S.insertBy a -> a -> Ordering
cmp a
x (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m)
{-# INLINE intersperse #-}
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
intersperse :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
a -> t m a -> t m a
intersperse a
a = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> Stream m a -> Stream m a
S.intersperse a
a forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS
{-# INLINE intersperseM_ #-}
intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseM_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseM_ m b
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.intersperseM_ m b
m forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseBySpan #-}
intersperseBySpan ::
Int -> m a -> t m a -> t m a
intersperseBySpan :: forall (m :: * -> *) a (t :: (* -> *) -> * -> *).
Int -> m a -> t m a -> t m a
intersperseBySpan Int
_n m a
_f t m a
_xs = forall a. HasCallStack => a
undefined
{-# INLINE intersperseSuffix #-}
intersperseSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
intersperseSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m a -> t m a -> t m a
intersperseSuffix m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
D.intersperseSuffix m a
m forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseSuffix_ #-}
intersperseSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
intersperseSuffix_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseSuffix_ m b
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
D.intersperseSuffix_ m b
m forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: (IsStream t, Monad m)
=> Int -> m a -> t m a -> t m a
intersperseSuffixBySpan :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> m a -> t m a -> t m a
intersperseSuffixBySpan Int
n m a
eff =
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
Int -> m a -> Stream m a -> Stream m a
D.intersperseSuffixBySpan Int
n m a
eff forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE interspersePrefix_ #-}
interspersePrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a
interspersePrefix_ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadAsync m) =>
m b -> t m a -> t m a
interspersePrefix_ m b
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM (\a
x -> forall (f :: * -> *) a. Functor f => f a -> f ()
void m b
m forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return a
x)
{-# INLINE delay #-}
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delay :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m) =>
Double -> t m a -> t m a
delay Double
n = forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseM_ forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE delayPost #-}
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPost :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m) =>
Double -> t m a -> t m a
delayPost Double
n = forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
intersperseSuffix_ forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE delayPre #-}
delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
delayPre :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m) =>
Double -> t m a -> t m a
delayPre Double
n = forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a -> t m a
trace_ forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000
{-# INLINE reassembleBy #-}
reassembleBy
::
Fold m a b
-> (a -> a -> Int)
-> t m a
-> t m b
reassembleBy :: forall (m :: * -> *) a b (t :: (* -> *) -> * -> *).
Fold m a b -> (a -> a -> Int) -> t m a -> t m b
reassembleBy = forall a. HasCallStack => a
undefined
{-# INLINE indexed #-}
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
indexed :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> t m (Int, a)
indexed = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Stream m a -> Stream m (Int, a)
D.indexed forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE indexedR #-}
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
indexedR :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Int -> t m a -> t m (Int, a)
indexedR Int
n = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
Int -> Stream m a -> Stream m (Int, a)
D.indexedR Int
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD
{-# INLINE timestampWith #-}
timestampWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (AbsTime, a)
timestampWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (AbsTime, a)
timestampWith Double
g t m a
stream = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
zipWith (forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) t m a
stream (forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m AbsTime
absTimesWith Double
g)
{-# INLINE timestamped #-}
timestamped :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (AbsTime, a)
timestamped :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (AbsTime, a)
timestamped = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (AbsTime, a)
timestampWith Double
0.01
{-# INLINE timeIndexWith #-}
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m a -> t m (RelTime64, a)
timeIndexWith :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (RelTime64, a)
timeIndexWith Double
g t m a
stream = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b c.
(IsStream t, Monad m) =>
(a -> b -> c) -> t m a -> t m b -> t m c
zipWith (forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) t m a
stream (forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m RelTime64
relTimesWith Double
g)
{-# INLINE timeIndexed #-}
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m))
=> t m a -> t m (RelTime64, a)
timeIndexed :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (RelTime64, a)
timeIndexed = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m a -> t m (RelTime64, a)
timeIndexWith Double
0.01
{-# INLINE elemIndices #-}
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
elemIndices :: forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
(IsStream t, Eq a, Monad m) =>
a -> t m a -> t m Int
elemIndices a
a = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m Int
findIndices (forall a. Eq a => a -> a -> Bool
== a
a)
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> b) -> t m a -> t m b
rollingMap a -> a -> b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> a -> b) -> Stream m a -> Stream m b
D.rollingMap a -> a -> b
f forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
rollingMapM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> m b) -> t m a -> t m b
rollingMapM a -> a -> m b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> a -> m b) -> Stream m a -> Stream m b
D.rollingMapM a -> a -> m b
f forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m
{-# INLINE mapMaybe #-}
mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
mapMaybe :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Maybe b) -> t m a -> t m b
mapMaybe a -> Maybe b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Stream m a -> Stream m b
S.mapMaybe a -> Maybe b
f forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamS t m a
m
{-# INLINE_EARLY mapMaybeM #-}
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m))
=> (a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m, Functor (t m)) =>
(a -> m (Maybe b)) -> t m a -> t m b
mapMaybeM a -> m (Maybe b)
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter forall a. Maybe a -> Bool
isJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(a -> m b) -> t m a -> t m b
mapM a -> m (Maybe b)
f
{-# RULES "mapMaybeM serial" mapMaybeM = mapMaybeMSerial #-}
{-# INLINE mapMaybeMSerial #-}
mapMaybeMSerial :: Monad m => (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial :: forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial a -> m (Maybe b)
f SerialT m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> Stream m a -> Stream m b
D.mapMaybeM a -> m (Maybe b)
f forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD SerialT m a
m
{-# INLINE catMaybes #-}
catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a
catMaybes :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m, Functor (t m)) =>
t m (Maybe a) -> t m a
catMaybes = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter forall a. Maybe a -> Bool
isJust
{-# INLINE lefts #-}
lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a
lefts :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m, Functor (t m)) =>
t m (Either a b) -> t m a
lefts = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a b. a -> Either a b -> a
fromLeft forall a. HasCallStack => a
undefined) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter forall a b. Either a b -> Bool
isLeft
{-# INLINE rights #-}
rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b
rights :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m, Functor (t m)) =>
t m (Either a b) -> t m b
rights = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall b a. b -> Either a b -> b
fromRight forall a. HasCallStack => a
undefined) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
(a -> Bool) -> t m a -> t m a
filter forall a b. Either a b -> Bool
isRight
{-# INLINE both #-}
both :: Functor (t m) => t m (Either a a) -> t m a
both :: forall (t :: * -> * -> *) m a.
Functor (t m) =>
t m (Either a a) -> t m a
both = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id)
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> (t m a -> t m b)
|$ :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(|$) t m a -> t m b
f = t m a -> t m b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a
mkParallel
infixr 0 |$
{-# INLINE applyAsync #-}
applyAsync :: (IsStream t, MonadAsync m)
=> (t m a -> t m b) -> (t m a -> t m b)
applyAsync :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
applyAsync = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
(|$)
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
t m a
x |& :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
t m a -> (t m a -> t m b) -> t m b
|& t m a -> t m b
f = t m a -> t m b
f forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> t m b) -> t m a -> t m b
|$ t m a
x
infixl 1 |&