| Copyright | (c) 2017 Harendra Kumar | 
|---|---|
| License | BSD3 | 
| Maintainer | streamly@composewell.com | 
| Stability | experimental | 
| Portability | GHC | 
| Safe Haskell | None | 
| Language | Haskell2010 | 
Streamly.Internal.Prelude
Contents
Description
This is an Internal module consisting of released, unreleased and unimplemented APIs. For stable and released APIs please see Streamly.Prelude module. This module provides documentation only for the unreleased and unimplemented APIs. For documentation on released APIs please see Streamly.Prelude module.
Synopsis
- nil :: IsStream t => t m a
- nilM :: (IsStream t, Monad m) => m b -> t m a
- cons :: IsStream t => a -> t m a -> t m a
- (.:) :: IsStream t => a -> t m a -> t m a
- consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- (|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- yield :: IsStream t => a -> t m a
- yieldM :: (Monad m, IsStream t) => m a -> t m a
- repeat :: (IsStream t, Monad m) => a -> t m a
- repeatM :: (IsStream t, MonadAsync m) => m a -> t m a
- replicate :: (IsStream t, Monad m) => Int -> a -> t m a
- replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
- class Enum a => Enumerable a where- enumerateFrom :: (IsStream t, Monad m) => a -> t m a
- enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a
- enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a
 
- enumerate :: (IsStream t, Monad m, Bounded a, Enumerable a) => t m a
- enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a
- unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
- unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
- unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
- iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
- iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
- fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
- fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
- fromList :: (Monad m, IsStream t) => [a] -> t m a
- fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
- fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
- fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
- fromPrimVar :: (IsStream t, MonadIO m, Prim a) => Var IO a -> t m a
- fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
- currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime
- uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a))
- tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a))
- foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b
- foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
- foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b
- foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b
- foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b
- foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b
- fold :: Monad m => Fold m a b -> SerialT m a -> m b
- parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b
- foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b
- (|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b
- drain :: Monad m => SerialT m a -> m ()
- last :: Monad m => SerialT m a -> m (Maybe a)
- length :: Monad m => SerialT m a -> m Int
- sum :: (Monad m, Num a) => SerialT m a -> m a
- product :: (Monad m, Num a) => SerialT m a -> m a
- mconcat :: (Monad m, Monoid a) => SerialT m a -> m a
- maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- maximum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a)
- minimum :: (Monad m, Ord a) => SerialT m a -> m (Maybe a)
- the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a)
- toList :: Monad m => SerialT m a -> m [a]
- toListRev :: Monad m => SerialT m a -> m [a]
- toPure :: Monad m => SerialT m a -> m (SerialT Identity a)
- toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a)
- toStream :: Monad m => Fold m a (SerialT Identity a)
- toStreamRev :: Monad m => Fold m a (SerialT Identity a)
- drainN :: Monad m => Int -> SerialT m a -> m ()
- drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- (!!) :: Monad m => SerialT m a -> Int -> m (Maybe a)
- head :: Monad m => SerialT m a -> m (Maybe a)
- headElse :: Monad m => a -> SerialT m a -> m a
- findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a)
- find :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe a)
- lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b)
- findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int)
- elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int)
- null :: Monad m => SerialT m a -> m Bool
- elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool
- all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool
- and :: Monad m => SerialT m Bool -> m Bool
- or :: Monad m => SerialT m Bool -> m Bool
- eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool
- cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering
- isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool
- isInfixOf :: (MonadIO m, Eq a, Enum a, Storable a) => SerialT m a -> SerialT m a -> m Bool
- isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool
- stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a))
- stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a))
- dropPrefix :: t m a -> t m a -> t m a
- dropInfix :: t m a -> t m a -> t m a
- dropSuffix :: t m a -> t m a -> t m a
- transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
- map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
- sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
- mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
- mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m ()
- trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
- tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a
- tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a
- tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a
- tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a
- pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a
- scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
- postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b
- prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
- prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
- scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
- scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
- scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
- mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a
- applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b
- (|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
- filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
- mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b
- deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
- uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
- insertBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
- intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
- intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a
- intersperseSuffixBySpan :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -> t m a
- interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a
- delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
- indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
- indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
- reverse :: (IsStream t, Monad m) => t m a -> t m a
- reverse' :: (IsStream t, MonadIO m, Storable a) => t m a -> t m a
- splitParse :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b
- take :: (IsStream t, Monad m) => Int -> t m a -> t m a
- takeByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- drop :: (IsStream t, Monad m) => Int -> t m a -> t m a
- dropByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a
- dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
- dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
- chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b
- chunksOf2 :: (IsStream t, Monad m) => Int -> m c -> Fold2 m c a b -> t m a -> t m b
- arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a)
- intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b
- findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int
- elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
- splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b
- splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b
- splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
- splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
- groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
- groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
- groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b
- rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b
- rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
- classifySessionsBy :: (IsStream t, MonadAsync m, Ord k) => Double -> Double -> Bool -> (Int -> m Bool) -> Fold m a (Either b b) -> t m (k, a, AbsTime) -> t m (k, b)
- classifySessionsOf :: (IsStream t, MonadAsync m, Ord k) => Double -> (Int -> m Bool) -> Fold m a (Either b b) -> t m (k, a, AbsTime) -> t m (k, b)
- classifyKeepAliveSessions :: (IsStream t, MonadAsync m, Ord k) => Double -> (Int -> m Bool) -> Fold m a (Either b b) -> t m (k, a, AbsTime) -> t m (k, b)
- append :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- wSerialFst :: IsStream t => t m a -> t m a -> t m a
- wSerialMin :: IsStream t => t m a -> t m a -> t m a
- roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b
- parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
- mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
- mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a
- zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c
- zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c
- foldWith :: (IsStream t, Foldable f) => (t m a -> t m a -> t m a) -> f (t m a) -> t m a
- foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b
- forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b
- concat :: (IsStream t, Monad m) => t m (t m a) -> t m a
- concatM :: (IsStream t, Monad m) => m (t m a) -> t m a
- concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b
- concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b
- concatMapWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b
- concatUnfold :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- concatUnfoldInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- concatUnfoldRoundrobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b
- concatMapIterateWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m a) -> t m a -> t m a
- concatMapTreeWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b)
- concatMapLoopWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either b c)) -> (b -> t m a) -> t m a -> t m c
- concatMapTreeYieldLeavesWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either a b)) -> t m a -> t m b
- mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a
- gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c
- intercalate :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c
- intercalateSuffix :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c
- interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c
- before :: (IsStream t, Monad m) => m b -> t m a -> t m a
- after :: (IsStream t, Monad m) => m b -> t m a -> t m a
- afterIO :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a
- bracket :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a
- bracketIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a
- onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
- finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a
- finallyIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a
- handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a
- hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a
- generally :: (IsStream t, Monad m) => t Identity a -> t m a
- liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a
- usingReaderT :: (Monad m, IsStream t) => r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a
- runReaderT :: (IsStream t, Monad m) => s -> t (ReaderT s m) a -> t m a
- evalStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m a
- usingStateT :: Monad m => s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a
- runStateT :: Monad m => s -> SerialT (StateT s m) a -> SerialT m (s, a)
- inspectMode :: IsStream t => t m a -> t m a
- once :: (Monad m, IsStream t) => m a -> t m a
- each :: (IsStream t, Foldable f) => f a -> t m a
- scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
- foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b
- foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b
- foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a)
- runStream :: Monad m => SerialT m a -> m ()
- runN :: Monad m => Int -> SerialT m a -> m ()
- runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m ()
- fromHandle :: (IsStream t, MonadIO m) => Handle -> t m String
- toHandle :: MonadIO m => Handle -> SerialT m String -> m ()
Construction
Primitives
nilM :: (IsStream t, Monad m) => m b -> t m a Source #
An empty stream producing a side effect.
> toList (nilM (print "nil")) "nil" []
Internal
cons :: IsStream t => a -> t m a -> t m a infixr 5 Source #
Construct a stream by adding a pure value at the head of an existing
 stream. For serial streams this is the same as (return a) `consM` r but
 more efficient. For concurrent streams this is not concurrent whereas
 consM is concurrent. For example:
> toList $ 1 `cons` 2 `cons` 3 `cons` nil [1,2,3]
Since: 0.1.0
consM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Constructs a stream by adding a monadic action at the head of an existing stream. For example:
> toList $ getLine `consM` getLine `consM` nil hello world ["hello","world"]
Concurrent (do not use parallely to construct infinite streams)
Since: 0.2.0
(|:) :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a infixr 5 Source #
Operator equivalent of consM. We can read it as "parallel colon"
 to remember that | comes before :.
> toList $ getLine |: getLine |: nil hello world ["hello","world"]
let delay = threadDelay 1000000 >> print 1 drain $ serially $ delay |: delay |: delay |: nil drain $ parallely $ delay |: delay |: delay |: nil
Concurrent (do not use parallely to construct infinite streams)
Since: 0.2.0
From Values
yield :: IsStream t => a -> t m a Source #
yield a = a `cons` nil
Create a singleton stream from a pure value.
The following holds in monadic streams, but not in Zip streams:
yield = pure yield = yieldM . pure
In Zip applicative streams yield is not the same as pure because in that
 case pure is equivalent to repeat instead. yield and pure are
 equally efficient, in other cases yield may be slightly more efficient
 than the other equivalent definitions.
Since: 0.4.0
yieldM :: (Monad m, IsStream t) => m a -> t m a Source #
yieldM m = m `consM` nil
Create a singleton stream from a monadic action.
> toList $ yieldM getLine hello ["hello"]
Since: 0.4.0
repeat :: (IsStream t, Monad m) => a -> t m a Source #
Generate an infinite stream by repeating a pure value.
Since: 0.4.0
repeatM :: (IsStream t, MonadAsync m) => m a -> t m a Source #
repeatM = fix . consM repeatM = cycle1 . yieldM
Generate a stream by repeatedly executing a monadic action forever.
drain $ serially $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1) drain $ asyncly $ S.take 10 $ S.repeatM $ (threadDelay 1000000 >> print 1)
Concurrent, infinite (do not use with parallely)
Since: 0.2.0
replicate :: (IsStream t, Monad m) => Int -> a -> t m a Source #
replicate = take n . repeat
Generate a stream of length n by repeating a value n times.
Since: 0.6.0
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a Source #
replicateM = take n . repeatM
Generate a stream by performing a monadic action n times. Same as:
drain $ serially $ S.replicateM 10 $ (threadDelay 1000000 >> print 1) drain $ asyncly $ S.replicateM 10 $ (threadDelay 1000000 >> print 1)
Concurrent
Since: 0.1.1
Enumeration
class Enum a => Enumerable a where Source #
Types that can be enumerated as a stream. The operations in this type
 class are equivalent to those in the Enum type class, except that these
 generate a stream instead of a list. Use the functions in
 Streamly.Internal.Data.Stream.Enumeration module to define new instances.
Since: 0.6.0
Methods
enumerateFrom :: (IsStream t, Monad m) => a -> t m a Source #
enumerateFrom from generates a stream starting with the element
 from, enumerating up to maxBound when the type is Bounded or
 generating an infinite stream when the type is not Bounded.
> S.toList $ S.take 4 $ S.enumerateFrom (0 :: Int) [0,1,2,3]
For Fractional types, enumeration is numerically stable. However, no
 overflow or underflow checks are performed.
> S.toList $ S.take 4 $ S.enumerateFrom 1.1 [1.1,2.1,3.1,4.1]
Since: 0.6.0
enumerateFromTo :: (IsStream t, Monad m) => a -> a -> t m a Source #
Generate a finite stream starting with the element from, enumerating
 the type up to the value to. If to is smaller than from then an
 empty stream is returned.
> S.toList $ S.enumerateFromTo 0 4 [0,1,2,3,4]
For Fractional types, the last element is equal to the specified to
 value after rounding to the nearest integral value.
> S.toList $ S.enumerateFromTo 1.1 4 [1.1,2.1,3.1,4.1] > S.toList $ S.enumerateFromTo 1.1 4.6 [1.1,2.1,3.1,4.1,5.1]
Since: 0.6.0
enumerateFromThen :: (IsStream t, Monad m) => a -> a -> t m a Source #
enumerateFromThen from then generates a stream whose first element
 is from, the second element is then and the successive elements are
 in increments of then - from.  Enumeration can occur downwards or
 upwards depending on whether then comes before or after from. For
 Bounded types the stream ends when maxBound is reached, for
 unbounded types it keeps enumerating infinitely.
> S.toList $ S.take 4 $ S.enumerateFromThen 0 2 [0,2,4,6] > S.toList $ S.take 4 $ S.enumerateFromThen 0 (-2) [0,-2,-4,-6]
Since: 0.6.0
enumerateFromThenTo :: (IsStream t, Monad m) => a -> a -> a -> t m a Source #
enumerateFromThenTo from then to generates a finite stream whose
 first element is from, the second element is then and the successive
 elements are in increments of then - from up to to. Enumeration can
 occur downwards or upwards depending on whether then comes before or
 after from.
> S.toList $ S.enumerateFromThenTo 0 2 6 [0,2,4,6] > S.toList $ S.enumerateFromThenTo 0 (-2) (-6) [0,-2,-4,-6]
Since: 0.6.0
Instances
enumerateTo :: (IsStream t, Monad m, Bounded a, Enumerable a) => a -> t m a Source #
From Generators
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a Source #
unfoldr step s =
    case step s of
        Nothing -> nil
        Just (a, b) -> a `cons` unfoldr step b
Build a stream by unfolding a pure step function step starting from a
 seed s.  The step function returns the next element in the stream and the
 next seed value. When it is done it returns Nothing and the stream ends.
 For example,
let f b =
        if b > 3
        then Nothing
        else Just (b, b + 1)
in toList $ unfoldr f 0
[0,1,2,3]
Since: 0.1.0
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a Source #
Build a stream by unfolding a monadic step function starting from a
 seed.  The step function returns the next element in the stream and the next
 seed value. When it is done it returns Nothing and the stream ends. For
 example,
let f b =
        if b > 3
        then return Nothing
        else print b >> return (Just (b, b + 1))
in drain $ unfoldrM f 0
0 1 2 3
When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.
(asyncly $ S.unfoldrM (\n -> liftIO (threadDelay 1000000) >> return (Just (n, n + 1))) 0)
    & S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.1.0
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b Source #
Convert an Unfold into a stream by supplying it an input seed.
>>>unfold (UF.replicateM 10) (putStrLn "hello")
Since: 0.7.0
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a Source #
iterate f x = x `cons` iterate f x
Generate an infinite stream with x as the first element and each
 successive element derived by applying the function f on the previous
 element.
> S.toList $ S.take 5 $ S.iterate (+1) 1 [1,2,3,4,5]
Since: 0.1.2
iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a Source #
iterateM f m = m >>= a -> return a `consM` iterateM f (f a)
Generate an infinite stream with the first element generated by the action
 m and each successive element derived by applying the monadic function
 f on the previous element.
When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.
drain $ serially $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)
drain $ asyncly  $ S.take 10 $ S.iterateM
     (\x -> threadDelay 1000000 >> print x >> return (x + 1)) (return 0)
Concurrent
Since: 0.7.0 (signature change)
Since: 0.1.2
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a Source #
fromIndices f = let g i = f i `cons` g (i + 1) in g 0
Generate an infinite stream, whose values are the output of a function f
 applied on the corresponding index.  Index starts at 0.
> S.toList $ S.take 5 $ S.fromIndices id [0,1,2,3,4]
Since: 0.6.0
fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a Source #
fromIndicesM f = let g i = f i `consM` g (i + 1) in g 0
Generate an infinite stream, whose values are the output of a monadic
 function f applied on the corresponding index. Index starts at 0.
Concurrent
Since: 0.6.0
From Containers
fromList :: (Monad m, IsStream t) => [a] -> t m a Source #
fromList =foldrconsnil
Construct a stream from a list of pure values. This is more efficient than
 fromFoldable for serial streams.
Since: 0.4.0
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a Source #
fromListM =foldrconsMnil
Construct a stream from a list of monadic actions. This is more efficient
 than fromFoldableM for serial streams.
Since: 0.4.0
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a Source #
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a Source #
fromFoldableM =foldrconsMnil
Construct a stream from a Foldable containing monadic actions.
drain $ serially $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1) drain $ asyncly $ S.fromFoldableM $ replicateM 10 (threadDelay 1000000 >> print 1)
Concurrent (do not use with parallely on infinite containers)
Since: 0.3.0
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a Source #
Takes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback.
Internal
Time related
currentTime :: (IsStream t, MonadAsync m) => Double -> t m AbsTime Source #
currentTime g returns a stream of absolute timestamps using a clock of
 granularity g specified in seconds. A low granularity clock is more
 expensive in terms of CPU usage.
Note: This API is not safe on 32-bit machines.
Internal
Elimination
Deconstruction
uncons :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (a, t m a)) Source #
Decompose a stream into its head and tail. If the stream is empty, returns
 Nothing. If the stream is non-empty, returns Just (a, ma), where a is
 the head of the stream and ma its tail.
This is a brute force primitive. Avoid using it as long as possible, use it when no other combinator can do the job. This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. For example, this can be used to convert a streamly stream into other stream types.
Since: 0.1.0
tail :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
tail = fmap (fmap snd) . uncons
Extract all but the first element of the stream, if any.
Since: 0.1.1
init :: (IsStream t, Monad m) => SerialT m a -> m (Maybe (t m a)) Source #
Extract all but the last element of the stream, if any.
Since: 0.5.0
Folding
Right Folds
foldrM :: Monad m => (a -> m b -> m b) -> m b -> SerialT m a -> m b Source #
Right associative/lazy pull fold. foldrM build final stream constructs
 an output structure using the step function build. build is invoked with
 the next input element and the remaining (lazy) tail of the output
 structure. It builds a lazy output expression using the two. When the "tail
 structure" in the output expression is evaluated it calls build again thus
 lazily consuming the input stream until either the output expression built
 by build is free of the "tail" or the input is exhausted in which case
 final is used as the terminating case for the output structure. For more
 details see the description in the previous section.
Example, determine if any element is odd in a stream:
>>>S.foldrM (\x xs -> if odd x then return True else xs) (return False) $ S.fromList (2:4:5:undefined)> True
Since: 0.7.0 (signature changed)
Since: 0.2.0 (signature changed)
Since: 0.1.0
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b Source #
Right fold to a streaming monad.
foldrS S.cons S.nil === id
foldrS can be used to perform stateless stream to stream transformations
 like map and filter in general. It can be coupled with a scan to perform
 stateful transformations. However, note that the custom map and filter
 routines can be much more efficient than this due to better stream fusion.
>>>S.toList $ S.foldrS S.cons S.nil $ S.fromList [1..5]> [1,2,3,4,5]
Find if any element in the stream is True:
>>>S.toList $ S.foldrS (\x xs -> if odd x then return True else xs) (return False) $ (S.fromList (2:4:5:undefined) :: SerialT IO Int)> [True]
Map (+2) on odd elements and filter out the even elements:
>>>S.toList $ S.foldrS (\x xs -> if odd x then (x + 2) `S.cons` xs else xs) S.nil $ (S.fromList [1..5] :: SerialT IO Int)> [3,5,7]
foldrM can also be represented in terms of foldrS, however, the former
 is much more efficient:
foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
Internal
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s) => (a -> s m b -> s m b) -> s m b -> t m a -> s m b Source #
Right fold to a transformer monad.  This is the most general right fold
 function. foldrS is a special case of foldrT, however foldrS
 implementation can be more efficient:
foldrS = foldrT foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
foldrT can be used to translate streamly streams to other transformer
 monads e.g.  to a different streaming type.
Internal
foldr :: Monad m => (a -> b -> b) -> b -> SerialT m a -> m b Source #
Right fold, lazy for lazy monads and pure streams, and strict for strict monads.
Please avoid using this routine in strict monads like IO unless you need a
 strict right fold. This is provided only for use in lazy monads (e.g.
 Identity) or pure streams. Note that with this signature it is not possible
 to implement a lazy foldr when the monad m is strict. In that case it
 would be strict in its accumulator and therefore would necessarily consume
 all its input.
Since: 0.1.0
Left Folds
foldl' :: Monad m => (b -> a -> b) -> b -> SerialT m a -> m b Source #
Left associative/strict push fold. foldl' reduce initial stream invokes
 reduce with the accumulator and the next input in the input stream, using
 initial as the initial value of the current value of the accumulator. When
 the input is exhausted the current value of the accumulator is returned.
 Make sure to use a strict data structure for accumulator to not build
 unnecessary lazy expressions unless that's what you want. See the previous
 section for more details.
Since: 0.2.0
foldl1' :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Strict left fold, for non-empty streams, using first element as the
 starting value. Returns Nothing if the stream is empty.
Since: 0.5.0
foldlM' :: Monad m => (b -> a -> m b) -> b -> SerialT m a -> m b Source #
Like foldl' but with a monadic step function.
Since: 0.2.0
Composable Left Folds
fold :: Monad m => Fold m a b -> SerialT m a -> m b Source #
Fold a stream using the supplied left fold.
>>>S.fold FL.sum (S.enumerateFromTo 1 100)5050
Since: 0.7.0
parse :: MonadThrow m => Parser m a b -> SerialT m a -> m b Source #
Parse a stream using the supplied Parse.
Internal
Concurrent Folds
foldAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b Source #
Same as |$..
Internal
(|$.) :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> m b infixr 0 Source #
Parallel fold application operator; applies a fold function t m a -> m b
 to a stream t m a concurrently; The the input stream is evaluated
 asynchronously in an independent thread yielding elements to a buffer and
 the folding action runs in another thread consuming the input from the
 buffer.
If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look
 at it as a transformation that converts a fold function to a buffered
 concurrent fold function.
The . at the end of the operator is a mnemonic for termination of the
 stream.
   S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
      |$. S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&.) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> m b) -> m b infixl 1 Source #
Parallel reverse function application operator for applying a run or fold
 functions to a stream. Just like |$. except that the operands are reversed.
S.repeatM (threadDelay 1000000 >> return 1) |&. S.foldlM' (\_ a -> threadDelay 1000000 >> print a) ()
Concurrent
Since: 0.3.0
Full Folds
drain :: Monad m => SerialT m a -> m () Source #
drain = mapM_ (\_ -> return ())
Run a stream, discarding the results. By default it interprets the stream
 as SerialT, to run other types of streams use the type adapting
 combinators for example drain . .asyncly
Since: 0.7.0
last :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the last element of the stream, if any.
last xs = xs !! (length xs - 1)
Since: 0.1.1
sum :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the sum of all elements of a stream of numbers. Returns 0 when
 the stream is empty. Note that this is not numerically stable for floating
 point numbers.
Since: 0.1.0
product :: (Monad m, Num a) => SerialT m a -> m a Source #
Determine the product of all elements of a stream of numbers. Returns 1
 when the stream is empty.
Since: 0.1.1
mconcat :: (Monad m, Monoid a) => SerialT m a -> m a Source #
Fold a stream of monoid elements by appending them.
Internal
maximumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the maximum element in a stream using the supplied comparison function.
Since: 0.6.0
minimumBy :: Monad m => (a -> a -> Ordering) -> SerialT m a -> m (Maybe a) Source #
Determine the minimum element in a stream using the supplied comparison function.
Since: 0.6.0
the :: (Eq a, Monad m) => SerialT m a -> m (Maybe a) Source #
Ensures that all the elements of the stream are identical and then returns that unique element.
Since: 0.6.0
Lazy Folds
toList :: Monad m => SerialT m a -> m [a] Source #
toList = S.foldr (:) []
Convert a stream into a list in the underlying monad. The list can be
 consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g.
 IO) the whole list is generated and buffered before it can be consumed.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Since: 0.1.0
toListRev :: Monad m => SerialT m a -> m [a] Source #
toListRev = S.foldl' (flip (:)) []
Convert a stream into a list in reverse order in the underlying monad.
Warning! working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Internal
toPure :: Monad m => SerialT m a -> m (SerialT Identity a) Source #
Convert a stream to a pure stream.
toPure = foldr cons nil
Internal
toPureRev :: Monad m => SerialT m a -> m (SerialT Identity a) Source #
Convert a stream to a pure stream in reverse order.
toPureRev = foldl' (flip cons) nil
Internal
Composable Left Folds
toStream :: Monad m => Fold m a (SerialT Identity a) Source #
A fold that buffers its input to a pure stream.
Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Internal
toStreamRev :: Monad m => Fold m a (SerialT Identity a) Source #
Buffers the input stream to a pure stream in the reverse order of the input.
Warning! working on large streams accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.
Internal
Partial Folds
drainN :: Monad m => Int -> SerialT m a -> m () Source #
drainN n = drain . take n
Run maximum up to n iterations of a stream.
Since: 0.7.0
drainWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
drainWhile p = drain . takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.7.0
(!!) :: Monad m => SerialT m a -> Int -> m (Maybe a) Source #
Lookup the element at the given index.
Since: 0.6.0
head :: Monad m => SerialT m a -> m (Maybe a) Source #
Extract the first element of the stream, if any.
head = (!! 0)
Since: 0.1.0
headElse :: Monad m => a -> SerialT m a -> m a Source #
Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.
Internal
findM :: Monad m => (a -> m Bool) -> SerialT m a -> m (Maybe a) Source #
Returns the first element that satisfies the given predicate.
Since: 0.6.0
lookup :: (Monad m, Eq a) => a -> SerialT m (a, b) -> m (Maybe b) Source #
In a stream of (key-value) pairs (a, b), return the value b of the
 first pair where the key equals the given value a.
lookup = snd <$> find ((==) . fst)
Since: 0.5.0
findIndex :: Monad m => (a -> Bool) -> SerialT m a -> m (Maybe Int) Source #
Returns the first index that satisfies the given predicate.
Since: 0.5.0
elemIndex :: (Monad m, Eq a) => a -> SerialT m a -> m (Maybe Int) Source #
Returns the first index where a given value is found in the stream.
elemIndex a = findIndex (== a)
Since: 0.5.0
elem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is present in the stream.
Since: 0.1.0
notElem :: (Monad m, Eq a) => a -> SerialT m a -> m Bool Source #
Determine whether an element is not present in the stream.
Since: 0.1.0
all :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether all elements of a stream satisfy a predicate.
Since: 0.1.0
any :: Monad m => (a -> Bool) -> SerialT m a -> m Bool Source #
Determine whether any of the elements of a stream satisfy a predicate.
Since: 0.1.0
and :: Monad m => SerialT m Bool -> m Bool Source #
Determines if all elements of a boolean stream are True.
Since: 0.5.0
or :: Monad m => SerialT m Bool -> m Bool Source #
Determines whether at least one element of a boolean stream is True.
Since: 0.5.0
Multi-Stream folds
eqBy :: (IsStream t, Monad m) => (a -> b -> Bool) -> t m a -> t m b -> m Bool Source #
Compare two streams for equality using an equality function.
Since: 0.6.0
cmpBy :: (IsStream t, Monad m) => (a -> b -> Ordering) -> t m a -> t m b -> m Ordering Source #
Compare two streams lexicographically using a comparison function.
Since: 0.6.0
isPrefixOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True if the first stream is the same as or a prefix of the
 second. A stream is a prefix of itself.
> S.isPrefixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char) True
Since: 0.6.0
isSuffixOf :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m Bool Source #
Returns True if the first stream is a suffix of the second. A stream is
 considered a suffix of itself.
> S.isSuffixOf (S.fromList "hello") (S.fromList "hello" :: SerialT IO Char) True
Space: O(n), buffers entire input stream and the suffix.
Internal
Suboptimal - Help wanted.
isSubsequenceOf :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m Bool Source #
Returns True if all the elements of the first stream occur, in order, in
 the second stream. The elements do not have to occur consecutively. A stream
 is a subsequence of itself.
> S.isSubsequenceOf (S.fromList "hlo") (S.fromList "hello" :: SerialT IO Char) True
Since: 0.6.0
stripPrefix :: (Eq a, IsStream t, Monad m) => t m a -> t m a -> m (Maybe (t m a)) Source #
Strip prefix if present and tell whether it was stripped or not. Returns
 Nothing if the stream does not start with the given prefix, stripped
 stream otherwise. Returns Just nil when the prefix is the same as the
 stream.
Space: O(1)
Since: 0.6.0
stripSuffix :: (Monad m, Eq a) => SerialT m a -> SerialT m a -> m (Maybe (SerialT m a)) Source #
Drops the given suffix from a stream. Returns Nothing if the stream does
 not end with the given suffix. Returns Just nil when the suffix is the
 same as the stream.
It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.
Space: O(n), buffers the entire input stream as well as the suffix
Internal
dropPrefix :: t m a -> t m a -> t m a Source #
Drop prefix from the input stream if present.
Space: O(1)
Unimplemented - Help wanted.
dropInfix :: t m a -> t m a -> t m a Source #
Drop all matching infix from the input stream if present. Infix stream may be consumed multiple times.
Space: O(n) where n is the length of the infix.
Unimplemented - Help wanted.
dropSuffix :: t m a -> t m a -> t m a Source #
Drop suffix from the input stream if present. Suffix stream may be consumed multiple times.
Space: O(n) where n is the length of the suffix.
Unimplemented - Help wanted.
Transformation
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b Source #
Use a Pipe to transform a stream.
Internal
Mapping
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a Source #
sequence = mapM id
Replace the elements of a stream of monadic actions with the outputs of those actions.
> drain $ S.sequence $ S.fromList [putStr "a", putStr "b", putStrLn "c"]
abc
drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
          & (serially . S.sequence)
drain $ S.replicateM 10 (return $ threadDelay 1000000 >> print 1)
          & (asyncly . S.sequence)
Concurrent (do not use with parallely on infinite streams)
Since: 0.1.0
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b Source #
mapM f = sequence . map f
Apply a monadic function to each element of the stream and replace it with the output of the resulting action.
> drain $ S.mapM putStr $ S.fromList ["a", "b", "c"]
abc
drain $ S.replicateM 10 (return 1)
          & (serially . S.mapM (\x -> threadDelay 1000000 >> print x))
drain $ S.replicateM 10 (return 1)
          & (asyncly . S.mapM (\x -> threadDelay 1000000 >> print x))
Concurrent (do not use with parallely on infinite streams)
Since: 0.1.0
Special Maps
mapM_ :: Monad m => (a -> m b) -> SerialT m a -> m () Source #
mapM_ = drain . mapM
Apply a monadic action to each element of the stream and discard the output of the action. This is not really a pure transformation operation but a transformation followed by fold.
Since: 0.1.0
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a Source #
Apply a monadic function to each element flowing through the stream and discard the results.
> S.drain $ S.trace print (S.enumerateFromTo 1 2) 1 2
Compare with tap.
Since: 0.7.0
tap :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m a Source #
Tap the data flowing through a stream into a Fold. For example, you may
 add a tap to log the contents flowing through the stream. The fold is used
 only for effects, its result is discarded.
                  Fold m a b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tap (FL.drainBy print) (S.enumerateFromTo 1 2) 1 2
Compare with trace.
Since: 0.7.0
tapOffsetEvery :: (IsStream t, Monad m) => Int -> Int -> Fold m a b -> t m a -> t m a Source #
tapOffsetEvery offset n taps every nth element in the stream
 starting at offset. offset can be between 0 and n - 1. Offset 0
 means start at the first element in the stream. If the offset is outside
 this range then offset  is used as offset.mod n
>>> S.drain $ S.tapOffsetEvery 0 2 (FL.mapM print FL.toList) $ S.enumerateFromTo 0 10 > [0,2,4,6,8,10]
Internal
tapAsync :: (IsStream t, MonadAsync m) => Fold m a b -> t m a -> t m a Source #
Redirect a copy of the stream to a supplied fold and run it concurrently
 in an independent thread. The fold may buffer some elements. The buffer size
 is determined by the prevailing maxBuffer setting.
              Stream m a -> m b
                      |
-----stream m a ---------------stream m a-----
> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2) 1 2
Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.
Compare with tap.
Internal
tapRate :: (IsStream t, MonadAsync m, MonadCatch m) => Double -> (Int -> m b) -> t m a -> t m a Source #
Calls the supplied function with the number of elements consumed
 every n seconds. The given function is run in a separate thread
 until the end of the stream. In case there is an exception in the
 stream the thread is killed during the next major GC.
Note: The action is not guaranteed to run if the main thread exits.
> delay n = threadDelay (round $ n * 1000000) >> return n > S.drain $ S.tapRate 2 (\n -> print $ show n ++ " elements processed") (delay 1 S.|: delay 0.5 S.|: delay 0.5 S.|: S.nil) 2 elements processed 1 elements processed
Note: This may not work correctly on 32-bit machines.
Internal
pollCounts :: (IsStream t, MonadAsync m) => (a -> Bool) -> (t m Int -> t m Int) -> Fold m Int b -> t m a -> t m a Source #
pollCounts predicate transform fold stream counts those elements in the
 stream that pass the predicate. The resulting count stream is sent to
 another thread which transforms it using transform and then folds it using
 fold.  The thread is automatically cleaned up if the stream stops or
 aborts due to exception.
For example, to print the count of elements processed every second:
> S.drain $ S.pollCounts (const True) (S.rollingMap (-) . S.delayPost 1) (FL.drainBy print)
          $ S.enumerateFrom 0
Note: This may not work correctly on 32-bit machines.
Internal
Scanning
Left scans
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Strict left scan. Like map, scanl' too is a one to one transformation,
 however it adds an extra element.
> S.toList $ S.scanl' (+) 0 $ fromList [1,2,3,4] [0,1,3,6,10]
> S.toList $ S.scanl' (flip (:)) [] $ S.fromList [1,2,3,4] [[],[1],[2,1],[3,2,1],[4,3,2,1]]
The output of scanl' is the initial value of the accumulator followed by
 all the intermediate steps and the final result of foldl'.
By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming.
Consider the following monolithic example, computing the sum and the product
 of the elements in a stream in one go using a foldl':
> S.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ S.fromList [1,2,3,4] (10,24)
Using scanl' we can make it modular by computing the sum in the first
 stage and passing it down to the next stage for computing the product:
> S.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1) $ S.scanl' (\(s, _) x -> (s + x, x)) (0,1) $ S.fromList [1,2,3,4] (10,24)
IMPORTANT: scanl' evaluates the accumulator to WHNF.  To avoid building
 lazy expressions inside the accumulator, it is recommended that a strict
 data structure is used for accumulator.
Since: 0.2.0
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b Source #
Like scanl' but with a monadic fold function.
Since: 0.4.0
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl' but does not stream the initial value of the accumulator.
postscanl' f z xs = S.drop 1 $ S.scanl' f z xs
Since: 0.7.0
postscanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> t m b Source #
Like postscanl' but with a monadic step function.
Since: 0.7.0
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b Source #
Like scanl' but does not stream the final value of the accumulator.
Internal
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b Source #
Like postscanl' but with a monadic step function.
Internal
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a Source #
Like scanl' but for a non-empty stream. The first element of the stream
 is used as the initial value of the accumulator. Does nothing if the stream
 is empty.
> S.toList $ S.scanl1 (+) $ fromList [1,2,3,4] [1,3,6,10]
Since: 0.6.0
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a Source #
Like scanl1' but with a monadic step function.
Since: 0.6.0
Scan Using Fold
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Scan a stream using the given monadic fold.
Since: 0.7.0
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b Source #
Postscan a stream using the given monadic fold.
Since: 0.7.0
Concurrent Transformation
mkParallel :: (IsStream t, MonadAsync m) => t m a -> t m a Source #
Make the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer.
Internal
applyAsync :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b Source #
Same as |$.
Internal
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> t m a -> t m b infixr 0 Source #
Parallel transform application operator; applies a stream transformation
 function t m a -> t m b to a stream t m a concurrently; the input stream
 is evaluated asynchronously in an independent thread yielding elements to a
 buffer and the transformation function runs in another thread consuming the
 input from the buffer.  |$ is just like regular function application
 operator $ except that it is concurrent.
If you read the signature as (t m a -> t m b) -> (t m a -> t m b) you can
 look at it as a transformation that converts a transform function to a
 buffered concurrent transform function.
The following code prints a value every second even though each stage adds a 1 second delay.
drain $
   S.mapM (\x -> threadDelay 1000000 >> print x)
     |$ S.repeatM (threadDelay 1000000 >> return 1)
Concurrent
Since: 0.3.0
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b infixl 1 Source #
Parallel reverse function application operator for streams; just like the
 regular reverse function application operator & except that it is
 concurrent.
drain $
      S.repeatM (threadDelay 1000000 >> return 1)
   |& S.mapM (\x -> threadDelay 1000000 >> print x)
Concurrent
Since: 0.3.0
Filtering
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Include only those elements that pass a predicate.
Since: 0.1.0
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as filter but with a monadic predicate.
Since: 0.4.0
Mapping Filters
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m)) => (a -> m (Maybe b)) -> t m a -> t m b Source #
Deleting Elements
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a Source #
Deletes the first occurrence of the element in the stream that satisfies the given equality predicate.
> S.toList $ S.deleteBy (==) 3 $ S.fromList [1,3,3,5] [1,3,5]
Since: 0.6.0
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a Source #
Drop repeated elements that are adjacent to each other.
Since: 0.6.0
Inserting Elements
intersperseM :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Generate a stream by inserting the result of a monadic action between consecutive elements of the given stream. Note that the monadic action is performed after the stream action before which its result is inserted.
> S.toList $ S.intersperseM (return ',') $ S.fromList "hello" "h,e,l,l,o"
Since: 0.5.0
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a Source #
Generate a stream by inserting a given element between consecutive elements of the given stream.
> S.toList $ S.intersperse ',' $ S.fromList "hello" "h,e,l,l,o"
Since: 0.7.0
intersperseSuffix :: (IsStream t, MonadAsync m) => m a -> t m a -> t m a Source #
Insert a monadic action after each element in the stream.
Internal
intersperseSuffixBySpan :: (IsStream t, MonadAsync m) => Int -> m a -> t m a -> t m a Source #
Like intersperseSuffix but intersperses a monadic action into the input
 stream after every n elements and after the last element.
> S.toList $ S.intersperseSuffixBySpan 2 (return ',') $ S.fromList "hello" "he,ll,o,"
Internal
interjectSuffix :: (IsStream t, MonadAsync m) => Double -> m a -> t m a -> t m a Source #
Intersperse a monadic action into the input stream after every n
 seconds.
> S.drain $ S.interjectSuffix 1 (putChar ',') $ S.mapM (\x -> threadDelay 1000000 >> putChar x) $ S.fromList "hello" "h,e,l,l,o"
Internal
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a Source #
Introduces a delay of specified seconds after each element of a stream.
Internal
Indexing
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a) Source #
indexed = S.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined) indexed = S.zipWith (,) (S.enumerateFrom 0)
Pair each element in a stream with its index, starting from index 0.
> S.toList $ S.indexed $ S.fromList "hello" [(0,h),(1,e),(2,l),(3,l),(4,o)]
Since: 0.6.0
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a) Source #
indexedR n = S.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined) indexedR n = S.zipWith (,) (S.enumerateFromThen n (n - 1))
Pair each element in a stream with its index, starting from the
 given index n and counting down.
> S.toList $ S.indexedR 10 $ S.fromList "hello" [(10,h),(9,e),(8,l),(7,l),(6,o)]
Since: 0.6.0
Reordering
reverse :: (IsStream t, Monad m) => t m a -> t m a Source #
Returns the elements of the stream in reverse order. The stream must be finite. Note that this necessarily buffers the entire stream in memory.
Since 0.7.0 (Monad m constraint)
Since: 0.1.1
Parsing
splitParse :: (IsStream t, MonadThrow m) => Parser m a b -> t m a -> t m b Source #
Apply a Parse repeatedly on a stream and emit the parsed values in the
 output stream.
>>>S.toList $ S.splitParse (PR.take 2 $ PR.fromFold FL.sum) $ S.fromList [1..10]> [3,7,11,15,19]
>>>S.toList $ S.splitParse (PR.line FL.toList) $ S.fromList "hello\nworld"> ["hello\n","world"]
Trimming
take :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Take first n elements from the stream and discard the rest.
Since: 0.1.0
takeByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
takeByTime duration yields stream elements upto specified time
 duration. The duration starts when the stream is evaluated for the first
 time, before the first element is yielded. The time duration is checked
 before generating each element, if the duration has expired the stream
 stops.
The total time taken in executing the stream is guaranteed to be at least
 duration, however, because the duration is checked before generating an
 element, the upper bound is indeterminate and depends on the time taken in
 generating and processing the last element.
No element is yielded if the duration is zero. At least one element is yielded if the duration is non-zero.
Internal
takeWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
End the stream as soon as the predicate fails on an element.
Since: 0.1.0
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as takeWhile but with a monadic predicate.
Since: 0.4.0
drop :: (IsStream t, Monad m) => Int -> t m a -> t m a Source #
Discard first n elements from the stream and take the rest.
Since: 0.1.0
dropByTime :: (MonadIO m, IsStream t, TimeUnit64 d) => d -> t m a -> t m a Source #
dropByTime duration drops stream elements until specified duration has
 passed.  The duration begins when the stream is evaluated for the first
 time. The time duration is checked after generating a stream element, the
 element is yielded if the duration has expired otherwise it is dropped.
The time elapsed before starting to generate the first element is at most
 duration, however, because the duration expiry is checked after the
 element is generated, the lower bound is indeterminate and depends on the
 time taken in generating an element.
All elements are yielded if the duration is zero.
Internal
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a Source #
Drop elements in the stream as long as the predicate succeeds and then take the rest of the stream.
Since: 0.1.0
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a Source #
Same as dropWhile but with a monadic predicate.
Since: 0.4.0
Breaking
chunksOf :: (IsStream t, Monad m) => Int -> Fold m a b -> t m a -> t m b Source #
Group the input stream into groups of n elements each and then fold each
 group using the provided fold function.
> S.toList $ S.chunksOf 2 FL.sum (S.enumerateFromTo 1 10) [3,7,11,15,19]
This can be considered as an n-fold version of ltake where we apply
 ltake repeatedly on the leftover stream until the stream exhausts.
Since: 0.7.0
chunksOf2 :: (IsStream t, Monad m) => Int -> m c -> Fold2 m c a b -> t m a -> t m b Source #
Internal
arraysOf :: (IsStream t, MonadIO m, Storable a) => Int -> t m a -> t m (Array a) Source #
arraysOf n stream groups the elements in the input stream into arrays of
 n elements each.
Same as the following but may be more efficient:
arraysOf n = S.chunksOf n (A.writeN n)
Internal
intervalsOf :: (IsStream t, MonadAsync m) => Double -> Fold m a b -> t m a -> t m b Source #
Group the input stream into windows of n second each and then fold each
 group using the provided fold function.
Since: 0.7.0
Searching
findIndices :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m Int Source #
Find all the indices where the element in the stream satisfies the given predicate.
Since: 0.5.0
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int Source #
Find all the indices where the value of the element in the stream is equal to the given value.
Since: 0.5.0
Splitting
Streams can be sliced into segments in space or in time. We use the
 term chunk to refer to a spatial length of the stream (spatial window)
 and the term session to refer to a length in time (time window).
splitOn :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Split on an infixed separator element, dropping the separator. Splits the
 stream on separator elements determined by the supplied predicate, separator
 is considered as infixed between two segments, if one side of the separator
 is missing then it is parsed as an empty stream.  The supplied Fold is
 applied on the split segments. With - representing non-separator elements
 and . as separator, splitOn splits as follows:
"--.--" => "--" "--" "--." => "--" "" ".--" => "" "--"
splitOn (== x) is an inverse of intercalate (S.yield x)
Let's use the following definition for illustration:
splitOn' p xs = S.toList $ S.splitOn p (FL.toList) (S.fromList xs)
>>>splitOn' (== '.') ""[""]
>>>splitOn' (== '.') "."["",""]
>>>splitOn' (== '.') ".a"> ["","a"]
>>>splitOn' (== '.') "a."> ["a",""]
>>>splitOn' (== '.') "a.b"> ["a","b"]
>>>splitOn' (== '.') "a..b"> ["a","","b"]
Since: 0.7.0
splitOnSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOn but the separator is considered as suffixed to the segments
 in the stream. A missing suffix at the end is allowed. A separator at the
 beginning is parsed as empty segment.  With - representing elements and
 . as separator, splitOnSuffix splits as follows:
"--.--." => "--" "--" "--.--" => "--" "--" ".--." => "" "--"
splitOnSuffix' p xs = S.toList $ S.splitSuffixBy p (FL.toList) (S.fromList xs)
>>>splitOnSuffix' (== '.') ""[]
>>>splitOnSuffix' (== '.') "."[""]
>>>splitOnSuffix' (== '.') "a"["a"]
>>>splitOnSuffix' (== '.') ".a"> ["","a"]
>>>splitOnSuffix' (== '.') "a."> ["a"]
>>>splitOnSuffix' (== '.') "a.b"> ["a","b"]
>>>splitOnSuffix' (== '.') "a.b."> ["a","b"]
>>>splitOnSuffix' (== '.') "a..b.."> ["a","","b",""]
lines = splitOnSuffix (== '\n')
Since: 0.7.0
splitWithSuffix :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOnSuffix but keeps the suffix attached to the resulting
 splits.
splitWithSuffix' p xs = S.toList $ S.splitWithSuffix p (FL.toList) (S.fromList xs)
>>>splitWithSuffix' (== '.') ""[]
>>>splitWithSuffix' (== '.') "."["."]
>>>splitWithSuffix' (== '.') "a"["a"]
>>>splitWithSuffix' (== '.') ".a"> [".","a"]
>>>splitWithSuffix' (== '.') "a."> ["a."]
>>>splitWithSuffix' (== '.') "a.b"> ["a.","b"]
>>>splitWithSuffix' (== '.') "a.b."> ["a.","b."]
>>>splitWithSuffix' (== '.') "a..b.."> ["a.",".","b.","."]
Since: 0.7.0
wordsBy :: (IsStream t, Monad m) => (a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Like splitOn after stripping leading, trailing, and repeated separators.
 Therefore, ".a..b." with . as the separator would be parsed as
 ["a","b"].  In other words, its like parsing words from whitespace
 separated text.
wordsBy' p xs = S.toList $ S.wordsBy p (FL.toList) (S.fromList xs)
>>>wordsBy' (== ',') ""> []
>>>wordsBy' (== ',') ","> []
>>>wordsBy' (== ',') ",a,,b,"> ["a","b"]
words = wordsBy isSpace
Since: 0.7.0
splitOnSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOn but the separator is a sequence of elements instead of a
 single element.
For illustration, let's define a function that operates on pure lists:
splitOnSeq' pat xs = S.toList $ S.splitOnSeq (A.fromList pat) (FL.toList) (S.fromList xs)
>>>splitOnSeq' "" "hello"> ["h","e","l","l","o"]
>>>splitOnSeq' "hello" ""> [""]
>>>splitOnSeq' "hello" "hello"> ["",""]
>>>splitOnSeq' "x" "hello"> ["hello"]
>>>splitOnSeq' "h" "hello"> ["","ello"]
>>>splitOnSeq' "o" "hello"> ["hell",""]
>>>splitOnSeq' "e" "hello"> ["h","llo"]
>>>splitOnSeq' "l" "hello"> ["he","","o"]
>>>splitOnSeq' "ll" "hello"> ["he","o"]
splitOnSeq is an inverse of intercalate. The following law always holds:
intercalate . splitOn == id
The following law holds when the separator is non-empty and contains none of the elements present in the input lists:
splitOn . intercalate == id
Internal
splitOnSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitSuffixBy but the separator is a sequence of elements, instead
 of a predicate for a single element.
splitSuffixOn_ pat xs = S.toList $ S.splitSuffixOn (A.fromList pat) (FL.toList) (S.fromList xs)
>>>splitSuffixOn_ "." ""[""]
>>>splitSuffixOn_ "." "."[""]
>>>splitSuffixOn_ "." "a"["a"]
>>>splitSuffixOn_ "." ".a"> ["","a"]
>>>splitSuffixOn_ "." "a."> ["a"]
>>>splitSuffixOn_ "." "a.b"> ["a","b"]
>>>splitSuffixOn_ "." "a.b."> ["a","b"]
>>>splitSuffixOn_ "." "a..b.."> ["a","","b",""]
lines = splitSuffixOn "\n"
Internal
splitBySeq :: (IsStream t, MonadAsync m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitOnSeq but splits the separator as well, as an infix token.
splitOn'_ pat xs = S.toList $ S.splitOn' (A.fromList pat) (FL.toList) (S.fromList xs)
>>>splitOn'_ "" "hello"> ["h","","e","","l","","l","","o"]
>>>splitOn'_ "hello" ""> [""]
>>>splitOn'_ "hello" "hello"> ["","hello",""]
>>>splitOn'_ "x" "hello"> ["hello"]
>>>splitOn'_ "h" "hello"> ["","h","ello"]
>>>splitOn'_ "o" "hello"> ["hell","o",""]
>>>splitOn'_ "e" "hello"> ["h","e","llo"]
>>>splitOn'_ "l" "hello"> ["he","l","","l","o"]
>>>splitOn'_ "ll" "hello"> ["he","ll","o"]
Internal
splitWithSuffixSeq :: (IsStream t, MonadIO m, Storable a, Enum a, Eq a) => Array a -> Fold m a b -> t m a -> t m b Source #
Like splitSuffixOn but keeps the suffix intact in the splits.
splitSuffixOn'_ pat xs = S.toList $ FL.splitSuffixOn' (A.fromList pat) (FL.toList) (S.fromList xs)
>>>splitSuffixOn'_ "." ""[""]
>>>splitSuffixOn'_ "." "."["."]
>>>splitSuffixOn'_ "." "a"["a"]
>>>splitSuffixOn'_ "." ".a"> [".","a"]
>>>splitSuffixOn'_ "." "a."> ["a."]
>>>splitSuffixOn'_ "." "a.b"> ["a.","b"]
>>>splitSuffixOn'_ "." "a.b."> ["a.","b."]
>>>splitSuffixOn'_ "." "a..b.."> ["a.",".","b.","."]
Internal
splitInnerBy :: (IsStream t, Monad m) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #
splitInnerBy splitter joiner stream splits the inner containers f a of
 an input stream t m (f a) using the splitter function. Container
 elements f a are collected until a split occurs, then all the elements
 before the split are joined using the joiner function.
For example, if we have a stream of Array Word8, we may want to split the
 stream into arrays representing lines separated by '\n' byte such that the
 resulting stream after a split would be one array for each line.
CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded.
Internal
splitInnerBySuffix :: (IsStream t, Monad m, Eq (f a), Monoid (f a)) => (f a -> m (f a, Maybe (f a))) -> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a) Source #
Like splitInnerBy but splits assuming the separator joins the segment in
 a suffix style.
Internal
Grouping
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b Source #
groups = groupsBy (==) groups = groupsByRolling (==)
Groups contiguous spans of equal elements together in individual groups.
>>>S.toList $ S.groups FL.toList $ S.fromList [1,1,2,2]> [[1,1],[2,2]]
Since: 0.7.0
groupsBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #
groupsBy cmp f $ S.fromList [a,b,c,...] assigns the element a to the
 first group, if a `cmp` b is True then b is also assigned to the same
 group.  If a `cmp` c is True then c is also assigned to the same
 group and so on. When the comparison fails a new group is started. Each
 group is folded using the fold f and the result of the fold is emitted in
 the output stream.
>>>S.toList $ S.groupsBy (>) FL.toList $ S.fromList [1,3,7,0,2,5]> [[1,3,7],[0,2,5]]
Since: 0.7.0
groupsByRolling :: (IsStream t, Monad m) => (a -> a -> Bool) -> Fold m a b -> t m a -> t m b Source #
Unlike groupsBy this function performs a rolling comparison of two
 successive elements in the input stream. groupsByRolling cmp f $ S.fromList
 [a,b,c,...] assigns the element a to the first group, if a `cmp` b is
 True then b is also assigned to the same group.  If b `cmp` c is
 True then c is also assigned to the same group and so on. When the
 comparison fails a new group is started. Each group is folded using the fold
 f.
>>>S.toList $ S.groupsByRolling (\a b -> a + 1 == b) FL.toList $ S.fromList [1,2,3,7,8,9]> [[1,2,3],[7,8,9]]
Since: 0.7.0
Group map
rollingMapM :: (IsStream t, Monad m) => (a -> a -> m b) -> t m a -> t m b Source #
Like rollingMap but with an effectful map function.
Internal
rollingMap :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b Source #
Apply a function on every two successive elements of a stream. If the stream consists of a single element the output is an empty stream.
Internal
Windowed Classification
Split the stream into windows or chunks in space or time. Each window can be associated with a key, all events associated with a particular key in the window can be folded to a single result. The stream is split into windows of specified size, the window can be terminated early if the closing flag is specified in the input stream.
The term "chunk" is used for a space window and the term "session" is used for a time window.
Tumbling Windows
A new window starts after the previous window is finished.
Arguments
| :: (IsStream t, MonadAsync m, Ord k) | |
| => Double | timer tick in seconds | 
| -> Double | session timeout in seconds | 
| -> Bool | reset the timeout when an event is received | 
| -> (Int -> m Bool) | predicate to eject sessions based on session count | 
| -> Fold m a (Either b b) | Fold to be applied to session events | 
| -> t m (k, a, AbsTime) | session key, data, timestamp | 
| -> t m (k, b) | session key, fold result | 
classifySessionsBy tick timeout idle pred f stream groups timestamped
 events in an input event stream into sessions based on a session key. Each
 element in the stream is an event consisting of a triple (session key,
 sesssion data, timestamp).  session key is a key that uniquely identifies
 the session.  All the events belonging to a session are folded using the
 fold f until the fold returns a Left result or a timeout has occurred.
 The session key and the result of the fold are emitted in the output stream
 when the session is purged.
When idle is False, timeout is the maximum lifetime of a session in
 seconds, measured from the timestamp of the first event in that session.
 When idle is True then the timeout is an idle timeout, it is reset after
 every event received in the session.
timestamp in an event characterizes the time when the input event was
 generated, this is an absolute time measured from some Epoch.  The notion
 of current time is maintained by a monotonic event time clock using the
 timestamps seen in the input stream. The latest timestamp seen till now is
 used as the base for the current time.  When no new events are seen, a timer
 is started with a tick duration specified by tick. This timer is used to
 detect session timeouts in the absence of new events.
The predicate pred is invoked with the current session count, if it
 returns True a session is ejected from the session cache before inserting
 a new session. This could be useful to alert or eject sessions when the
 number of sessions becomes too high.
Internal
Arguments
| :: (IsStream t, MonadAsync m, Ord k) | |
| => Double | time window size | 
| -> (Int -> m Bool) | predicate to eject sessions on session count | 
| -> Fold m a (Either b b) | Fold to be applied to session events | 
| -> t m (k, a, AbsTime) | session key, data, timestamp | 
| -> t m (k, b) | 
Split the stream into fixed size time windows of specified interval in
 seconds. Within each such window, fold the elements in sessions identified
 by the session keys. The fold result is emitted in the output stream if the
 fold returns a Left result or if the time window ends.
Session timestamp in the input stream is an absolute time from some epoch,
 characterizing the time when the input element was generated.  To detect
 session window end, a monotonic event time clock is maintained synced with
 the timestamps with a clock resolution of 1 second.
If the ejection predicate returns True, the session with the longest
 lifetime is ejected before inserting a new session.
classifySessionsOf interval pred = classifySessionsBy 1 interval False pred
Internal
Keep Alive Windows
The window size is extended if an event arrives within the specified window size. This can represent sessions with idle or inactive timeout.
classifyKeepAliveSessions Source #
Arguments
| :: (IsStream t, MonadAsync m, Ord k) | |
| => Double | session inactive timeout | 
| -> (Int -> m Bool) | predicate to eject sessions on session count | 
| -> Fold m a (Either b b) | Fold to be applied to session payload data | 
| -> t m (k, a, AbsTime) | session key, data, timestamp | 
| -> t m (k, b) | 
Like classifySessionsOf but the session is kept alive if an event is
 received within the session window. The session times out and gets closed
 only if no event is received within the specified session window size.
If the ejection predicate returns True, the session that was idle for
 the longest time is ejected before inserting a new session.
classifyKeepAliveSessions timeout pred = classifySessionsBy 1 timeout True pred
Internal
Sliding Window Buffers
Combining Streams
Appending
append :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Append the outputs of two streams, yielding all the elements from the first stream and then yielding all the elements from the second stream.
IMPORTANT NOTE: This could be 100x faster than serial/<> for appending a
 few (say 100) streams because it can fuse via stream fusion. However, it
 does not scale for a large number of streams (say 1000s) and becomes
 qudartically slow. Therefore use this for custom appending of a few streams
 but use concatMap or 'concatMapWith serial' for appending n streams or
 infinite containers of streams.
Internal
Interleaving
interleave :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.
>>>:set -XOverloadedStrings>>>interleave "ab" ",,,," :: SerialT Identity CharfromList "a,b,,,">>>interleave "abcd" ",," :: SerialT Identity CharfromList "a,b,cd"
interleave is dual to interleaveMin, it can be called interleaveMax.
Do not use at scale in concatMapWith.
Internal
interleaveMin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.
>>>:set -XOverloadedStrings>>>interleaveMin "ab" ",,,," :: SerialT Identity CharfromList "a,b,">>>interleaveMin "abcd" ",," :: SerialT Identity CharfromList "a,b,c"
interleaveMin is dual to interleave.
Do not use at scale in concatMapWith.
Internal
interleaveSuffix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.
>>>:set -XOverloadedStrings>>>interleaveSuffix "abc" ",,,," :: SerialT Identity CharfromList "a,b,c,">>>interleaveSuffix "abc" "," :: SerialT Identity CharfromList "a,bc"
interleaveSuffix is a dual of interleaveInfix.
Do not use at scale in concatMapWith.
Internal
interleaveInfix :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.
>>>:set -XOverloadedStrings>>>interleaveInfix "abc" ",,,," :: SerialT Identity CharfromList "a,b,c">>>interleaveInfix "abc" "," :: SerialT Identity CharfromList "a,bc"
interleaveInfix is a dual of interleaveSuffix.
Do not use at scale in concatMapWith.
Internal
wSerialFst :: IsStream t => t m a -> t m a -> t m a Source #
Like wSerial but stops interleaving as soon as the first stream stops.
Since: 0.7.0
wSerialMin :: IsStream t => t m a -> t m a -> t m a Source #
Like wSerial but stops interleaving as soon as any of the two streams
 stops.
Since: 0.7.0
Scheduling
roundrobin :: (IsStream t, Monad m) => t m b -> t m b -> t m b Source #
Schedule the execution of two streams in a fair round-robin manner,
 executing each stream once, alternately. Execution of a stream may not
 necessarily result in an output, a stream may chose to Skip producing an
 element until later giving the other stream a chance to run. Therefore, this
 combinator fairly interleaves the execution of two streams rather than
 fairly interleaving the output of the two streams. This can be useful in
 co-operative multitasking without using explicit threads. This can be used
 as an alternative to async.
Do not use at scale in concatMapWith.
Internal
Parallel
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel but stops the output as soon as the first stream stops.
Internal
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a Source #
Like parallel but stops the output as soon as any of the two streams
 stops.
Internal
Merging
mergeBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
Merge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.
If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order.
> S.toList $ S.mergeBy compare (S.fromList [1,3,5]) (S.fromList [2,4,6,8]) [1,2,3,4,5,6,8]
Since: 0.6.0
mergeByM :: (IsStream t, Monad m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #
Like mergeBy but with a monadic comparison function.
Merge two streams randomly:
> randomly _ _ = randomIO >>= x -> return $ if x then LT else GT > S.toList $ S.mergeByM randomly (S.fromList [1,1,1,1]) (S.fromList [2,2,2,2]) [2,1,2,2,2,1,1,1]
Merge two streams in a proportion of 2:1:
proportionately m n = do
 ref <- newIORef $ cycle $ concat [replicate m LT, replicate n GT]
 return $ \_ _ -> do
     r <- readIORef ref
     writeIORef ref $ tail r
     return $ head r
main = do
 f <- proportionately 2 1
 xs <- S.toList $ S.mergeByM f (S.fromList [1,1,1,1,1,1]) (S.fromList [2,2,2])
 print xs
[1,1,2,1,1,2,1,1,2]
Since: 0.6.0
mergeAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a Source #
Like mergeBy but merges concurrently (i.e. both the elements being
 merged are generated concurrently).
Since: 0.6.0
mergeAsyncByM :: (IsStream t, MonadAsync m) => (a -> a -> m Ordering) -> t m a -> t m a -> t m a Source #
Like mergeByM but merges concurrently (i.e. both the elements being
 merged are generated concurrently).
Since: 0.6.0
Zipping
zipWith :: (IsStream t, Monad m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #
Zip two streams serially using a pure zipping function.
> S.toList $ S.zipWith (+) (S.fromList [1,2,3]) (S.fromList [4,5,6]) [5,7,9]
Since: 0.1.0
zipWithM :: (IsStream t, Monad m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #
Like zipWith but using a monadic zipping function.
Since: 0.4.0
zipAsyncWith :: (IsStream t, MonadAsync m) => (a -> b -> c) -> t m a -> t m b -> t m c Source #
Like zipWith but zips concurrently i.e. both the streams being zipped
 are generated concurrently.
Since: 0.1.0
zipAsyncWithM :: (IsStream t, MonadAsync m) => (a -> b -> m c) -> t m a -> t m b -> t m c Source #
Like zipWithM but zips concurrently i.e. both the streams being zipped
 are generated concurrently.
Since: 0.4.0
Folding Containers of Streams
foldMapWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> (a -> t m b) -> f a -> t m b Source #
forEachWith :: (IsStream t, Foldable f) => (t m b -> t m b -> t m b) -> f a -> (a -> t m b) -> t m b Source #
Like foldMapWith but with the last two arguments reversed i.e. the
 monadic streaming function is the last argument.
Equivalent to:
forEachWith = flip S.foldMapWith
Since: 0.1.0 (Streamly)
Folding Streams of Streams
concat :: (IsStream t, Monad m) => t m (t m a) -> t m a Source #
Flatten a stream of streams to a single stream.
concat = concatMap id
Internal
concatMap :: (IsStream t, Monad m) => (a -> t m b) -> t m a -> t m b Source #
Map a stream producing function on each element of the stream and then flatten the results into a single stream.
concatMap =concatMapWithserialconcatMap f =concatMapM(return . f)
Since: 0.6.0
concatMapM :: (IsStream t, Monad m) => (a -> m (t m b)) -> t m a -> t m b Source #
Map a stream producing monadic function on each element of the stream
 and then flatten the results into a single stream. Since the stream
 generation function is monadic, unlike concatMap, it can produce an
 effect at the beginning of each iteration of the inner loop.
Since: 0.6.0
concatMapWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m b) -> t m a -> t m b Source #
concatMapWith merge map stream is a two dimensional looping combinator.
 The first argument specifies a merge or concat function that is used to
 merge the streams generated by applying the second argument i.e. the map
 function to each element of the input stream. The concat function could be
 serial, parallel, async, ahead or any other zip or merge function
 and the second argument could be any stream generation function using a
 seed.
Compare foldMapWith
Since: 0.7.0
Flattening Using Unfolds
concatUnfoldInterleave :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #
Like concatUnfold but interleaves the streams in the same way as
 interleave behaves instead of appending them.
Internal
concatUnfoldRoundrobin :: (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b Source #
Like concatUnfold but executes the streams in the same way as
 roundrobin.
Internal
Feedback Loops
concatMapIterateWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m a) -> t m a -> t m a Source #
Like iterateM but using a stream generator function.
Internal
concatMapTreeWith :: IsStream t => (forall c. t m c -> t m c -> t m c) -> (a -> t m (Either a b)) -> t m (Either a b) -> t m (Either a b) Source #
Traverse a forest with recursive tree structures whose non-leaf nodes are
 of type a and leaf nodes are of type b, flattening all the trees into
 streams and combining the streams into a single stream consisting of both
 leaf and non-leaf nodes.
concatMapTreeWith is a generalization of concatMap, using a recursive
 feedback loop to append the non-leaf nodes back to the input stream enabling
 recursive traversal.  concatMap flattens a single level nesting whereas
 concatMapTreeWith flattens a recursively nested structure.
Traversing a directory tree recursively is a canonical use case of
 concatMapTreeWith.
concatMapTreeWith combine f xs = concatMapIterateWith combine g xs
     where
     g (Left tree)  = f tree
     g (Right leaf) = nil
Internal
Arguments
| :: (IsStream t, MonadAsync m) | |
| => (forall x. t m x -> t m x -> t m x) | |
| -> (a -> t m (Either b c)) | |
| -> (b -> t m a) | feedback function to feed  | 
| -> t m a | |
| -> t m c | 
Flatten a stream with a feedback loop back into the input.
For example, exceptions generated by the output stream can be fed back to the input to take any corrective action. The corrective action may be to retry the action or do nothing or log the errors. For the retry case we need a feedback loop.
Internal
concatMapTreeYieldLeavesWith :: (IsStream t, MonadAsync m) => (forall x. t m x -> t m x -> t m x) -> (a -> t m (Either a b)) -> t m a -> t m b Source #
Concat a stream of trees, generating only leaves.
Compare with concatMapTreeWith. While the latter returns all nodes in the
 tree, this one returns only the leaves.
Traversing a directory tree recursively and yielding on the files  is a
 canonical use case of concatMapTreeYieldLeavesWith.
concatMapTreeYieldLeavesWith combine f = concatMapLoopWith combine f yield
Internal
mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a Source #
Iterate a lazy function f of the shape `m a -> t m a` until it gets
 fully defined i.e. becomes independent of its argument action, then return
 the resulting value of the function (`t m a`).
It can be used to construct a stream that uses a cyclic definition. For example:
import Streamly.Internal.Prelude as S
import System.IO.Unsafe (unsafeInterleaveIO)
main = do
    S.mapM_ print $ S.mfix $ x -> do
      a <- S.fromList [1,2]
      b <- S.fromListM [return 3, unsafeInterleaveIO (fmap fst x)]
      return (a, b)
Note that the function f must be lazy in its argument, that's why we use
 unsafeInterleaveIO because IO monad is strict.
Internal
Inserting Streams in Streams
gintercalate :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #
interleaveInfix followed by unfold and concat.
Internal
gintercalateSuffix :: (IsStream t, Monad m) => Unfold m a c -> t m a -> Unfold m b c -> t m b -> t m c Source #
interleaveSuffix followed by unfold and concat.
Internal
intercalate :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c Source #
intersperse followed by unfold and concat.
unwords = intercalate " " UF.fromList
>>>intercalate " " UF.fromList ["abc", "def", "ghi"]> "abc def ghi"
Internal
intercalateSuffix :: (IsStream t, Monad m) => b -> Unfold m b c -> t m b -> t m c Source #
intersperseSuffix followed by unfold and concat.
unlines = intercalateSuffix "\n" UF.fromList
>>>intercalate "\n" UF.fromList ["abc", "def", "ghi"]> "abc\ndef\nghi\n"
Internal
interpose :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #
Unfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream.
unwords = S.interpose ' '
Internal
interposeSuffix :: (IsStream t, Monad m) => c -> Unfold m b c -> t m b -> t m c Source #
Unfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.
unlines = S.interposeSuffix '\n'
Internal
Exceptions
before :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Run a side effect before the stream yields its first element.
Since: 0.7.0
after :: (IsStream t, Monad m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally.
Prefer afterIO over this as the after action in this combinator is not
 executed if the unfold is partially evaluated lazily and then garbage
 collected.
Since: 0.7.0
afterIO :: (IsStream t, MonadIO m, MonadBaseControl IO m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally or is garbage collected after a partial lazy evaluation.
Internal
bracket :: (IsStream t, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #
Run the first action before the stream starts and remember its output, generate a stream using the output, run the second action using the remembered value as an argument whenever the stream ends normally or due to an exception.
Prefer bracketIO over this as the after action in this combinator is not
 executed if the unfold is partially evaluated lazily and then garbage
 collected.
Since: 0.7.0
bracketIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> (b -> m c) -> (b -> t m a) -> t m a Source #
Run the first action before the stream starts and remember its output, generate a stream using the output, run the second action using the remembered value as an argument whenever the stream ends normally, due to an exception or if it is garbage collected after a partial lazy evaluation.
Internal
onException :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream aborts due to an exception.
Since: 0.7.0
finally :: (IsStream t, MonadCatch m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally or aborts due to an exception.
Prefer finallyIO over this as the after action in this combinator is not
 executed if the unfold is partially evaluated lazily and then garbage
 collected.
Since: 0.7.0
finallyIO :: (IsStream t, MonadAsync m, MonadCatch m) => m b -> t m a -> t m a Source #
Run a side effect whenever the stream stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.
Internal
handle :: (IsStream t, MonadCatch m, Exception e) => (e -> t m a) -> t m a -> t m a Source #
When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.
Since: 0.7.0
Generalize Inner Monad
hoist :: (Monad m, Monad n) => (forall x. m x -> n x) -> SerialT m a -> SerialT n a Source #
Transform the inner monad of a stream using a natural transformation.
Internal
generally :: (IsStream t, Monad m) => t Identity a -> t m a Source #
Generalize the inner monad of the stream from Identity to any monad.
Internal
Transform Inner Monad
liftInner :: (Monad m, IsStream t, MonadTrans tr, Monad (tr m)) => t m a -> t (tr m) a Source #
Lift the inner monad of a stream using a monad transformer.
Internal
usingReaderT :: (Monad m, IsStream t) => r -> (t (ReaderT r m) a -> t (ReaderT r m) a) -> t m a -> t m a Source #
Run a stream transformation using a given environment.
Internal
runReaderT :: (IsStream t, Monad m) => s -> t (ReaderT s m) a -> t m a Source #
Evaluate the inner monad of a stream as ReaderT.
Internal
usingStateT :: Monad m => s -> (SerialT (StateT s m) a -> SerialT (StateT s m) a) -> SerialT m a -> SerialT m a Source #
Run a stateful (StateT) stream transformation using a given state.
This is supported only for SerialT as concurrent state updation may not be
 safe.
Internal
Diagnostics
inspectMode :: IsStream t => t m a -> t m a Source #
Print debug information about an SVar when the stream ends
Internal
Deprecated
once :: (Monad m, IsStream t) => m a -> t m a Source #
Deprecated: Please use yieldM instead.
Same as yieldM
Since: 0.2.0
each :: (IsStream t, Foldable f) => f a -> t m a Source #
Deprecated: Please use fromFoldable instead.
Same as fromFoldable.
Since: 0.1.0
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b Source #
Deprecated: Please use scanl followed by map instead.
Strict left scan with an extraction function. Like scanl', but applies a
 user supplied extraction function (the third argument) at each step. This is
 designed to work with the foldl library. The suffix x is a mnemonic for
 extraction.
Since: 0.7.0 (Monad m constraint)
Since 0.2.0
foldx :: Monad m => (x -> a -> x) -> x -> (x -> b) -> SerialT m a -> m b Source #
Deprecated: Please use foldl' followed by fmap instead.
Strict left fold with an extraction function. Like the standard strict
 left fold, but applies a user supplied extraction function (the third
 argument) to the folded value at the end. This is designed to work with the
 foldl library. The suffix x is a mnemonic for extraction.
Since: 0.2.0
foldxM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> SerialT m a -> m b Source #
Deprecated: Please use foldlM' followed by fmap instead.
Like foldx, but with a monadic step function.
Since: 0.2.0
foldr1 :: Monad m => (a -> a -> a) -> SerialT m a -> m (Maybe a) Source #
Deprecated: Use foldrM instead.
Lazy right fold for non-empty streams, using first element as the starting
 value. Returns Nothing if the stream is empty.
Since: 0.5.0
runStream :: Monad m => SerialT m a -> m () Source #
Deprecated: Please use "drain" instead
Run a stream, discarding the results. By default it interprets the stream
 as SerialT, to run other types of streams use the type adapting
 combinators for example runStream . .asyncly
Since: 0.2.0
runN :: Monad m => Int -> SerialT m a -> m () Source #
Deprecated: Please use "drainN" instead
runN n = runStream . take n
Run maximum up to n iterations of a stream.
Since: 0.6.0
runWhile :: Monad m => (a -> Bool) -> SerialT m a -> m () Source #
Deprecated: Please use "drainWhile" instead
runWhile p = runStream . takeWhile p
Run a stream as long as the predicate holds true.
Since: 0.6.0