Copyright | (c) 2017 Harendra Kumar |
---|---|
License | BSD3 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
This is an internal module consisting of released, unreleased and unimplemented APIs. For stable and released APIs please see the Streamly.Prelude module. This module provides documentation only for the unreleased and unimplemented APIs. For documentation on released APIs please see the Streamly.Prelude module.
Synopsis
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
- repeat :: (IsStream t, Monad m) => a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- replicate :: (IsStream t, Monad m) => Int -> a -> t m a
- replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
- class Enum a => Enumerable a where
- enumerateFrom :: (IsStream t, Monad m) => a -> t m a
- enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a
- enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a
- enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a
- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
- unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
- unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
- iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
- iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
- fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
- fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
- fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
- fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
- fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
- currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime
- uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
- tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b
- foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b
- foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
- foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
- foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b
- foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- drain :: Monad m => SerialT m a -> m ()
- last :: Monad m => SerialT m a -> m (Maybe a)
- length :: Monad m => SerialT m a -> m Int
- sum :: (Monad m, Num a) => SerialT m a -> m a
- product :: (Monad m, Num a) => SerialT m a -> m a
- mconcat :: (Monad m, Monoid a) => SerialT m a -> m a
- maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)
- toList :: Monad m => SerialT m a -> m [a]
- toListRev :: Monad m => SerialT m a -> m [a]
- toPure :: Monad m => SerialT m a -> m (SerialT Identity a)
- toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a)
- toStream :: Monad m => Fold m a (SerialT Identity a)
- toStreamRev :: Monad m => Fold m a (SerialT Identity a)
- drainN :: Monad m => Int -> SerialT m a -> m ()
- drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- (!!) :: Monad m => SerialT m a -> Int -> m (Maybe a)
- head :: Monad m => SerialT m a -> m (Maybe a)
- headElse :: Monad m => a -> SerialT m a -> m a
- findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
- find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
- lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)
- findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
- elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
- null :: Monad m => SerialT m a -> m Bool
- elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- and :: Monad m => SerialT m Bool -> m Bool
- or :: Monad m => SerialT m Bool -> m Bool
- eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
- cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
- isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
- isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a) => SerialT m a -> SerialT m a -> m Bool
- isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a))
- stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
- dropPrefix :: t m a -> t m a -> t m a
- dropInfix :: t m a -> t m a -> t m a
- dropSuffix :: t m a -> t m a -> t m a
- transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
- map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
- sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
- mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
- mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
- trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
- tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a
- tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a
- tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a
- pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
- scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
- postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
- prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
- scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
- scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
- mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b
- deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
- uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
- insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
- intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperseSuffixBySpan :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
- indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
- splitParse :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- dropByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b
- chunksOf2 :: (IsStream t, Monad m) => Int -> m c -> Fold2 m c a b -> t m a -> t m b
- arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a)
- intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
- splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
- splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
- groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
- groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
- groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
- rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
- rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
- classifySessionsBy :: (IsStream t, MonadAsync m, Ord k) => Double -> Double -> Bool -> (Int -> m Bool) -> Fold m a (Either b b) -> t m (k, a, AbsTime) -> t m (k, b)
- classifySessionsOf :: (IsStream t, MonadAsync m, Ord k) => Double -> (Int -> m Bool) -> Fold m a (Either b b) -> t m (k, a, AbsTime) -> t m (k, b)
- classifyKeepAliveSessions :: (IsStream t, MonadAsync m, Ord k) => Double -> (Int -> m Bool) -> Fold m a (Either b b) -> t m (k, a, AbsTime) -> t m (k, b)
- append :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- wSerialFst :: IsStream t => t m a -> t m a -> t m a
- wSerialMin :: IsStream t => t m a -> t m a -> t m a
- roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMapWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b
- concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- concatUnfoldInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- concatUnfoldRoundrobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- concatMapIterateWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m a) -> t m a -> t m a
- concatMapTreeWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b)
- concatMapLoopWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either b c)) -> (b -> t m a) -> t m a -> t m c
- concatMapTreeYieldLeavesWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either a b)) -> t m a -> t m b
- mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a
- gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- intercalate :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c
- intercalateSuffix :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c
- interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- before :: (IsStream t, Monad m) => m b -> t m a -> t m a
- after :: (IsStream t, Monad m) => m b -> t m a -> t m a
- afterIO :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a
- bracket :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a
- bracketIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a
- onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
- finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
- finallyIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a
- handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a
- hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a
- generally :: (IsStream t, Monad m) => t Identity a -> t m a
- liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a
- usingReaderT :: (Monad m, IsStream t) => r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a
- runReaderT :: (IsStream t, Monad m) => s -> t (ReaderT s m) a -> t m a
- evalStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m a
- usingStateT :: Monad m => s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a
- runStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m (s, a)
- inspectMode :: IsStream t => t m a -> t m a
- once :: (Monad m, IsStream t) => m a -> t m a
- each :: (IsStream t, Foldable f) => f a -> t m a
- scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
- foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
- foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
- foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- runStream :: Monad m => SerialT m a -> m ()
- runN :: Monad m => Int -> SerialT m a -> m ()
- runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String
- toHandle :: MonadIO m => Handle -> SerialT m String -> m ()
Construction
Primitives
nilM :: (IsStream t, Monad m) => m b -> t m a Source #
An empty stream producing a side effect.
> toList (nilM (print "nil"))
"nil"
[]
Internal
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
stream. For serial streams this is the same as (return a) `consM` r but
more efficient. For concurrent streams this is not concurrent whereas
consM is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil
[1,2,3]
Since: 0.1.0
consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil
hello
world
["hello","world"]
Concurrent (do not use parallely to construct infinite streams)
Since: 0.2.0
(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM. We can read it as "parallel colon" to remember that | comes before :.
> toList $ getLine |: getLine |: nil
hello
world
["hello","world"]
let delay = threadDelay 1000000 >> print 1
drain $ serially $ delay |: delay |: delay |: nil
drain $ parallely $ delay |: delay |: delay |: nil
Concurrent (do not use parallely to construct infinite streams)
Since: 0.2.0
From Values
yield :: IsStream t => a -> t m a Source #
yield a = a `cons` nil
Create a singleton stream from a pure value.
The following holds in monadic streams, but not in Zip streams:
yield = pure
yield = yieldM . pure
In Zip applicative streams yield is not the same as pure because in that
case pure is equivalent to repeat instead. yield and pure are equally
efficient; in other cases yield may be slightly more efficient than the
other equivalent definitions.
Since: 0.4.0
yieldM :: (Monad m, IsStream t) => m a -> t m a Source #
yieldM m = m `consM` nil
Create a singleton stream from a monadic action.
> toList $ yieldM getLine
hello
["hello"]
Since: 0.4.0
repeat :: (IsStream t, Monad m) => a -> t m a Source #
Generate an infinite stream by repeating a pure value.
Since: 0.4.0
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
repeatM = fix . consM
repeatM = cycle1 . yieldM
Generate a stream by repeatedly executing a monadic action forever.
drain $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
drain $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
Concurrent, infinite (do not use with parallely)
Since: 0.2.0
replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #
replicate n = take n . repeat
Generate a stream of length n by repeating a value n times.
Since: 0.6.0
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #
replicateM n = take n . repeatM
Generate a stream by performing a monadic action n times. For example:
drain $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
drain $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
Concurrent
Since: 0.1.1
Enumeration
class Enum a => Enumerable a where Source #
Types that can be enumerated as a stream. The operations in this type
class are equivalent to those in the Enum type class, except that these
generate a stream instead of a list. Use the functions in the
Streamly.Internal.Data.Stream.Enumeration module to define new instances.
Since: 0.6.0
enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #
enumerateFrom from generates a stream starting with the element from,
enumerating up to maxBound when the type is Bounded or generating an
infinite stream when the type is not Bounded.
> S.toList $ S.take 4 $ S.enumerateFrom (0 :: Int)
[0,1,2,3]
For Fractional types, enumeration is numerically stable. However, no
overflow or underflow checks are performed.
> S.toList $ S.take 4 $ S.enumerateFrom 1.1
[1.1,2.1,3.1,4.1]
Since: 0.6.0
enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #
Generate a finite stream starting with the element from, enumerating
the type up to the value to. If to is smaller than from then an
empty stream is returned.
> S.toList $ S.enumerateFromTo 0 4
[0,1,2,3,4]
For Fractional types, the last element is equal to the specified to
value after rounding to the nearest integral value.
> S.toList $ S.enumerateFromTo 1.1 4
[1.1,2.1,3.1,4.1]
> S.toList $ S.enumerateFromTo 1.1 4.6
[1.1,2.1,3.1,4.1,5.1]
Since: 0.6.0
enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #
enumerateFromThen from then generates a stream whose first element
is from, the second element is then and the successive elements are
in increments of then - from. Enumeration can occur downwards or
upwards depending on whether then comes before or after from. For
Bounded types the stream ends when maxBound is reached, for
unbounded types it keeps enumerating infinitely.
> S.toList $ S.take 4 $ S.enumerateFromThen 0 2
[0,2,4,6]
> S.toList $ S.take 4 $ S.enumerateFromThen 0 (-2)
[0,-2,-4,-6]
Since: 0.6.0
enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #
enumerateFromThenTo from then to generates a finite stream whose
first element is from, the second element is then and the successive
elements are in increments of then - from up to to. Enumeration can
occur downwards or upwards depending on whether then comes before or
after from.
> S.toList $ S.enumerateFromThenTo 0 2 6
[0,2,4,6]
> S.toList $ S.enumerateFromThenTo 0 (-2) (-6)
[0,-2,-4,-6]
Since: 0.6.0
Instances
enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #
From Generators
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #
unfoldr step s =
    case step s of
        Nothing -> nil
        Just (a, b) -> a `cons` unfoldr step b
Build a stream by unfolding a pure step function step starting from a
seed s. The step function returns the next element in the stream and the
next seed value. When it is done it returns Nothing and the stream ends.
For example,
let f b =
        if b > 3
        then Nothing
        else Just (b, b + 1)
in toList $ unfoldr f 0
[0,1,2,3]
Since: 0.1.0
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #
Build a stream by unfolding a monadic step function starting from a
seed. The step function returns the next element in the stream and the next
seed value. When it is done it returns Nothing and the stream ends. For
example,
let f b =
        if b > 3
        then return Nothing
        else print b >> return (Just (b, b + 1))
in drain $ unfoldrM f 0
0
1
2
3
When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.
(asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
    & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.1.0
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #
Convert an Unfold into a stream by supplying it an input seed.
>>> unfold (UF.replicateM 10) (putStrLn "hello")
Since: 0.7.0
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #
iterate f x = x `cons` iterate f (f x)
Generate an infinite stream with x as the first element and each
successive element derived by applying the function f on the previous
element.
> S.toList $ S.take 5 $ S.iterate (+1) 1
[1,2,3,4,5]
Since: 0.1.2
iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #
iterateM f m = m >>= \a -> return a `consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
m and each successive element derived by applying the monadic function
f on the previous element.
When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.
drain $ serially $ S.take 10 $ S.iterateM (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)
drain $ asyncly $ S.take 10 $ S.iterateM (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)
Concurrent
Since: 0.7.0 (signature change)
Since: 0.1.2
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #
fromIndices f = let g i = f i `cons` g (i + 1) in g 0
Generate an infinite stream whose values are the output of a function f
applied on the corresponding index. Index starts at 0.
> S.toList $ S.take 5 $ S.fromIndices id
[0,1,2,3,4]
Since: 0.6.0
fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #
fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0
Generate an infinite stream whose values are the output of a monadic
function f applied on the corresponding index. Index starts at 0.
Concurrent
Since: 0.6.0
From Containers
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList =foldr
cons
nil
Construct a stream from a list of pure values. This is more efficient than
fromFoldable
for serial streams.
Since: 0.4.0
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #
fromListM =foldr
consM
nil
Construct a stream from a list of monadic actions. This is more efficient
than fromFoldableM
for serial streams.
Since: 0.4.0
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #
fromFoldableM =foldr
consM
nil
Construct a stream from a Foldable
containing monadic actions.
drain $ serially $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1) drain $ asyncly $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)
Concurrent (do not use with parallely
on infinite containers)
Since: 0.3.0
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #
Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.
Internal
Time related
currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime Source #
currentTime g
returns a stream of absolute timestamps using a clock of
granularity g
specified in seconds. A low granularity clock is more
expensive in terms of CPU usage.
Note: This API is not safe on 32-bit machines.
Internal
Elimination
Deconstruction
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
Nothing
. If the stream is non-empty, returns Just (a, ma)
, where a
is
the head of the stream and ma
its tail.
This is a brute force primitive. Avoid using it as long as possible, use it when no other combinator can do the job. This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
Since: 0.1.0
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
tail = fmap (fmap snd) . uncons
Extract all but the first element of the stream, if any.
Since: 0.1.1
init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
Extract all but the last element of the stream, if any.
Since: 0.5.0
Folding
Right Folds
foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream
constructs
an output structure using the step function build
. build
is invoked with
the next input element and the remaining (lazy) tail of the output
structure. It builds a lazy output expression using the two. When the "tail
structure" in the output expression is evaluated it calls build
again thus
lazily consuming the input stream
until either the output expression built
by build
is free of the "tail" or the input is exhausted in which case
final
is used as the terminating case for the output structure. For more
details see the description in the previous section.
Example, determine if any element is odd
in a stream:
>>>
S.foldrM (\x xs -> if odd x then return True else xs) (return False) $ S.fromList (2:4:5:undefined)
> True
Since: 0.7.0 (signature changed)
Since: 0.2.0 (signature changed)
Since: 0.1.0
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
Right fold to a streaming monad.
foldrS S.cons S.nil === id
foldrS
can be used to perform stateless stream to stream transformations
like map and filter in general. It can be coupled with a scan to perform
stateful transformations. However, note that the custom map and filter
routines can be much more efficient than this due to better stream fusion.
>>>
S.toList $ S.foldrS S.cons S.nil $ S.fromList [1..5]
> [1,2,3,4,5]
Find if any element in the stream is odd:
>>>
S.toList $ S.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (S.fromList (2:4:5:undefined) :: SerialT IO Int)
> [True]
Map (+2) on odd elements and filter out the even elements:
>>>
S.toList $ S.foldrS (\x xs -> if odd x then (x + 2) `S.cons` xs else xs) S.nil $ (S.fromList [1..5] :: SerialT IO Int)
> [3,5,7]
foldrM
can also be represented in terms of foldrS
, however, the former
is much more efficient:
foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
Internal
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #
Right fold to a transformer monad. This is the most general right fold
function. foldrS
is a special case of foldrT
, however foldrS
implementation can be more efficient:
foldrS = foldrT
foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
foldrT
can be used to translate streamly streams to other transformer
monads e.g. to a different streaming type.
Internal
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
strict right fold. This is provided only for use in lazy monads (e.g.
Identity) or pure streams. Note that with this signature it is not possible
to implement a lazy foldr when the monad m
is strict. In that case it
would be strict in its accumulator and therefore would necessarily consume
all its input.
Since: 0.1.0
Left Folds
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #
Left associative/strict push fold. foldl' reduce initial stream
invokes
reduce
with the accumulator and the next input in the input stream, using
initial
as the initial value of the current value of the accumulator. When
the input is exhausted the current value of the accumulator is returned.
Make sure to use a strict data structure for accumulator to not build
unnecessary lazy expressions unless that's what you want. See the previous
section for more details.
Since: 0.2.0
foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Strict left fold, for non-empty streams, using first element as the
starting value. Returns Nothing
if the stream is empty.
Since: 0.5.0
foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b Source #
Like foldl'
but with a monadic step function.
Since: 0.2.0
Composable Left Folds
fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #
Fold a stream using the supplied left fold.
>>>
S.fold FL.sum (S.enumerateFromTo 1 100)
5050
Since: 0.7.0
parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #
Parse a stream using the supplied Parser.
Internal
Concurrent Folds
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #
Same as |$.
.
Internal
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel fold application operator; applies a fold function t m a -> m b
to a stream t m a
concurrently; the input stream is evaluated
asynchronously in an independent thread yielding elements to a buffer and
the folding action runs in another thread consuming the input from the
buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b)
you can look
at it as a transformation that converts a fold function to a buffered
concurrent fold function.
The .
at the end of the operator is a mnemonic for termination of the
stream.
S.foldlM' (\_ a -> threadDelay 1000000 >> print a) () |$. S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
Parallel reverse function application operator for applying a run or fold
functions to a stream. Just like |$.
except that the operands are reversed.
S.repeatM (threadDelay 1000000 >> return 1) |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.3.0
Full Folds
drain :: Monad m => SerialT m a -> m () Source #
drain = mapM_ (\_ -> return ())
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators, for example drain . asyncly.
Since: 0.7.0
last :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
last xs = xs !! (length xs - 1)
Since: 0.1.1
sum :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the sum of all elements of a stream of numbers. Returns 0
when
the stream is empty. Note that this is not numerically stable for floating
point numbers.
Since: 0.1.0
product :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the product of all elements of a stream of numbers. Returns 1
when the stream is empty.
Since: 0.1.1
mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #
Fold a stream of monoid elements by appending them.
Internal
maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the maximum element in a stream using the supplied comparison function.
Since: 0.6.0
minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the minimum element in a stream using the supplied comparison function.
Since: 0.6.0
the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #
Ensures that all the elements of the stream are identical and then returns that unique element.
Since: 0.6.0
Lazy Folds
toList :: Monad m => SerialT m a -> m [a] Source #
toList = S.foldr (:) []
Convert a stream into a list in the underlying monad. The list can be
consumed lazily in a lazy monad (e.g. Identity
). In a strict monad (e.g.
IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Since: 0.1.0
toListRev :: Monad m => SerialT m a -> m [a] Source #
toListRev = S.foldl' (flip (:)) []
Convert a stream into a list in reverse order in the underlying monad.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Internal
toPure :: Monad m => SerialT m a -> m (SerialT Identity a) Source #
Convert a stream to a pure stream.
toPure = foldr cons nil
Internal
toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a) Source #
Convert a stream to a pure stream in reverse order.
toPureRev = foldl' (flip cons) nil
Internal
Composable Left Folds
toStream :: Monad m => Fold m a (SerialT Identity a) Source #
A fold that buffers its input to a pure stream.
Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Internal
toStreamRev :: Monad m => Fold m a (SerialT Identity a) Source #
Buffers the input stream to a pure stream in the reverse order of the input.
Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Internal
Partial Folds
drainN :: Monad m => Int -> SerialT m a -> m () Source #
drainN n = drain . take n
Run at most n
iterations of a stream.
Since: 0.7.0
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
drainWhile p = drain . takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.7.0
(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #
Lookup the element at the given index.
Since: 0.6.0
head :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the first element of the stream, if any.
head = (!! 0)
Since: 0.1.0
headElse :: Monad m => a -> SerialT m a -> m a Source #
Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.
Internal
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #
Returns the first element that satisfies the given predicate.
Since: 0.6.0
lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #
In a stream of (key-value) pairs (a, b)
, return the value b
of the
first pair where the key equals the given value a
.
lookup a = fmap (fmap snd) . find ((== a) . fst)
Since: 0.5.0
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #
Returns the first index that satisfies the given predicate.
Since: 0.5.0
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #
Returns the first index where a given value is found in the stream.
elemIndex a = findIndex (== a)
Since: 0.5.0
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is present in the stream.
Since: 0.1.0
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is not present in the stream.
Since: 0.1.0
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether all elements of a stream satisfy a predicate.
Since: 0.1.0
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether any of the elements of a stream satisfy a predicate.
Since: 0.1.0
and :: Monad m => SerialT m Bool -> m Bool Source #
Determines if all elements of a boolean stream are True.
Since: 0.5.0
or :: Monad m => SerialT m Bool -> m Bool Source #
Determines whether at least one element of a boolean stream is True.
Since: 0.5.0
Multi-Stream folds
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #
Compare two streams for equality using an equality function.
Since: 0.6.0
cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #
Compare two streams lexicographically using a comparison function.
Since: 0.6.0
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True
if the first stream is the same as or a prefix of the
second. A stream is a prefix of itself.
> S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char) True
Since: 0.6.0
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #
Returns True
if the first stream is a suffix of the second. A stream is
considered a suffix of itself.
> S.isSuffixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char) True
Space: O(n)
, buffers entire input stream and the suffix.
Internal
Suboptimal - Help wanted.
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True
if all the elements of the first stream occur, in order, in
the second stream. The elements do not have to occur consecutively. A stream
is a subsequence of itself.
> S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char) True
Since: 0.6.0
stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #
Strip prefix if present and tell whether it was stripped or not. Returns
Nothing
if the stream does not start with the given prefix, stripped
stream otherwise. Returns Just nil
when the prefix is the same as the
stream.
Space: O(1)
Since: 0.6.0
stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #
Drops the given suffix from a stream. Returns Nothing
if the stream does
not end with the given suffix. Returns Just nil
when the suffix is the
same as the stream.
It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.
Space: O(n)
, buffers the entire input stream as well as the suffix
Internal
dropPrefix :: t m a -> t m a -> t m a Source #
Drop prefix from the input stream if present.
Space: O(1)
Unimplemented - Help wanted.
dropInfix :: t m a -> t m a -> t m a Source #
Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.
Space: O(n)
where n is the length of the infix.
Unimplemented - Help wanted.
dropSuffix :: t m a -> t m a -> t m a Source #
Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.
Space: O(n)
where n is the length of the suffix.
Unimplemented - Help wanted.
Transformation
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #
Use a Pipe
to transform a stream.
Internal
Mapping
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #
sequence = mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
> drain $ S.sequence $ S.fromList [putStr "a", putStr "b", putStrLn "c"] abc drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) & (serially . S.sequence) drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1) & (asyncly . S.sequence)
Concurrent (do not use with parallely
on infinite streams)
Since: 0.1.0
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #
mapM f = sequence . map f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
> drain $ S.mapM putStr $ S.fromList ["a", "b", "c"] abc drain $ S.replicateM 10 (return 1) & (serially . S.mapM (\x -> threadDelay 1000000 >> print x)) drain $ S.replicateM 10 (return 1) & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x))
Concurrent (do not use with parallely
on infinite streams)
Since: 0.1.0
Special Maps
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #
mapM_ f = drain . mapM f
Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.
Since: 0.1.0
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
> S.drain $ S.trace print (S.enumerateFromTo 1 2) 1 2
Compare with tap
.
Since: 0.7.0
tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #
Tap the data flowing through a stream into a Fold
. For example, you may
add a tap to log the contents flowing through the stream. The fold is used
only for effects, its result is discarded.
                  Fold m a b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tap (FL.drainBy print) (S.enumerateFromTo 1 2) 1 2
Compare with trace
.
Since: 0.7.0
tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #
tapOffsetEvery offset n
taps every n
th element in the stream
starting at offset
. offset
can be between 0
and n - 1
. Offset 0
means start at the first element in the stream. If the offset is outside
this range then offset `mod` n is used as the offset.
>>> S.drain $ S.tapOffsetEvery 0 2 (FL.mapM print FL.toList) $ S.enumerateFromTo 0 10 > [0,2,4,6,8,10]
Internal
tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
in an independent thread. The fold may buffer some elements. The buffer size
is determined by the prevailing maxBuffer
setting.
               Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap
.
Internal
tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #
Calls the supplied function with the number of elements consumed
every n
seconds. The given function is run in a separate thread
until the end of the stream. In case there is an exception in the
stream the thread is killed during the next major GC.
Note: The action is not guaranteed to run if the main thread exits.
> delay n = threadDelay (round $ n * 1000000) >> return n > S.drain $ S.tapRate 2 (\n -> print $ show n ++ " elements processed") (delay 1 S.|: delay 0.5 S.|: delay 0.5 S.|: S.nil) 2 elements processed 1 elements processed
Note: This may not work correctly on 32-bit machines.
Internal
pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #
pollCounts predicate transform fold stream
counts those elements in the
stream that pass the predicate
. The resulting count stream is sent to
another thread which transforms it using transform
and then folds it using
fold
. The thread is automatically cleaned up if the stream stops or
aborts due to exception.
For example, to print the count of elements processed every second:
> S.drain $ S.pollCounts (const True) (S.rollingMap (-) . S.delayPost 1) (FL.drainBy print) $ S.enumerateFrom 0
Note: This may not work correctly on 32-bit machines.
Internal
Scanning
Left scans
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Strict left scan. Like map
, scanl'
too is a one to one transformation,
however it adds an extra element.
> S.toList $ S.scanl' (+) 0 $ S.fromList [1,2,3,4] [0,1,3,6,10]
> S.toList $ S.scanl' (flip (:)) [] $ S.fromList [1,2,3,4] [[],[1],[2,1],[3,2,1],[4,3,2,1]]
The output of scanl'
is the initial value of the accumulator followed by
all the intermediate steps and the final result of foldl'
.
By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.
Consider the following monolithic example, computing the sum and the product
of the elements in a stream in one go using a foldl'
:
> S.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ S.fromList [1,2,3,4] (10,24)
Using scanl'
we can make it modular by computing the sum in the first
stage and passing it down to the next stage for computing the product:
> S.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1) $ S.scanl' (\(s, _) x -> (s + x, x)) (0,1) $ S.fromList [1,2,3,4] (10,24)
IMPORTANT: scanl'
evaluates the accumulator to WHNF. To avoid building
lazy expressions inside the accumulator, it is recommended that a strict
data structure is used for accumulator.
Since: 0.2.0
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b Source #
Like scanl'
but with a monadic fold function.
Since: 0.4.0
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl'
but does not stream the initial value of the accumulator.
postscanl' f z xs = S.drop 1 $ S.scanl' f z xs
Since: 0.7.0
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b Source #
Like postscanl'
but with a monadic step function.
Since: 0.7.0
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl' but does not stream the final value of the accumulator.
Internal
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like prescanl' but with a monadic step function.
Internal
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #
Like scanl'
but for a non-empty stream. The first element of the stream
is used as the initial value of the accumulator. Does nothing if the stream
is empty.
> S.toList $ S.scanl1' (+) $ S.fromList [1,2,3,4] [1,3,6,10]
Since: 0.6.0
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #
Like scanl1'
but with a monadic step function.
Since: 0.6.0
Scan Using Fold
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Scan a stream using the given monadic fold.
Since: 0.7.0
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Postscan a stream using the given monadic fold.
Since: 0.7.0
Concurrent Transformation
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Internal
applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #
Same as |$
.
Internal
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel transform application operator; applies a stream transformation
function t m a -> t m b
to a stream t m a
concurrently; the input stream
is evaluated asynchronously in an independent thread yielding elements to a
buffer and the transformation function runs in another thread consuming the
input from the buffer. |$
is just like regular function application
operator $
except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b)
you can
look at it as a transformation that converts a transform function to a
buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
drain $ S.mapM (\x -> threadDelay 1000000 >> print x) |$ S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Parallel reverse function application operator for streams; just like the
regular reverse function application operator &
except that it is
concurrent.
drain $ S.repeatM (threadDelay 1000000 >> return 1) |& S.mapM (\x -> threadDelay 1000000 >> print x)
Concurrent
Since: 0.3.0
Filtering
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Include only those elements that pass a predicate.
Since: 0.1.0
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as filter
but with a monadic predicate.
Since: 0.4.0
Mapping Filters
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #
Deleting Elements
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #
Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.
> S.toList $ S.deleteBy (==) 3 $ S.fromList [1,3,3,5] [1,3,5]
Since: 0.6.0
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #
Drop repeated elements that are adjacent to each other.
Since: 0.6.0
Inserting Elements
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Generate a stream by inserting the result of a monadic action between consecutive elements of the given stream. Note that the monadic action is performed after the stream action before which its result is inserted.
> S.toList $ S.intersperseM (return ',') $ S.fromList "hello" "h,e,l,l,o"
Since: 0.5.0
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #
Generate a stream by inserting a given element between consecutive elements of the given stream.
> S.toList $ S.intersperse ',' $ S.fromList "hello" "h,e,l,l,o"
Since: 0.7.0
intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Insert a monadic action after each element in the stream.
Internal
intersperseSuffixBySpan :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -> t m a Source #
Like intersperseSuffix
but intersperses a monadic action into the input
stream after every n
elements and after the last element.
> S.toList $ S.intersperseSuffixBySpan 2 (return ',') $ S.fromList "hello" "he,ll,o,"
Internal
interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
seconds.
> S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello" "h,e,l,l,o"
Internal
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduces a delay of specified seconds after each element of a stream.
Internal
Indexing
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #
indexed = S.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
indexed = S.zipWith (,) (S.enumerateFrom 0)
Pair each element in a stream with its index, starting from index 0.
> S.toList $ S.indexed $ S.fromList "hello" [(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
Since: 0.6.0
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #
indexedR n = S.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
indexedR n = S.zipWith (,) (S.enumerateFromThen n (n - 1))
Pair each element in a stream with its index, starting from the
given index n
and counting down.
> S.toList $ S.indexedR 10 $ S.fromList "hello" [(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
Since: 0.6.0
Reordering
reverse :: (IsStream t, Monad m) => t m a -> t m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
Since: 0.7.0 (Monad m constraint)
Since: 0.1.1
Parsing
splitParse :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b Source #
Apply a Parser
repeatedly on a stream and emit the parsed values in the
output stream.
>>>
S.toList $ S.splitParse (PR.take 2 $ PR.fromFold FL.sum) $ S.fromList [1..10]
> [3,7,11,15,19]
>>>
S.toList $ S.splitParse (PR.line FL.toList) $ S.fromList "hello\nworld"
> ["hello\n","world"]
Trimming
take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Take first n
elements from the stream and discard the rest.
Since: 0.1.0
takeByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
takeByTime duration
yields stream elements up to the specified time
duration
. The duration starts when the stream is evaluated for the first
time, before the first element is yielded. The time duration is checked
before generating each element, if the duration has expired the stream
stops.
The total time taken in executing the stream is guaranteed to be at least
duration
, however, because the duration is checked before generating an
element, the upper bound is indeterminate and depends on the time taken in
generating and processing the last element.
No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.
Internal
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
End the stream as soon as the predicate fails on an element.
Since: 0.1.0
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as takeWhile
but with a monadic predicate.
Since: 0.4.0
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Discard first n
elements from the stream and take the rest.
Since: 0.1.0
dropByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
dropByTime duration
drops stream elements until specified duration
has
passed. The duration begins when the stream is evaluated for the first
time. The time duration is checked after generating a stream element, the
element is yielded if the duration has expired otherwise it is dropped.
The time elapsed before starting to generate the first element is at most
duration
, however, because the duration expiry is checked after the
element is generated, the lower bound is indeterminate and depends on the
time taken in generating an element.
All elements are yielded if the duration is zero.
Internal
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
Since: 0.1.0
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as dropWhile
but with a monadic predicate.
Since: 0.4.0
Breaking
chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #
Group the input stream into groups of n
elements each and then fold each
group using the provided fold function.
> S.toList $ S.chunksOf 2 FL.sum (S.enumerateFromTo 1 10)
[3,7,11,15,19]
This can be considered as an n-fold version of ltake
where we apply
ltake
repeatedly on the leftover stream until the stream exhausts.
Since: 0.7.0
chunksOf2 :: (IsStream t, Monad m) => Int -> m c -> Fold2 m c a b -> t m a -> t m b Source #
Internal
arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a) Source #
arraysOf n stream
groups the elements in the input stream into arrays of
n
elements each.
Same as the following but may be more efficient:
arraysOf n = S.chunksOf n (A.writeN n)
Internal
intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #
Group the input stream into windows of n
seconds each and then fold each
group using the provided fold function.
Since: 0.7.0
Searching
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
Since: 0.5.0
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #
Find all the indices where the value of the element in the stream is equal to the given value.
Since: 0.5.0
Splitting
Streams can be sliced into segments in space or in time. We use the
term chunk
to refer to a spatial length of the stream (spatial window)
and the term session
to refer to a length in time (time window).
splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Split on an infixed separator element, dropping the separator. Splits the
stream on separator elements determined by the supplied predicate, separator
is considered as infixed between two segments, if one side of the separator
is missing then it is parsed as an empty stream. The supplied Fold
is
applied on the split segments. With -
representing non-separator elements
and .
as separator, splitOn
splits as follows:
"--.--" => "--" "--"
"--." => "--" ""
".--" => "" "--"
splitOn (== x)
is an inverse of intercalate (S.yield x)
Let's use the following definition for illustration:
splitOn' p xs = S.toList $ S.splitOn p (FL.toList) (S.fromList xs)
>>>
splitOn' (== '.') ""
[""]
>>>
splitOn' (== '.') "."
["",""]
>>>
splitOn' (== '.') ".a"
> ["","a"]
>>>
splitOn' (== '.') "a."
> ["a",""]
>>>
splitOn' (== '.') "a.b"
> ["a","b"]
>>>
splitOn' (== '.') "a..b"
> ["a","","b"]
Since: 0.7.0
splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOn
but the separator is considered as suffixed to the segments
in the stream. A missing suffix at the end is allowed. A separator at the
beginning is parsed as empty segment. With -
representing elements and
.
as separator, splitOnSuffix
splits as follows:
"--.--." => "--" "--"
"--.--" => "--" "--"
".--." => "" "--"
splitOnSuffix' p xs = S.toList $ S.splitOnSuffix p (FL.toList) (S.fromList xs)
>>>
splitOnSuffix' (== '.') ""
[]
>>>
splitOnSuffix' (== '.') "."
[""]
>>>
splitOnSuffix' (== '.') "a"
["a"]
>>>
splitOnSuffix' (== '.') ".a"
> ["","a"]
>>>
splitOnSuffix' (== '.') "a."
> ["a"]
>>>
splitOnSuffix' (== '.') "a.b"
> ["a","b"]
>>>
splitOnSuffix' (== '.') "a.b."
> ["a","b"]
>>>
splitOnSuffix' (== '.') "a..b.."
> ["a","","b",""]
lines = splitOnSuffix (== '\n')
Since: 0.7.0
splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOnSuffix
but keeps the suffix attached to the resulting
splits.
splitWithSuffix' p xs = S.toList $ S.splitWithSuffix p (FL.toList) (S.fromList xs)
>>>
splitWithSuffix' (== '.') ""
[]
>>>
splitWithSuffix' (== '.') "."
["."]
>>>
splitWithSuffix' (== '.') "a"
["a"]
>>>
splitWithSuffix' (== '.') ".a"
> [".","a"]
>>>
splitWithSuffix' (== '.') "a."
> ["a."]
>>>
splitWithSuffix' (== '.') "a.b"
> ["a.","b"]
>>>
splitWithSuffix' (== '.') "a.b."
> ["a.","b."]
>>>
splitWithSuffix' (== '.') "a..b.."
> ["a.",".","b.","."]
Since: 0.7.0
wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOn
after stripping leading, trailing, and repeated separators.
Therefore, ".a..b."
with .
as the separator would be parsed as
["a","b"]
. In other words, it's like parsing words from whitespace
separated text.
wordsBy' p xs = S.toList $ S.wordsBy p (FL.toList) (S.fromList xs)
>>>
wordsBy' (== ',') ""
> []
>>>
wordsBy' (== ',') ","
> []
>>>
wordsBy' (== ',') ",a,,b,"
> ["a","b"]
words = wordsBy isSpace
Since: 0.7.0
splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOn
but the separator is a sequence of elements instead of a
single element.
For illustration, let's define a function that operates on pure lists:
splitOnSeq' pat xs = S.toList $ S.splitOnSeq (A.fromList pat) (FL.toList) (S.fromList xs)
>>>
splitOnSeq' "" "hello"
> ["h","e","l","l","o"]
>>>
splitOnSeq' "hello" ""
> [""]
>>>
splitOnSeq' "hello" "hello"
> ["",""]
>>>
splitOnSeq' "x" "hello"
> ["hello"]
>>>
splitOnSeq' "h" "hello"
> ["","ello"]
>>>
splitOnSeq' "o" "hello"
> ["hell",""]
>>>
splitOnSeq' "e" "hello"
> ["h","llo"]
>>>
splitOnSeq' "l" "hello"
> ["he","","o"]
>>>
splitOnSeq' "ll" "hello"
> ["he","o"]
splitOnSeq
is an inverse of intercalate
. The following law always holds:
intercalate . splitOn == id
The following law holds when the separator is non-empty and contains none of the elements present in the input lists:
splitOn . intercalate == id
Internal
splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOnSuffix
but the separator is a sequence of elements, instead
of a predicate for a single element.
splitSuffixOn_ pat xs = S.toList $ S.splitOnSuffixSeq (A.fromList pat) (FL.toList) (S.fromList xs)
>>>
splitSuffixOn_ "." ""
[""]
>>>
splitSuffixOn_ "." "."
[""]
>>>
splitSuffixOn_ "." "a"
["a"]
>>>
splitSuffixOn_ "." ".a"
> ["","a"]
>>>
splitSuffixOn_ "." "a."
> ["a"]
>>>
splitSuffixOn_ "." "a.b"
> ["a","b"]
>>>
splitSuffixOn_ "." "a.b."
> ["a","b"]
>>>
splitSuffixOn_ "." "a..b.."
> ["a","","b",""]
lines = splitOnSuffixSeq "\n"
Internal
splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOnSeq
but splits the separator as well, as an infix token.
splitOn'_ pat xs = S.toList $ S.splitBySeq (A.fromList pat) (FL.toList) (S.fromList xs)
>>>
splitOn'_ "" "hello"
> ["h","","e","","l","","l","","o"]
>>>
splitOn'_ "hello" ""
> [""]
>>>
splitOn'_ "hello" "hello"
> ["","hello",""]
>>>
splitOn'_ "x" "hello"
> ["hello"]
>>>
splitOn'_ "h" "hello"
> ["","h","ello"]
>>>
splitOn'_ "o" "hello"
> ["hell","o",""]
>>>
splitOn'_ "e" "hello"
> ["h","e","llo"]
>>>
splitOn'_ "l" "hello"
> ["he","l","","l","o"]
>>>
splitOn'_ "ll" "hello"
> ["he","ll","o"]
Internal
splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOnSuffixSeq
but keeps the suffix intact in the splits.
splitSuffixOn'_ pat xs = S.toList $ S.splitWithSuffixSeq (A.fromList pat) (FL.toList) (S.fromList xs)
>>>
splitSuffixOn'_ "." ""
[""]
>>>
splitSuffixOn'_ "." "."
["."]
>>>
splitSuffixOn'_ "." "a"
["a"]
>>>
splitSuffixOn'_ "." ".a"
> [".","a"]
>>>
splitSuffixOn'_ "." "a."
> ["a."]
>>>
splitSuffixOn'_ "." "a.b"
> ["a.","b"]
>>>
splitSuffixOn'_ "." "a.b."
> ["a.","b."]
>>>
splitSuffixOn'_ "." "a..b.."
> ["a.",".","b.","."]
Internal
splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #
splitInnerBy splitter joiner stream
splits the inner containers f a
of
an input stream t m (f a)
using the splitter
function. Container
elements f a
are collected until a split occurs, then all the elements
before the split are joined using the joiner
function.
For example, if we have a stream of Array Word8
, we may want to split the
stream into arrays representing lines separated by '\n' byte such that the
resulting stream after a split would be one array for each line.
CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.
Internal
splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #
Like splitInnerBy
but splits assuming the separator joins the segment in
a suffix style.
Internal
Grouping
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #
groups = groupsBy (==)
groups = groupsByRolling (==)
Groups contiguous spans of equal elements together in individual groups.
>>>
S.toList $ S.groups FL.toList $ S.fromList [1,1,2,2]
> [[1,1],[2,2]]
Since: 0.7.0
groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #
groupsBy cmp f $ S.fromList [a,b,c,...]
assigns the element a
to the
first group, if b `cmp` a
is True
then b
is also assigned to the same
group. If c `cmp` a
is True
then c
is also assigned to the same
group and so on. When the comparison fails a new group is started. Each
group is folded using the fold f
and the result of the fold is emitted in
the output stream.
>>>
S.toList $ S.groupsBy (>) FL.toList $ S.fromList [1,3,7,0,2,5]
> [[1,3,7],[0,2,5]]
Since: 0.7.0
groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Unlike groupsBy
this function performs a rolling comparison of two
successive elements in the input stream. groupsByRolling cmp f $ S.fromList
[a,b,c,...]
assigns the element a
to the first group, if a `cmp` b
is
True
then b
is also assigned to the same group. If b `cmp` c
is
True
then c
is also assigned to the same group and so on. When the
comparison fails a new group is started. Each group is folded using the fold
f
.
>>>
S.toList $ S.groupsByRolling (\a b -> a + 1 == b) FL.toList $ S.fromList [1,2,3,7,8,9]
> [[1,2,3],[7,8,9]]
Since: 0.7.0
Group map
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #
Like rollingMap
but with an effectful map function.
Internal
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #
Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.
Internal
Windowed Classification
Split the stream into windows or chunks in space or time. Each window can be associated with a key, all events associated with a particular key in the window can be folded to a single result. The stream is split into windows of specified size, the window can be terminated early if the closing flag is specified in the input stream.
The term "chunk" is used for a space window and the term "session" is used for a time window.
Tumbling Windows
A new window starts after the previous window is finished.
classifySessionsBy Source #
:: (IsStream t, MonadAsync m, Ord k) | |
=> Double | timer tick in seconds |
-> Double | session timeout in seconds |
-> Bool | reset the timeout when an event is received |
-> (Int -> m Bool) | predicate to eject sessions based on session count |
-> Fold m a (Either b b) | Fold to be applied to session events |
-> t m (k, a, AbsTime) | session key, data, timestamp |
-> t m (k, b) | session key, fold result |
classifySessionsBy tick timeout idle pred f stream
groups timestamped
events in an input event stream into sessions based on a session key. Each
element in the stream is an event consisting of a triple (session key,
session data, timestamp)
. session key
is a key that uniquely identifies
the session. All the events belonging to a session are folded using the
fold f
until the fold returns a Left
result or a timeout has occurred.
The session key and the result of the fold are emitted in the output stream
when the session is purged.
When idle
is False
, timeout
is the maximum lifetime of a session in
seconds, measured from the timestamp
of the first event in that session.
When idle
is True
then the timeout is an idle timeout, it is reset after
every event received in the session.
timestamp
in an event characterizes the time when the input event was
generated, this is an absolute time measured from some Epoch
. The notion
of current time is maintained by a monotonic event time clock using the
timestamps seen in the input stream. The latest timestamp seen till now is
used as the base for the current time. When no new events are seen, a timer
is started with a tick duration specified by tick
. This timer is used to
detect session timeouts in the absence of new events.
The predicate pred
is invoked with the current session count, if it
returns True
a session is ejected from the session cache before inserting
a new session. This could be useful to alert or eject sessions when the
number of sessions becomes too high.
Internal
classifySessionsOf Source #
:: (IsStream t, MonadAsync m, Ord k) | |
=> Double | time window size |
-> (Int -> m Bool) | predicate to eject sessions on session count |
-> Fold m a (Either b b) | Fold to be applied to session events |
-> t m (k, a, AbsTime) | session key, data, timestamp |
-> t m (k, b) |
Split the stream into fixed size time windows of specified interval in
seconds. Within each such window, fold the elements in sessions identified
by the session keys. The fold result is emitted in the output stream if the
fold returns a Left
result or if the time window ends.
Session timestamp
in the input stream is an absolute time from some epoch,
characterizing the time when the input element was generated. To detect
session window end, a monotonic event time clock is maintained synced with
the timestamps with a clock resolution of 1 second.
If the ejection predicate returns True
, the session with the longest
lifetime is ejected before inserting a new session.
classifySessionsOf interval pred = classifySessionsBy 1 interval False pred
Internal
Keep Alive Windows
The window size is extended if an event arrives within the specified window size. This can represent sessions with idle or inactive timeout.
classifyKeepAliveSessions Source #
:: (IsStream t, MonadAsync m, Ord k) | |
=> Double | session inactive timeout |
-> (Int -> m Bool) | predicate to eject sessions on session count |
-> Fold m a (Either b b) | Fold to be applied to session payload data |
-> t m (k, a, AbsTime) | session key, data, timestamp |
-> t m (k, b) |
Like classifySessionsOf
but the session is kept alive if an event is
received within the session window. The session times out and gets closed
only if no event is received within the specified session window size.
If the ejection predicate returns True
, the session that was idle for
the longest time is ejected before inserting a new session.
classifyKeepAliveSessions timeout pred = classifySessionsBy 1 timeout True pred
Internal
Sliding Window Buffers
Combining Streams
Appending
append :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.
IMPORTANT NOTE: This could be 100x faster than serial/<>
for appending a
few (say 100) streams because it can fuse via stream fusion. However, it
does not scale for a large number of streams (say 1000s) and becomes
quadratically slow. Therefore use this for custom appending of a few streams
but use concatMap
or 'concatMapWith serial' for appending n
streams or
infinite containers of streams.
Internal
Interleaving
interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.
>>>
:set -XOverloadedStrings
>>>
interleave "ab" ",,,," :: SerialT Identity Char
fromList "a,b,,,"
>>>
interleave "abcd" ",," :: SerialT Identity Char
fromList "a,b,cd"
interleave
is dual to interleaveMin
, it can be called interleaveMax
.
Do not use at scale in concatMapWith.
Internal
interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.
>>>
:set -XOverloadedStrings
>>>
interleaveMin "ab" ",,,," :: SerialT Identity Char
fromList "a,b,"
>>>
interleaveMin "abcd" ",," :: SerialT Identity Char
fromList "a,b,c"
interleaveMin
is dual to interleave
.
Do not use at scale in concatMapWith.
Internal
interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.
>>>
:set -XOverloadedStrings
>>>
interleaveSuffix "abc" ",,,," :: SerialT Identity Char
fromList "a,b,c,"
>>>
interleaveSuffix "abc" "," :: SerialT Identity Char
fromList "a,bc"
interleaveSuffix
is a dual of interleaveInfix
.
Do not use at scale in concatMapWith.
Internal
interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.
>>>
:set -XOverloadedStrings
>>>
interleaveInfix "abc" ",,,," :: SerialT Identity Char
fromList "a,b,c"
>>>
interleaveInfix "abc" "," :: SerialT Identity Char
fromList "a,bc"
interleaveInfix
is a dual of interleaveSuffix
.
Do not use at scale in concatMapWith.
Internal
wSerialFst :: IsStream t => t m a -> t m a -> t m a Source #
Like wSerial
but stops interleaving as soon as the first stream stops.
Since: 0.7.0
wSerialMin :: IsStream t => t m a -> t m a -> t m a Source #
Like wSerial
but stops interleaving as soon as any of the two streams
stops.
Since: 0.7.0
Scheduling
roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Schedule the execution of two streams in a fair round-robin manner,
executing each stream once, alternately. Execution of a stream may not
necessarily result in an output, a stream may choose to Skip
producing an
element until later giving the other stream a chance to run. Therefore, this
combinator fairly interleaves the execution of two streams rather than
fairly interleaving the output of the two streams. This can be useful in
co-operative multitasking without using explicit threads. This can be used
as an alternative to async
.
Do not use at scale in concatMapWith.
Internal
Parallel
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel
but stops the output as soon as the first stream stops.
Internal
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel
but stops the output as soon as any of the two streams
stops.
Internal
Merging
mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.
If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.
> S.toList $ S.mergeBy compare (S.fromList [1,3,5]) (S.fromList [2,4,6,8])
[1,2,3,4,5,6,8]
Since: 0.6.0
mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #
Like mergeBy
but with a monadic comparison function.
Merge two streams randomly:
> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> S.toList $ S.mergeByM randomly (S.fromList [1,1,1,1]) (S.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]
Merge two streams in a proportion of 2:1:
proportionately m n = do
    ref <- newIORef $ cycle $ concat [replicate m LT, replicate n GT]
    return $ \_ _ -> do
        r <- readIORef ref
        writeIORef ref $ tail r
        return $ head r

main = do
    f <- proportionately 2 1
    xs <- S.toList $ S.mergeByM f (S.fromList [1,1,1,1,1,1]) (S.fromList [2,2,2])
    print xs
[1,1,2,1,1,2,1,1,2]
Since: 0.6.0
mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
Like mergeBy
but merges concurrently (i.e. both the elements being
merged are generated concurrently).
Since: 0.6.0
mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #
Like mergeByM
but merges concurrently (i.e. both the elements being
merged are generated concurrently).
Since: 0.6.0
Zipping
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #
Zip two streams serially using a pure zipping function.
> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6])
[5,7,9]
Since: 0.1.0
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #
Like zipWith
but using a monadic zipping function.
Since: 0.4.0
zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #
Like zipWith
but zips concurrently i.e. both the streams being zipped
are generated concurrently.
Since: 0.1.0
zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #
Like zipWithM
but zips concurrently i.e. both the streams being zipped
are generated concurrently.
Since: 0.4.0
Folding Containers of Streams
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like foldMapWith
but with the last two arguments reversed i.e. the
monadic streaming function is the last argument.
Equivalent to:
forEachWith = flip S.foldMapWith
Since: 0.1.0 (Streamly)
Folding Streams of Streams
concat :: (IsStream t, Monad m) => t m (t m a) -> t m a Source #
Flatten a stream of streams to a single stream.
concat = concatMap id
Internal
concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
concatMap = concatMapWith serial
concatMap f = concatMapM (return . f)
Since: 0.6.0
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #
Map a stream producing monadic function on each element of the stream
and then flatten the results into a single stream. Since the stream
generation function is monadic, unlike concatMap
, it can produce an
effect at the beginning of each iteration of the inner loop.
Since: 0.6.0
concatMapWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b Source #
concatMapWith merge map stream
is a two dimensional looping combinator.
The first argument specifies a merge or concat function that is used to
merge the streams generated by applying the second argument i.e. the map
function to each element of the input stream. The concat function could be
serial
, parallel
, async
, ahead
or any other zip or merge function
and the second argument could be any stream generation function using a
seed.
Compare foldMapWith
Since: 0.7.0
Flattening Using Unfolds
concatUnfoldInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #
Like concatUnfold
but interleaves the streams in the same way as
interleave
behaves instead of appending them.
Internal
concatUnfoldRoundrobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #
Like concatUnfold
but executes the streams in the same way as
roundrobin
.
Internal
Feedback Loops
concatMapIterateWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m a) -> t m a -> t m a Source #
Like iterateM
but using a stream generator function.
Internal
concatMapTreeWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b) Source #
Traverse a forest with recursive tree structures whose non-leaf nodes are
of type a
and leaf nodes are of type b
, flattening all the trees into
streams and combining the streams into a single stream consisting of both
leaf and non-leaf nodes.
concatMapTreeWith
is a generalization of concatMap
, using a recursive
feedback loop to append the non-leaf nodes back to the input stream enabling
recursive traversal. concatMap
flattens a single level nesting whereas
concatMapTreeWith
flattens a recursively nested structure.
Traversing a directory tree recursively is a canonical use case of
concatMapTreeWith
.
concatMapTreeWith combine f xs = concatMapIterateWith combine g xs
    where
    g (Left tree) = f tree
    g (Right leaf) = nil
Internal
concatMapLoopWith Source #
:: (IsStream t, MonadAsync m) | |
=> (forall x. t m x -> t m x -> t m x) | |
-> (a -> t m (Either b c)) | |
-> (b -> t m a) | feedback function to feed |
-> t m a | |
-> t m c |
Flatten a stream with a feedback loop back into the input.
For example, exceptions generated by the output stream can be fed back to the input to take any corrective action. The corrective action may be to retry the action or do nothing or log the errors. For the retry case we need a feedback loop.
Internal
concatMapTreeYieldLeavesWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either a b)) -> t m a -> t m b Source #
Concat a stream of trees, generating only leaves.
Compare with concatMapTreeWith
. While the latter returns all nodes in the
tree, this one returns only the leaves.
Traversing a directory tree recursively and yielding on the files is a
canonical use case of concatMapTreeYieldLeavesWith
.
concatMapTreeYieldLeavesWith combine f = concatMapLoopWith combine f yield
Internal
mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #
Iterate a lazy function f
of the shape `m a -> t m a` until it gets
fully defined i.e. becomes independent of its argument action, then return
the resulting value of the function (`t m a`).
It can be used to construct a stream that uses a cyclic definition. For example:
import Streamly.Internal.Prelude as S
import System.IO.Unsafe (unsafeInterleaveIO)

main = do
    S.mapM_ print $ S.mfix $ \x -> do
        a <- S.fromList [1,2]
        b <- S.fromListM [return 3, unsafeInterleaveIO (fmap fst x)]
        return (a, b)
Note that the function f
must be lazy in its argument, that's why we use
unsafeInterleaveIO
because the IO monad is strict.
Internal
Inserting Streams in Streams
gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #
interleaveInfix
followed by unfold and concat.
Internal
gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #
interleaveSuffix
followed by unfold and concat.
Internal
intercalate :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c Source #
intersperse
followed by unfold and concat.
unwords = intercalate " " UF.fromList
>>>
intercalate " " UF.fromList ["abc", "def", "ghi"]
> "abc def ghi"
Internal
intercalateSuffix :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c Source #
intersperseSuffix
followed by unfold and concat.
unlines = intercalateSuffix "\n" UF.fromList
>>>
intercalateSuffix "\n" UF.fromList ["abc", "def", "ghi"]
> "abc\ndef\nghi\n"
Internal
interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #
Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.
unwords = S.interpose ' ' UF.fromList
Internal
interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #
Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.
unlines = S.interposeSuffix '\n' UF.fromList
Internal
Exceptions
before :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Run a side effect before the stream yields its first element.
Since: 0.7.0
after :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally.
Prefer afterIO
over this as the after
action in this combinator is not
executed if the stream is partially evaluated lazily and then garbage
collected.
Since: 0.7.0
afterIO :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally or is garbage collected after a partial lazy evaluation.
Internal
bracket :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #
Run the first action before the stream starts and remember its output, generate a stream using the output, run the second action using the remembered value as an argument whenever the stream ends normally or due to an exception.
Prefer bracketIO
over this as the after
action in this combinator is not
executed if the stream is partially evaluated lazily and then garbage
collected.
Since: 0.7.0
bracketIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #
Run the first action before the stream starts and remember its output, generate a stream using the output, run the second action using the remembered value as an argument whenever the stream ends normally, due to an exception or if it is garbage collected after a partial lazy evaluation.
Internal
onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream aborts due to an exception.
Since: 0.7.0
finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally or aborts due to an exception.
Prefer finallyIO
over this as the after
action in this combinator is not
executed if the stream is partially evaluated lazily and then garbage
collected.
Since: 0.7.0
finallyIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.
Internal
handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a Source #
When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.
Since: 0.7.0
Generalize Inner Monad
hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a Source #
Transform the inner monad of a stream using a natural transformation.
Internal
generally :: (IsStream t, Monad m) => t Identity a -> t m a Source #
Generalize the inner monad of the stream from Identity
to any monad.
Internal
Transform Inner Monad
liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a Source #
Lift the inner monad of a stream using a monad transformer.
Internal
usingReaderT :: (Monad m, IsStream t) => r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a Source #
Run a stream transformation using a given environment.
Internal
runReaderT :: (IsStream t, Monad m) => s -> t (ReaderT s m) a -> t m a Source #
Evaluate the inner monad of a stream as ReaderT
.
Internal
usingStateT :: Monad m => s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a Source #
Run a stateful (StateT) stream transformation using a given state.
This is supported only for SerialT
as concurrent state updates may not be
safe.
Internal
Diagnostics
inspectMode :: IsStream t => t m a -> t m a Source #
Print debug information about an SVar when the stream ends.
Internal
Deprecated
once :: (Monad m, IsStream t) => m a -> t m a Source #
Deprecated: Please use yieldM instead.
Same as yieldM
Since: 0.2.0
each :: (IsStream t, Foldable f) => f a -> t m a Source #
Deprecated: Please use fromFoldable instead.
Same as fromFoldable
.
Since: 0.1.0
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #
Deprecated: Please use scanl followed by map instead.
Strict left scan with an extraction function. Like scanl'
, but applies a
user supplied extraction function (the third argument) at each step. This is
designed to work with the foldl
library. The suffix x
is a mnemonic for
extraction.
Since: 0.7.0 (Monad m constraint)
Since: 0.2.0
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #
Deprecated: Please use foldl' followed by fmap instead.
Strict left fold with an extraction function. Like the standard strict
left fold, but applies a user supplied extraction function (the third
argument) to the folded value at the end. This is designed to work with the
foldl
library. The suffix x
is a mnemonic for extraction.
Since: 0.2.0
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #
Deprecated: Please use foldlM' followed by fmap instead.
Like foldx
, but with a monadic step function.
Since: 0.2.0
foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Deprecated: Use foldrM instead.
Lazy right fold for non-empty streams, using first element as the starting
value. Returns Nothing
if the stream is empty.
Since: 0.5.0
runStream :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use "drain" instead
Run a stream, discarding the results. By default it interprets the stream
as SerialT
, to run other types of streams use the type adapting
combinators, for example runStream . asyncly.
Since: 0.2.0
runN :: Monad m => Int -> SerialT m a -> m () Source #
Deprecated: Please use "drainN" instead
runN n = runStream . take n
Run at most n
iterations of a stream.
Since: 0.6.0
runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
Deprecated: Please use "drainWhile" instead
runWhile p = runStream . takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.6.0