(c) 2019 Composewell Technologies, BSD3, streamly@composewell.com, released, GHC, Safe-Inferred

(c) 2021 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, released, GHC, Safe-Inferred

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

A monad that can perform concurrent or parallel IO operations. Streams that can be composed concurrently require the underlying monad to be MonadAsync.

When we run computations concurrently, we completely isolate the state of the concurrent computations from the parent computation. The invariant is that we should never be running two concurrent computations in the same thread without using the runInIO function. Also, we should never be running a concurrent computation in the parent thread, otherwise it may affect the state of the parent, which is against the defined semantics of concurrent execution.

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

Fork a thread to run the given computation, installing the provided exception handler. Lifted to any monad with 'MonadRunInIO m' capability.

TODO: the RunInIO argument can be removed, we can directly pass the action as "mrun action" instead.

Similar to the fork function above, but has a "bound" boolean parameter for specifying whether a bound thread (forkOS) should be spawned instead of an unbound one.

Fork lifted to any monad with 'MonadBaseControl IO m' capability.

Fork a thread that is automatically killed as soon as the reference to the returned threadId is garbage collected.

(c) 2018-2019 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred
(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

An abstract type for specifying the configuration parameters of a Channel. Use Config -> Config modifier functions to modify the default configuration. See the individual modifier documentation for default values.

Specify when the Channel should stop.

Stop when the first stream ends.

Stop when all the streams end.

Stop when any one stream ends.

Specifies the stream yield rate in yields per second (Hertz). We keep accumulating yield credits at rateGoal. At any point of time we allow only as many yields as we have accumulated as per rateGoal since the start of time. If the consumer or the producer is slower or faster, the actual rate may fall behind or exceed rateGoal. We try to recover the gap between the two by increasing or decreasing the pull rate from the producer. However, if the gap becomes more than rateBuffer we try to recover only as much as rateBuffer.

rateLow puts a bound on how low the instantaneous rate can go when recovering the rate gap. In other words, it determines the maximum yield latency. Similarly, rateHigh puts a bound on how high the instantaneous rate can go when recovering the rate gap. In other words, it determines the minimum yield latency. We reduce the latency by increasing concurrency, therefore we can say that it puts an upper bound on concurrency.

If the rateGoal is 0 or negative the stream never yields a value. If the rateBuffer is 0 or negative we do not attempt to recover.

rateLow: The lower rate limit.

rateGoal: The target rate we want to achieve.

rateHigh: The upper rate limit.

rateBuffer: Maximum slack from the goal.

Rate control.

[LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads.

Actual latency/throughput as seen from the consumer side: we count the yields and the time it took to generate those yields.
This is used to increase or decrease the number of workers needed to achieve the desired rate. The idle time of workers is adjusted in this, so that we only account for the rate when the consumer actually demands data. XXX interval latency is enough, we can move this under diagnostics build. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads.

After how many yields the worker should update the latency information. If the latency is high, this count is kept lower and vice-versa. XXX If the latency suddenly becomes too high this count may remain too high for a long time, in such cases the consumer can change it. 0 means no latency computation. XXX this is derivable from workerMeasuredLatency, can be removed. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads.

These are the in-progress latency stats maintained by the workers, which we empty into the workerCollectedLatency stats at certain intervals, whenever we process the stream elements yielded in this period. The first count is all yields, the second count is only those yields for which the latency was measured to be non-zero (note that if the timer resolution is low the measured latency may be zero, e.g. on the JS platform). [LOCKING] Locked access. Modified by the consumer thread as well as worker threads. Workers modify it periodically based on workerPollingInterval and not on every yield, to reduce the locking overhead. (allYieldCount, yieldCount, timeTaken)

This is the second level stat, which is an accumulation from the workerPendingLatency stats. We keep accumulating latencies in this bucket until we have stats for a sufficient period and then we reset it to start collecting for the next period and retain the computed average latency for the last period in workerMeasuredLatency. [LOCKING] Unlocked access.
Modified by the consumer thread and unsafely read by the worker threads. (allYieldCount, yieldCount, timeTaken)

Latency as measured by workers, aggregated for the last period. [LOCKING] Unlocked access. Modified by the consumer thread and unsafely read by the worker threads.

We measure the individual worker latencies to estimate the number of workers needed or the amount of time we have to sleep between dispatches to achieve a particular rate when controlled pace mode is used.

0 means unlimited.

Total number of yields by the worker till now.

yieldCount at start, timestamp.

Events that a child thread may send to a parent thread.

Channel driver throws this exception to all active workers to clean up the channel.

A magical value for the buffer size arrived at by running the smallest possible task and measuring the optimal value of the buffer for that. This is obviously dependent on hardware; this figure is based on a 2.2GHz Intel Core i7 processor.

The fields prefixed by an _ are not to be accessed or updated directly but via smart accessor APIs. Use get/set routines instead of directly accessing the Config fields.

Specify the maximum number of threads that can be spawned by the channel. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION!
using an unbounded maxBuffer value (i.e. a negative value) coupled with an unbounded maxThreads value is a recipe for disaster in the presence of infinite streams, or very large streams. Especially, it must not be used when pure is used in ZipAsyncM streams, as pure in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Specify the stream evaluation rate of a channel.

A Nothing value means there is no smart rate control, concurrent execution blocks only if maxThreads or maxBuffer is reached, or there are no more concurrent tasks to execute. This is the default.

When rate (throughput) is specified, concurrent production may be ramped up or down automatically to achieve the specified stream throughput. The specific behavior for different styles of Rate specifications is documented under Rate. The effective maximum production rate achieved by a channel is governed by:

* The maxThreads limit
* The maxBuffer limit
* The maximum rate that the stream producer can achieve
* The maximum rate that the stream consumer can achieve

Maximum production rate is given by:

\[ rate = \frac{maxThreads}{latency} \]

If we know the average latency of the tasks we can set maxThreads accordingly.

Print debug information about the Channel when the stream ends.

By default, processing of output from the worker threads is given priority over dispatching new workers. More workers are dispatched only when there is no output to process. When eager is set to True, workers are dispatched aggressively as long as there is more work to do irrespective of whether there is output pending to be processed by the stream consumer.
However, dispatching may stop if maxThreads or maxBuffer is reached.

Note: This option has no effect when rate has been specified.

Note: Not supported with .

Specify when the Channel should stop.

When enabled the streams may be evaluated concurrently but the results are produced in the same sequence as a serial evaluation would produce.

Note: Not supported with .

Interleave the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.

Note: Can only be used on a finite number of streams.

Note: Not supported with .

Spawn bound threads (i.e., spawn threads using forkOS instead of forkIO). The default value is .

>>> parList modifier = Stream.parConcat modifier . Stream.fromList

Like concat but works on a list of streams.

>>> parListLazy = Stream.parList id

Same as parList, but interleaves the streams fairly instead of prioritizing the left stream. This schedules all streams in a round robin fashion over a limited number of threads.

>>> parListInterleaved = Stream.parList (Stream.interleaved True)

Same as parList, but with the ordered option on.

>>> parListOrdered = Stream.parList (Stream.ordered True)

Same as parList, but with the eager option on.

>>> parListEager = Stream.parList (Stream.eager True)

Like parListEager but stops the output as soon as the first stream stops.

>>> parListEagerFst = Stream.parList (Stream.eager True . Stream.stopWhen Stream.FirstStops)

Like parListEager but stops the output as soon as any of the two streams stops.

Definition:

>>> parListEagerMin = Stream.parList (Stream.eager True . Stream.stopWhen Stream.AnyStops)

Apply an argument stream to a function stream concurrently. Uses a shared channel for all individual applications within a stream application.

Definition:

>>> parMapM modifier f = Stream.parConcatMap modifier (Stream.fromEffect . f)

For example, the following finishes in 3 seconds (as opposed to 6 seconds) because all actions run in parallel.
Even though results are available out of order they are ordered due to the config option:

>>> f x = delay x >> return x
>>> Stream.fold Fold.toList $ Stream.parMapM (Stream.ordered True) f $ Stream.fromList [3,2,1]
1 sec
2 sec
3 sec
[3,2,1]

Definition:

>>> parSequence modifier = Stream.parMapM modifier id

Useful idioms:

>>> parFromListM = Stream.parSequence id . Stream.fromList
>>> parFromFoldableM = Stream.parSequence id . StreamK.toStream . StreamK.fromFoldable

Evaluates the streams being zipped in separate threads than the consumer. The zip function is evaluated in the consumer thread.

>>> parZipWithM cfg f m1 m2 = Stream.zipWithM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)

Multi-stream concurrency options won't apply here, see the notes in .

If you want to evaluate the zip function as well in a separate thread, you can use a  on .

>>> parZipWith cfg f = Stream.parZipWithM cfg (\a b -> return $ f a b)

>>> m1 = Stream.fromList [1,2,3]
>>> m2 = Stream.fromList [4,5,6]
>>> Stream.fold Fold.toList $ Stream.parZipWith id (,) m1 m2
[(1,4),(2,5),(3,6)]

Like mergeByM but evaluates both the streams concurrently.

Definition:

>>> parMergeByM cfg f m1 m2 = Stream.mergeByM f (Stream.parEval cfg m1) (Stream.parEval cfg m2)

Like mergeBy but evaluates both the streams concurrently.

Definition:

>>> parMergeBy cfg f = Stream.parMergeByM cfg (\a b -> return $ f a b)

Same as concatIterate but concurrent.

Pre-release

Definition:

>>> parRepeatM cfg = Stream.parSequence cfg . Stream.repeat

Generate a stream by repeatedly executing a monadic action forever.

Generate a stream by concurrently performing a monadic action n times.

Definition:

>>> parReplicateM cfg n = Stream.parSequence cfg . Stream.replicate n

Example, parReplicateM in the following example executes all the replicated actions concurrently, thus taking only 1 second:

>>> Stream.fold Fold.drain $ Stream.parReplicateM id 10 $ delay 1
...

Returns an entangled pair of a callback and a stream i.e.
whenever the callback is called a value appears in the stream. The stream is infinite, there is no way for the callback to indicate that it is done now.

The callback queues a value to a concurrent channel associated with the stream. The stream can be evaluated safely in any thread.

Pre-release

fromCallback f creates an entangled pair of a callback and a stream i.e. whenever the callback is called a value appears in the stream. The function f is invoked with the callback as argument, and the stream is returned. f would store the callback for calling it later for generating values in the stream.

The callback queues a value to a concurrent channel associated with the stream. The stream can be evaluated safely in any thread.

Pre-release

parTapCount predicate fold stream taps the count of those elements in the stream that pass the predicate. The resulting count stream is sent to a fold running concurrently in another thread.

For example, to print the count of elements processed every second:

>>> rate = Stream.rollingMap2 (flip (-)) . Stream.delayPost 1
>>> report = Stream.fold (Fold.drainMapM print) . rate
>>> tap = Stream.parTapCount (const True) report
>>> go = Stream.fold Fold.drain $ tap $ Stream.enumerateFrom 0

Note: This may not work correctly on 32-bit machines because of Int overflow.

Pre-release

Same as . Deprecated.

(c) 2019 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

Like bracket but can use 3 separate cleanup actions depending on the mode of termination:

* When the stream stops normally
* When the stream is garbage collected
* When the stream encounters an exception

bracket3 before onStop onGC onException action runs action using the result of before. If the stream stops, the onStop action is executed, if the stream is abandoned onGC is executed, if the stream encounters an exception onException is executed.

The exception is not caught, it is rethrown.
Pre-release

Run the alloc action IO b with async exceptions disabled but keeping blocking operations interruptible (see Control.Exception.mask). Use the output b of the IO action as input to the function b -> Stream m a to generate an output stream.

b is usually a resource under the IO monad, e.g. a file handle, that requires a cleanup after use. The cleanup action b -> m c runs whenever (1) the stream ends normally, (2) due to a sync or async exception or, (3) if it gets garbage collected after a partial lazy evaluation. The exception is not caught, it is rethrown.

bracket only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully clean up the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, the cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.

See also: bracket_

Inhibits stream fusion

Run the action m b whenever the stream Stream m a stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

The semantics of running the action m b are similar to the cleanup action semantics described in bracket.

>>> finally action xs = Stream.bracket (return ()) (const action) (const xs)

See also finally_

Inhibits stream fusion

Run the action m b whenever the stream Stream m a stops normally, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action m b are similar to the semantics of the cleanup action in bracket.

See also after_

See .

retry takes 3 arguments:

- A map m whose keys are exceptions and values are the number of times to retry the action given that the exception occurs.
- A handler han that decides how to handle an exception when the exception cannot be retried.
- The stream itself that we want to run this mechanism on.

When evaluating a stream, if an exception occurs:

1. The stream evaluation aborts.
2. The exception is looked up in m.
   a. If the exception exists and the mapped value is > 0 then,
      i. The value is decreased by 1.
      ii. The stream is resumed from where the exception was called, retrying the action.
   b. If the exception exists and the mapped value is == 0 then the stream evaluation stops.
   c. If the exception does not exist then we handle the exception using han.

Internal

before

on normal stop

on exception

on GC without normal stop or exception

try (exception handling)

stream generator

map from exception to retry count

... ZipStream m a
mkZipStream = ZipStream
unZipStream :: ZipStream m a -> Stream.Stream m a
unZipStream (ZipStream strm) = strm
deriving instance IsList (ZipStream Identity a)
deriving instance a ~ GHC.Types.Char => IsString (ZipStream Identity a)
deriving instance GHC.Classes.Eq a => Eq (ZipStream Identity a)
deriving instance GHC.Classes.Ord a => Ord (ZipStream Identity a)
instance Show a => Show (ZipStream Identity a)
    where {{-# INLINE show #-}; show (ZipStream strm) = show strm}
instance Read a => Read (ZipStream Identity a)
    where {{-# INLINE readPrec #-}; readPrec = fmap ZipStream readPrec}
instance Monad m => Functor (ZipStream m)
    where {{-# INLINE fmap #-}; fmap f (ZipStream strm) = ZipStream (fmap f strm)}
instance Monad m => Applicative (ZipStream m)
    where {{-# INLINE pure #-}; pure = ZipStream .
Stream.repeat; {-# INLINE (<*>) #-}; (<*>) (ZipStream strm1) (ZipStream strm2) = ZipStream (zipApply strm1 strm2)}

Create a type with specific stream combination properties.

>>> expr <- runQ (mkCrossType "Parallel" "parBind" True)
>>> putStrLn $ pprint expr
newtype Parallel m a = Parallel (Stream.Stream m a)
mkParallel :: Stream.Stream m a -> Parallel m a
mkParallel = Parallel
unParallel :: Parallel m a -> Stream.Stream m a
unParallel (Parallel strm) = strm
instance Monad m => Functor (Parallel m)
    where {{-# INLINE fmap #-}; fmap f (Parallel strm) = Parallel (fmap f strm)}
instance Stream.MonadAsync m => Monad (Parallel m)
    where {{-# INLINE (>>=) #-}; (>>=) (Parallel strm1) f = let f1 a = unParallel (f a) in Parallel (parBind strm1 f1)}
instance Stream.MonadAsync m => Applicative (Parallel m)
    where {{-# INLINE pure #-}; pure = Parallel . Stream.fromPure; {-# INLINE (<*>) #-}; (<*>) = ap}
instance (Monad (Parallel m), MonadIO m) => MonadIO (Parallel m)
    where {{-# INLINE liftIO #-}; liftIO = Parallel . (Stream.fromEffect . liftIO)}
instance (Monad (Parallel m), MonadThrow m) => MonadThrow (Parallel m)
    where {{-# INLINE throwM #-}; throwM = Parallel . (Stream.fromEffect . throwM)}

Name of the type

Function to use for (<*>)

True if (<*>) requires MonadAsync constraint (concurrent)

Name of the type

Function to use for (>>=)

True if (>>=) requires MonadAsync constraint (concurrent)

(c) 2022 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, released, GHC, Safe-Inferred

(c) 2017 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred

An interleaving serial IO stream of elements of type a. See WSerialT documentation for more details.

Since: 0.2.0 (Streamly)

For WSerialT streams:

(<>) = wSerial                       -- Semigroup
(>>=) = flip . concatMapWith wSerial -- Monad

Note that <> is associative only if we disregard the ordering of elements in the resulting stream.
A single Monad bind behaves like a for loop:

>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]

Nested monad binds behave like interleaved nested for loops:

>>> :{
IsStream.toList $ IsStream.fromWSerial $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(2,3),(1,4),(2,4)]

It is a result of interleaving all the nested iterations corresponding to element 1 in the first stream with all the nested iterations of element 2:

>>> import Streamly.Prelude (wSerial)
>>> IsStream.toList $ IsStream.fromList [(1,3),(1,4)] `IsStream.wSerial` IsStream.fromList [(2,3),(2,4)]
[(1,3),(2,3),(1,4),(2,4)]

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of SerialT.

Since: 0.2.0 (Streamly)

A serial IO stream of elements of type a. See SerialT documentation for more details.

Since: 0.2.0 (Streamly)

For SerialT streams:

(<>) = serial                       -- Semigroup
(>>=) = flip . concatMapWith serial -- Monad

A single Monad bind behaves like a for loop:

>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    return x
:}
[1,2]

Nested monad binds behave like nested for loops:

>>> :{
IsStream.toList $ do
    x <- IsStream.fromList [1,2] -- foreach x in stream
    y <- IsStream.fromList [3,4] -- foreach y in stream
    return (x, y)
:}
[(1,3),(1,4),(2,3),(2,4)]

Since: 0.2.0 (Streamly)

Generate an infinite stream by repeating a pure value.

map = fmap

Same as fmap.

> S.toList $ S.map (+1) $ S.fromList [1,2,3]
[2,3,4]

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

This gives exponential priority to earlier streams than the ones joining later. Because of exponential weighting it can be used with concatMapWith.

Not fused

Build a stream by unfolding a monadic step function starting from a seed.
The step function returns the next element in the stream and the next seed value. When it is done it returns Nothing and the stream ends. For example,

let f b =
        if b > 3
        then return Nothing
        else print b >> return (Just (b, b + 1))
in drain $ unfoldrM f 0

0
1
2
3

Pre-release

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

Write a stream to an SVar in a non-blocking manner. The stream can then be read back from the SVar using fromSVar.

Pull a stream from an SVar.

Generate a stream from an SVar. An unevaluated stream can be pushed to an SVar. As we pull a stream from the SVar the input stream gets evaluated concurrently. The evaluation depends on the SVar style and the configuration parameters e.g. using the maxBuffer/maxThreads combinators.

Like the previous function but generates a StreamD style stream instead of CPS.

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

Fold the supplied stream to the SVar asynchronously using Parallel concurrency style. {-# INLINE [1] toSVarParallel #-}

Pull a stream from an SVar to fold it. Like fromSVar except that it does not drive the evaluation of the stream. It just pulls whatever is available on the SVar. Also, when the fold stops it sends a notification to the stream pusher/producer. No exceptions are expected to be propagated from the stream pusher to the fold puller.

Create a Fold style SVar that runs a supplied fold function as the consumer. Any elements sent to the SVar are consumed by the supplied fold function.

Like the previous function but generates a StreamD style stream instead of StreamK.

Like the previous function except that it uses a Fold instead of a fold function.

Poll for events sent by the fold consumer to the stream pusher. The fold consumer can send a Stop event or an exception. When a Stop is received this function returns True.
If an exception is received then it throws the exception.

Push values from a stream to a fold worker via an SVar. Before pushing a value to the SVar it polls for events received from the fold consumer. If a stop event is received then it returns True, otherwise False. Propagates exceptions received from the fold consumer.

Tap a stream and send the elements to the specified SVar in addition to yielding them again. The SVar runs a fold consumer. Elements are tapped and sent to the SVar until the fold finishes. Any exceptions from the fold evaluation are propagated in the current thread.

------input stream---------output stream----->
                   /|\   |
        exceptions  |    |  input
                    |   \|/
                    ----SVar
                         |
                        Fold

(c) 2022 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

(c) 2017 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred

A parallely composing IO stream of elements of type a. See ParallelT documentation for more details.

Since: 0.2.0 (Streamly)

For ParallelT streams:

(<>) = parallel
(>>=) = flip . concatMapWith parallel

See AsyncT, ParallelT is similar except that all iterations are strictly concurrent while in AsyncT it depends on the consumer demand and available threads. See parallel for more details.

Since: 0.1.0 (Streamly)

Since: 0.7.0 (maxBuffer applies to ParallelT streams)

XXX we can implement it more efficiently by directly implementing instead of combining streams using parallel.

Like parallel but stops the output as soon as the first stream stops.

Pre-release

Like parallel but stops the output as soon as any of the two streams stops.

Pre-release

Like mkParallel but uses StreamK internally.

Pre-release

Same as mkParallel but for StreamD stream.

Redirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing maxBuffer setting.
              StreamK m a -> m b
                      |
-----stream m a ---------------stream m a-----

> S.drain $ S.tapAsync (S.mapM_ print) (S.enumerateFromTo 1 2)
1
2

Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained.

Compare with tap.

Pre-release

Like tapAsync but uses a Fold instead of a fold function.

Generates a callback and a stream pair. The callback returned is used to queue values to the stream. The stream is infinite, there is no way for the callback to indicate that it is done now.

Pre-release

(c) 2017 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred

A round robin parallely composing IO stream of elements of type a. See WAsyncT documentation for more details.

Since: 0.2.0 (Streamly)

For WAsyncT streams:

(<>) = wAsync
(>>=) = flip . concatMapWith wAsync

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the wAsync combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the wAsync combinator:

>>> :{
Stream.toList $ Stream.fromWAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows.
All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using wAsync.

The W in the name stands for wide or breadth wise scheduling in contrast to the depth wise scheduling behavior of AsyncT.

Since: 0.2.0 (Streamly)

A demand driven left biased parallely composing IO stream of elements of type a. See AsyncT documentation for more details.

Since: 0.2.0 (Streamly)

For AsyncT streams:

(<>) = async
(>>=) = flip . concatMapWith async

A single Monad bind behaves like a for loop with iterations of the loop executed concurrently a la the async combinator, producing results and side effects of iterations out of order:

>>> :{
Stream.toList $ Stream.fromAsync $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[1,2]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, a la the async combinator:

>>> :{
Stream.toList $ Stream.fromAsync $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,4,5,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using async.

Since: 0.1.0 (Streamly)

Generate a stream asynchronously to keep it buffered, lazily consume from the buffer.

Pre-release

Create a new SVar and enqueue one stream computation on it.

Join two computations on the currently running SVar queue for concurrent execution. When we are using parallel composition, an SVar is passed around as a state variable. We try to schedule a new parallel computation on the SVar passed to us. The first time, when no SVar exists, a new SVar is created.
Subsequently, this function may get called when a computation already scheduled on the SVar is further evaluated. For example, when (a parallel b) is evaluated it calls this function to put a and b on the current scheduler queue.

The SVar style required by the current composition context is passed as one of the parameters. If the scheduling and composition style of the new computation being scheduled is different than the style of the current SVar, then we create a new SVar and schedule it on that. The newly created SVar joins as one of the computations on the current SVar queue.

Cases when we need to switch to a new SVar:

* (x parallel y) parallel (t parallel1 u) -- all of them get scheduled on the same SVar
* (x parallel y) parallel (t async u) -- t and u get scheduled on a new child SVar because of the scheduling policy change.
* If we adapt a stream of type async to a stream of type Parallel1, we create a new SVar at the transitioning bind.
* When the stream is switching from disjunctive composition to conjunctive composition and vice-versa we create a new SVar to isolate the scheduling of the two.

XXX we can implement it more efficiently by directly implementing instead of combining streams using async.

XXX we can implement it more efficiently by directly implementing instead of combining streams using wAsync.

(c) 2017 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred

A serial IO stream of elements of type a with concurrent lookahead. See AheadT documentation for more details.

Since: 0.3.0 (Streamly)

For AheadT streams:

(<>) = ahead
(>>=) = flip .
concatMapWith ahead

A single Monad bind behaves like a for loop with iterations executed concurrently, ahead of time, producing side effects of iterations out of order, but results in order:

>>> :{
Stream.toList $ Stream.fromAhead $ do
     x <- Stream.fromList [2,1] -- foreach x in stream
     Stream.fromEffect $ delay x
:}
1 sec
2 sec
[2,1]

Nested monad binds behave like nested for loops with nested iterations executed concurrently, ahead of time:

>>> :{
Stream.toList $ Stream.fromAhead $ do
    x <- Stream.fromList [1,2] -- foreach x in stream
    y <- Stream.fromList [2,4] -- foreach y in stream
    Stream.fromEffect $ delay (x + y)
:}
3 sec
4 sec
5 sec
6 sec
[3,5,4,6]

The behavior can be explained as follows. All the iterations corresponding to the element 1 in the first stream constitute one output stream and all the iterations corresponding to 2 constitute another output stream and these two output streams are merged using ahead.

Since: 0.3.0 (Streamly)

XXX we can implement it more efficiently by directly implementing instead of combining streams using ahead.

(c) 2020 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

time since last event

time as per last event. We can use the Map size instead of maintaining a count, but if we have to switch to HashMap then it can be useful.

total number of sessions in progress

heap for timeouts

Stored sessions for keys

Completed sessions

Generate a stream by running an action periodically at the specified time interval.

Generate a tick stream consisting of () elements, each tick is generated after the specified time delay given in seconds.

>>> ticks = Stream.periodic (return ())

Generate a tick stream, ticks are generated at the specified Rate. The rate is adaptive, the tick generation speed can be increased or decreased at different times to achieve the specified rate.
The specific behavior for different styles of F specifications is documented under F. The effective maximum rate achieved by a stream is governed by the processor speed.

    tickStream = Stream.repeatM (return ())
    ticksRate r = Stream.parEval (Stream.rate (Just r)) tickStream

Intersperse a monadic action into the input stream after every n seconds.

Definition:

    interject f n xs = Stream.parListEagerFst [xs, Stream.periodic f n]

Example:

    s = Stream.fromList "hello"
    input = Stream.mapM (\x -> threadDelay 1000000 >> putChar x) s
    Stream.fold Fold.drain $ Stream.interject (putChar ',') 1.05 input
    h,e,l,l,o

takeInterval interval runs the stream only up to the specified time interval in seconds.

The interval starts when the stream is evaluated for the first time.

Take time interval i seconds at the end of the stream.

O(n) space, where n is the number of elements taken.

Unimplemented

dropInterval interval drops all the stream elements that are generated before the specified interval in seconds has passed.

The interval begins when the stream is evaluated for the first time.

Drop time interval i seconds at the end of the stream.

O(n) space, where n is the number of elements dropped.

Unimplemented

Group the input stream into windows of n seconds each and then fold each group using the provided fold function.

    twoPerSec = Stream.parEval (Stream.constRate 2) $ Stream.enumerateFrom 1
    intervals = Stream.intervalsOf 1 Fold.toList twoPerSec
    Stream.fold Fold.toList $ Stream.take 2 intervals
    [...,...]

Like chunksOf but if the chunk is not completed within the specified time interval then emit whatever we have collected till now. The chunk timeout is reset whenever a chunk is emitted. 
The granularity of the clock is 100 ms.

    s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
    f = Stream.fold (Fold.drainMapM print) $ Stream.groupsOfTimeout 5 1 Fold.toList s

Pre-release

classifySessionsBy tick keepalive predicate timeout fold stream classifies an input event stream consisting of (timestamp, (key, value)) into sessions based on the key, folding all the values corresponding to the same key into a session using the supplied fold.

When the fold terminates or a timeout occurs, a tuple consisting of the session key and the folded value is emitted in the output stream. The timeout is measured from the first event in the session. If the keepalive option is set to  the timeout is reset to 0 whenever an event is received.

The timestamp in the input stream is an absolute time from some epoch, characterizing the time when the input event was generated. The notion of current time is maintained by a monotonic event time clock using the timestamps seen in the input stream. The latest timestamp seen till now is used as the base for the current time. When no new events are seen, a timer is started with a clock resolution of tick seconds. This timer is used to detect session timeouts in the absence of new events.

To bound the memory used, the number of sessions can be limited. If the ejection predicate returns , the oldest session is ejected before inserting a new session.

When the stream ends any buffered sessions are ejected immediately.

If a session key is received even after a session has finished, another session is created for that key.

    :{
    Stream.fold (Fold.drainMapM print)
        $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
        $ Stream.timestamped
        $ Stream.delay 0.19
        $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
    :}
    (1,"abc")
    (2,"abc")
    (3,"abc")

Pre-release

Same as  with a timer tick of 1 second and keepalive option set to . 
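The session rules described for classifySessionsBy can be sketched as a pure list model. This is only an illustration under stated assumptions, not the library implementation: classifyModel and its parameters are hypothetical names, timestamps are plain Int values in ascending order, the fold is fixed to "collect up to size values" (like Fold.take size Fold.toList), keepalive is off, and a session is treated as expired once timeout or more time units have passed since its first event.

```haskell
import qualified Data.Map.Strict as Map
import Data.List (foldl')

-- Hypothetical pure model of session classification (not the streamly API).
-- Each live session stores (time of first event, values in reverse order).
classifyModel
    :: Ord k
    => Int              -- session timeout, measured from the first event
    -> Int              -- a session closes after this many values
    -> [(Int, (k, a))]  -- (timestamp, (key, value)), timestamps ascending
    -> [(k, [a])]       -- (session key, collected values) in emission order
classifyModel timeout size events =
    let (live, out) = foldl' step (Map.empty, []) events
    -- When the input ends, every buffered session is ejected.
    in reverse out ++ flush live
  where
    flush m = [(k, reverse vs) | (k, (_, vs)) <- Map.toList m]

    step (m, out) (now, (k, v)) =
        let -- Assumption: expiry is checked against the event time clock.
            (expired, alive) =
                Map.partition (\(t0, _) -> now - t0 >= timeout) m
            out1 = reverse (flush expired) ++ out
            -- A key with no live session starts a new one at 'now'.
            (t0, vs) = Map.findWithDefault (now, []) k alive
            vs' = v : vs
        in if length vs' >= size
              then (Map.delete k alive, (k, reverse vs') : out1)
              else (Map.insert k (t0, vs') alive, out1)
```

With nine events keyed 1, 2, 3 and carrying 'a', 'b', 'c' each, a size of 3 yields one session per key, mirroring the (1,"abc") (2,"abc") (3,"abc") output shown in the example. The model has no timer tick, so timeouts are only noticed when another event arrives or the input ends.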
    classifyKeepAliveSessions = classifySessionsBy 1 True

Pre-release

Same as  with a timer tick of 1 second and keepalive option set to .

    classifySessionsOf = Stream.classifySessionsBy 1 False

Pre-release

Continuously evaluate the input stream and sample the last event in each time window of n seconds.

This is also known as throttle in some libraries.

    sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.latest

Like sampleInterval but samples at the beginning of the time window.

    sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one

Sample one event at the end of each burst of events. A burst is a group of events close together in time; it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event.

This is known as debounce in some libraries.

The clock granularity is 10 ms.

Like  but samples the event at the beginning of the burst instead of at the end of it.

Evaluate the input stream continuously and keep only the oldest n elements in the buffer, discard the new ones when the buffer is full. When the output stream is evaluated the collected buffer is streamed and the buffer starts filling again.

Unimplemented

Evaluate the input stream continuously and keep only the latest n elements in a ring buffer, keep discarding the older ones to make space for the new ones. When the output stream is evaluated the buffer collected till now is streamed and it starts filling again.

Unimplemented

Always produce the latest available element from the stream without any delay. The stream is continuously evaluated at the highest possible rate and only the latest element is retained for sampling. 
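The burst sampling (debounce) rule stated earlier in this module can be illustrated with a pure model over timestamped events. This is a sketch under assumptions, not the library code: sampleBurstEndModel and sampleBurstStartModel are hypothetical names, events carry explicit Double timestamps in ascending order, and the 10 ms clock granularity is ignored.

```haskell
-- Hypothetical pure models of burst sampling (not the streamly API).
-- A burst ends when the next event is more than 'gap' seconds after
-- the previous one.

-- Emit the last event of every burst.
sampleBurstEndModel :: Double -> [(Double, a)] -> [a]
sampleBurstEndModel gap = go
  where
    go [] = []
    go [(_, x)] = [x]                       -- end of input closes the burst
    go ((t1, x) : rest@((t2, _) : _))
        | t2 - t1 > gap = x : go rest       -- x was the last event of a burst
        | otherwise = go rest               -- same burst continues

-- Emit the first event of every burst.
sampleBurstStartModel :: Double -> [(Double, a)] -> [a]
sampleBurstStartModel _ [] = []
sampleBurstStartModel gap ((t0, x0) : rest0) = x0 : go t0 rest0
  where
    go _ [] = []
    go prev ((t, y) : more)
        | t - prev > gap = y : go t more    -- y starts a new burst
        | otherwise = go t more
```

For events at times 0, 0.2, 2, 2.1 and 5 with a gap of 1 second there are three bursts, so each model returns three samples: the last event of each burst in the first case and the first event of each burst in the second.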
Unimplemented

timer tick in seconds
reset the timer when an event is received
predicate to eject sessions based on session count
session timeout in seconds
Fold to be applied to session data
timestamp, (session key, session data)
session key, fold result

timer tick in seconds
reset the timer when an event is received
predicate to eject sessions based on session count
session timeout in seconds
Fold to be applied to session data
timestamp, (session key, session data)
session key, fold result

predicate to eject sessions on session count
session inactive timeout
Fold to be applied to session payload data
timestamp, (session key, session data)

predicate to eject sessions on session count
time window size
Fold to be applied to session data
timestamp, (session key, session data)

(c) 2022 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

(c) 2017 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred

An IO stream whose applicative instance zips streams serially.

Since: 0.2.0 (Streamly)

For  streams: (<>) = ;> (<*>) = 'Streamly.Prelude.serial.zipWith' id

Applicative evaluates the streams being zipped serially:

    s1 = Stream.fromFoldable [1, 2]
    s2 = Stream.fromFoldable [3, 4]
    s3 = Stream.fromFoldable [5, 6]
    Stream.toList $ Stream.fromZipSerial $ (,,) <$> s1 <*> s2 <*> s3
    [(1,3,5),(2,4,6)]

Since: 0.2.0 (Streamly)

An IO stream whose applicative instance zips streams concurrently. 
Note that it uses the default concurrency options.

    s = ZipConcurrent $ D.fromList [1, 2, 3]
    x = (,,) <$> s <*> s <*> s
    D.fold Fold.toList (getZipConcurrent x)
    [(1,1,1),(2,2,2),(3,3,3)]

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

An IO stream whose applicative instance zips streams wAsyncly.

Since: 0.2.0 (Streamly)

For  streams: (<>) = ;> (<*>) = 'Streamly.Prelude.serial.zipAsyncWith' id

Applicative evaluates the streams being zipped concurrently, the following would take half the time that it would take in serial zipping:

    s = Stream.fromFoldableM $ Prelude.map delay [1, 1, 1]
    Stream.toList $ Stream.fromZipAsync $ (,) <$> s <*> s
    ...
    [(1,1),(1,1),(1,1)]

Since: 0.2.0 (Streamly)

Like zipAsyncWith but with a monadic zipping function.

Like zipWith but zips concurrently i.e. both the streams being zipped are evaluated concurrently using the ParallelT concurrent evaluation style. The maximum number of elements of each stream evaluated in advance can be controlled by maxBuffer.

The stream ends if stream a or stream b ends. However, if stream b ends while we are still evaluating stream a and waiting for a result, then the stream will not end until after the evaluation of stream a finishes. This behavior can potentially be changed in future to end the stream immediately as soon as the end of any of the streams is detected.

(c) 2017 Composewell Technologies, BSD3, streamly@composewell.com, experimental, GHC, Safe-Inferred

Class of types that can represent a stream of elements of some type a in some monad m.

Since: 0.2.0 (Streamly)

Constructs a stream by adding a monadic action at the head of an existing stream. For example:

    > toList $ getLine `consM` getLine `consM` nil
    hello
    world
    ["hello","world"]

Concurrent (do not use  to construct infinite streams)

Operator equivalent of . We can read it as "parallel colon" to remember that | comes before :.

    > toList $ getLine |: getLine |: nil
    hello
    world
    ["hello","world"]

    let delay = threadDelay 1000000 >> print 1
    drain $ fromSerial $ delay |: delay |: delay |: nil
    drain $ fromParallel $ delay |: delay |: delay |: nil

Concurrent (do not use  to construct infinite streams)

Adapt any specific stream type to any other specific stream type.

Since: 0.1.0 (Streamly)

Adapt a polymorphic consM operation to a StreamK cons operation

Compare two streams for equality

Compare two streams

fromList =  

Construct a stream from a list of pure values. This is more efficient than   for serial streams.

Convert a stream into a list in the underlying monad.

Build a stream from an SVar, a stop continuation, a singleton stream continuation and a yield continuation.

Like , but with a monadic step function.

Strict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.

Strict left associative fold.

Fold a stream by providing an SVar, a stop continuation, a singleton continuation and a yield continuation. The stream would share the current SVar passed via the State.

Fold a stream by providing a State, stop continuation, a singleton continuation and a yield continuation. 
The stream will not use the SVar passed via State.

Fix the type of a polymorphic stream as .
Since: 0.1.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.2.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.1.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.2.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.3.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.1.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.2.0 (Streamly)

Fix the type of a polymorphic stream as .
Since: 0.2.0 (Streamly)

Construct a stream by adding a pure value at the head of an existing stream. For serial streams this is the same as (return a) `consM` r but more efficient. For concurrent streams this is not concurrent whereas  is concurrent. For example:

    > toList $ 1 `cons` 2 `cons` 3 `cons` nil
    [1,2,3]

Operator equivalent of .

    > toList $ 1 .: 2 .: 3 .: nil
    [1,2,3]

concatMapWith mixer generator stream is a two dimensional looping combinator. The generator function is used to generate streams from the elements in the input stream and the mixer function is used to merge those streams.

Note we can merge streams concurrently by using a concurrent merge function.

Since: 0.7.0
Since: 0.8.0 (signature change)

A variant of foldMap that allows you to map a monadic streaming action on a   container and then fold it using the specified stream merge operation.

    concatMapFoldableWith async return [1..3]

Equivalent to:

    concatMapFoldableWith f g = Prelude.foldr (f . g) S.nil
    concatMapFoldableWith f g xs = S.concatMapWith f g (S.fromFoldable xs)

Since: 0.8.0 (Renamed foldMapWith to concatMapFoldableWith)
Since: 0.1.0 (Streamly)

Like  but with the last two arguments reversed i.e. the monadic streaming function is the last argument.

Equivalent to: concatForFoldableWith f xs g = Prelude.foldr (f . 
g) D.nil xs
    concatForFoldableWith f = flip (D.concatMapFoldableWith f)

Since: 0.8.0 (Renamed forEachWith to concatForFoldableWith)
Since: 0.1.0 (Streamly)

A variant of JK that allows you to fold a   container of streams using the specified stream sum operation.

    concatFoldableWith async $ map return [1..3]

Equivalent to:

    concatFoldableWith f = Prelude.foldr f D.nil
    concatFoldableWith f = D.concatMapFoldableWith f id

Since: 0.8.0 (Renamed foldWith to concatFoldableWith)
Since: 0.1.0 (Streamly)

(c) 2019 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

Transform the inner monad of a stream using a natural transformation.

Internal

Generalize the inner monad of the stream from  to any monad.

Internal

Lift the inner monad m of a stream t m a to tr m using the monad transformer tr.

Evaluate the inner monad of a stream as  .

Run a stream transformation using a given environment.

See also: MN

Internal

Evaluate the inner monad of a stream as  .

This is supported only for  as concurrent state updates may not be safe.

    evalStateT s = Stream.map snd . Stream.runStateT s

Internal

Run a stateful (StateT) stream transformation using a given state.

This is supported only for  as concurrent state updates may not be safe.

    usingStateT s f = evalStateT s . f . liftInner

See also: scanl'

Internal

Evaluate the inner monad of a stream as   and emit the resulting state and value pair after each step.

This is supported only for  as concurrent state updates may not be safe.

(c) 2019 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred
Run the action m b before the stream yields its first element.

Same as the following but more efficient due to fusion:

    before action xs = Stream.nilM action <> xs
    before action xs = Stream.concatMap (const xs) (Stream.fromEffect action)

Like , with following differences:

    action m b won't run if the stream is garbage collected after partial evaluation.
    Monad m does not require any other constraints.
    has slightly better performance than .

Same as the following, but with stream fusion:

    after_ action xs = xs <> 'nilM' action

Pre-release

Run the action m b whenever the stream t m a stops normally, or if it is garbage collected after a partial lazy evaluation.

The semantics of the action m b are similar to the semantics of cleanup action in .

See also 

Run the action m b if the stream aborts due to an exception. The exception is not caught, simply rethrown.

Inhibits stream fusion

Like  with following differences:

    action m b won't run if the stream is garbage collected after partial evaluation.
    does not require a 3 constraint.
    has slightly better performance than .

Inhibits stream fusion

Pre-release

Run the action m b whenever the stream t m a stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.

The semantics of running the action m b are similar to the cleanup action semantics described in .

See also 

Inhibits stream fusion

Like  but with following differences:

    alloc action m b runs with async exceptions enabled
    cleanup action b -> m c won't run if the stream is garbage collected after partial evaluation.
    does not require a 3 constraint.
    has slightly better performance than .

Inhibits stream fusion

Pre-release

Run the alloc action m b with async exceptions disabled but keeping blocking operations interruptible (see 78). Use the output b as input to b -> t m a to generate an output stream.

b is usually a resource under the state of monad m, e.g. 
a file handle, that requires a cleanup after use. The cleanup action b -> m c runs whenever the stream ends normally, due to a sync or async exception or if it gets garbage collected after a partial lazy evaluation.

 only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully clean up the resource in the face of sync or async exceptions.

When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run.

See also: 

Inhibits stream fusion

Like  but can use separate cleanup actions depending on the mode of termination. bracket' before onStop onGC onException action runs action using the result of before. If the stream stops, onStop action is executed, if the stream is abandoned onGC is executed, if the stream encounters an exception onException is executed.

Pre-release

Like  but the exception handler is also provided with the stream that generated the exception as input. The exception handler can thus re-evaluate the stream to retry the action that failed. The exception handler can again call  on it to retry the action multiple times.

This is highly experimental. In a stream of actions we can map the stream with a retry combinator to retry each action on failure.

Inhibits stream fusion

Pre-release

When evaluating a stream if an exception occurs, stream evaluation aborts and the specified exception handler is run with the exception as argument.

Inhibits stream fusion

retry takes 3 arguments

    A map m whose keys are exceptions and values are the number of times to retry the action given that the exception occurs. 
    A handler han that decides how to handle an exception when the exception cannot be retried.
    The stream itself that we want to run this mechanism on.

When evaluating a stream if an exception occurs,

    The stream evaluation aborts
    The exception is looked up in m
        a. If the exception exists and the mapped value is > 0 then,
            i. The value is decreased by 1.
            ii. The stream is resumed from where the exception was called, retrying the action.
        b. If the exception exists and the mapped value is == 0 then the stream evaluation stops.
        c. If the exception does not exist then we handle the exception using han.

Internal

map from exception to retry count

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom (0 :: Int)
    [0,1,2,3]

For   types, enumeration is numerically stable. However, no overflow or underflow checks are performed.

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFrom 1.1
    [1.1,2.1,3.1,4.1]

Generate a finite stream starting with the element from, enumerating the type up to the value to. If to is smaller than from then an empty stream is returned.

    >>> Stream.toList $ Stream.enumerateFromTo 0 4
    [0,1,2,3,4]

For   types, the last element is equal to the specified to value after rounding to the nearest integral value.

    >>> Stream.toList $ Stream.enumerateFromTo 1.1 4
    [1.1,2.1,3.1,4.1]

    >>> Stream.toList $ Stream.enumerateFromTo 1.1 4.6
    [1.1,2.1,3.1,4.1,5.1]

enumerateFromThen from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. Enumeration can occur downwards or upwards depending on whether then comes before or after from. For   types the stream ends when   is reached, for unbounded types it keeps enumerating infinitely.

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 2
    [0,2,4,6]

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThen 0 (-2)
    [0,-2,-4,-6]

enumerateFromThenTo from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to. Enumeration can occur downwards or upwards depending on whether then comes before or after from.

    >>> Stream.toList $ Stream.enumerateFromThenTo 0 2 6
    [0,2,4,6]

    >>> Stream.toList $ Stream.enumerateFromThenTo 0 (-2) (-6)
    [0,-2,-4,-6]

enumerateFromStepIntegral from step generates an infinite stream whose first element is from and the successive elements are in increments of step.

CAUTION: This function is not safe for finite integral types. It does not check for overflow, underflow or bounds.

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromStepIntegral 0 2
    [0,2,4,6]

    >>> Stream.toList $ Stream.take 3 $ Stream.enumerateFromStepIntegral 0 (-2)
    [0,-2,-4]

Enumerate an   type. enumerateFromIntegral from generates a stream whose first element is from and the successive elements are in increments of 1. The stream is bounded by the size of the   type.

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromIntegral (0 :: Int)
    [0,1,2,3]

Enumerate an   type in steps. enumerateFromThenIntegral from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. The stream is bounded by the size of the   type.

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) 2
    [0,2,4,6]

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenIntegral (0 :: Int) (-2)
    [0,-2,-4,-6]

Enumerate an   type up to a given limit. enumerateFromToIntegral from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

    >>> Stream.toList $ Stream.enumerateFromToIntegral 0 4
    [0,1,2,3,4]

Enumerate an   type in steps up to a given limit. enumerateFromThenToIntegral from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to.

    >>> Stream.toList $ Stream.enumerateFromThenToIntegral 0 2 6
    [0,2,4,6]

    >>> Stream.toList $ Stream.enumerateFromThenToIntegral 0 (-2) (-6)
    [0,-2,-4,-6]

Numerically stable enumeration from a   number in steps of size 1. enumerateFromFractional from generates a stream whose first element is from and the successive elements are in increments of 1. No overflow or underflow checks are performed.

This is the equivalent of   for   types. For example:

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromFractional 1.1
    [1.1,2.1,3.1,4.1]

Numerically stable enumeration from a   number in steps. enumerateFromThenFractional from then generates a stream whose first element is from, the second element is then and the successive elements are in increments of then - from. No overflow or underflow checks are performed.

This is the equivalent of   for   types. For example:

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 2.1
    [1.1,2.1,3.1,4.1]

    >>> Stream.toList $ Stream.take 4 $ Stream.enumerateFromThenFractional 1.1 (-2.1)
    [1.1,-2.1,-5.300000000000001,-8.500000000000002]

Numerically stable enumeration from a   number to a given limit. enumerateFromToFractional from to generates a finite stream whose first element is from and successive elements are in increments of 1 up to to.

This is the equivalent of   for   types. For example:

    >>> Stream.toList $ Stream.enumerateFromToFractional 1.1 4
    [1.1,2.1,3.1,4.1]

    >>> Stream.toList $ Stream.enumerateFromToFractional 1.1 4.6
    [1.1,2.1,3.1,4.1,5.1]

Notice that the last element is equal to the specified to
value after rounding to the nearest integer.

Numerically stable enumeration from a   number in steps up to a given limit. enumerateFromThenToFractional from then to generates a finite stream whose first element is from, the second element is then and the successive elements are in increments of then - from up to to.

This is the equivalent of   for   types. For example:

    >>> Stream.toList $ Stream.enumerateFromThenToFractional 0.1 2 6
    [0.1,2.0,3.9,5.799999999999999]

    >>> Stream.toList $ Stream.enumerateFromThenToFractional 0.1 (-2) (-6)
    [0.1,-2.0,-4.1000000000000005,-6.200000000000001]

 for   types not larger than  .

 for   types not larger than  .

 for   types not larger than  .

Note: We convert the   to   and enumerate the  . If a type is bounded but does not have a   instance then we can go on enumerating it beyond the legal values of the type, resulting in the failure of   when converting back to  . Therefore we require a   instance for this function to be safely used.

    enumerate = enumerateFrom minBound

Enumerate a   type from its   to  

    enumerateTo = enumerateFromTo minBound

Enumerate a   type from its   to specified value.

    enumerateFromBounded = enumerateFromTo from maxBound

 for     types.

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

Specify the maximum number of threads that can be spawned concurrently for any concurrent combinator in a stream. A value of 0 resets the thread limit to default, a negative value means there is no limit. The default value is 1500.  does not affect ParallelT streams as they can use unbounded number of threads.

When the actions in a stream are IO bound, having blocking IO calls, this option can be used to control the maximum number of in-flight IO requests. 
When the actions are CPU bound this option can be used to control the amount of CPU used by the stream.

Since: 0.4.0 (Streamly)

Specify the maximum size of the buffer for storing the results from concurrent computations. If the buffer becomes full we stop spawning more concurrent tasks until there is space in the buffer. A value of 0 resets the buffer size to default, a negative value means there is no limit. The default value is 1500.

CAUTION! using an unbounded  value (i.e. a negative value) coupled with an unbounded  value is a recipe for disaster in presence of infinite streams, or very large streams. Especially, it must not be used when  is used in ZipAsyncM streams as  in applicative zip streams generates an infinite stream causing unbounded concurrent generation with no limit on the buffer or threads.

Since: 0.4.0 (Streamly)

Specify the pull rate of a stream. A  value resets the rate to default which is unlimited. When the rate is specified, concurrent production may be ramped up or down automatically to achieve the specified yield rate. The specific behavior for different styles of $ specifications is documented under $. The effective maximum production rate achieved by a stream is governed by:

    The  limit
    The  limit
    The maximum rate that the stream producer can achieve
    The maximum rate that the stream consumer can achieve

Since: 0.5.0 (Streamly)

Same as rate (Just $ Rate (r/2) r (2*r) maxBound)

Specifies the average production rate of a stream in number of yields per second (i.e. Hertz). Concurrent production is ramped up or down automatically to achieve the specified average yield rate. The rate can go down to half of the specified rate on the lower side and double of the specified rate on the higher side.

Since: 0.5.0 (Streamly)

Same as rate (Just $ Rate r r (2*r) maxBound)

Specifies the minimum rate at which the stream should yield values. 
As far as possible the yield rate would never be allowed to go below the specified rate, even though it may possibly go above it at times, the upper limit is double of the specified rate.

Since: 0.5.0 (Streamly)

Same as rate (Just $ Rate (r/2) r r maxBound)

Specifies the maximum rate at which the stream should yield values. As far as possible the yield rate would never be allowed to go above the specified rate, even though it may possibly go below it at times, the lower limit is half of the specified rate. This can be useful in applications where certain resource usage must not be allowed to go beyond certain limits.

Since: 0.5.0 (Streamly)

Same as rate (Just $ Rate r r r 0)

Specifies a constant yield rate. If for some reason the actual rate goes above or below the specified rate we do not try to recover it by increasing or decreasing the rate in future. This can be useful in applications like graphics frame refresh where we need to maintain a constant refresh rate.

Since: 0.5.0 (Streamly)

Specify the average latency, in nanoseconds, of a single threaded action in a concurrent composition. Streamly can measure the latencies, but that is possible only after at least one task has completed. This combinator can be used to provide a latency hint so that rate control using  can take that into account right from the beginning. When not specified then a default behavior is chosen which could be too slow or too fast, and would be restricted by any other control parameters configured. A value of 0 indicates default behavior, a negative value means there is no limit i.e. zero latency. This would normally be useful only in high latency and high throughput cases.

Print debug information about an SVar when the stream ends

Pre-release

(c) 2017 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred
    fromPure a = a `cons` nil

Create a singleton stream from a pure value.

The following holds in monadic streams, but not in Zip streams:

    fromPure = pure
    fromPure = fromEffect . pure

In Zip applicative streams  is not the same as  because in that case  is equivalent to   instead.  and  are equally efficient, in other cases  may be slightly more efficient than the other equivalent definitions.

Since: 0.8.0 (Renamed yield to fromPure)

Same as 

    fromEffect m = m `consM` nil

Create a singleton stream from a monadic action.

    > Stream.toList $ Stream.fromEffect getLine
    hello
    ["hello"]

Since: 0.8.0 (Renamed yieldM to fromEffect)

Same as 

    repeatM = fix . consM
    repeatM = cycle1 . fromEffect

Generate a stream by repeatedly executing a monadic action forever.

    :{
    repeatAsync =
        Stream.repeatM (threadDelay 1000000 >> print 1)
            & Stream.take 10
            & Stream.fromAsync
            & Stream.drain
    :}

Concurrent, infinite (do not use with fromParallel)

timesWith g returns a stream of time value tuples. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference. The argument g specifies the granularity of the relative time in seconds. A lower granularity clock gives higher precision but is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

    import Control.Concurrent (threadDelay)
    import Streamly.Internal.Data.Stream.IsStream.Common as Stream (timesWith)
    Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.timesWith 0.01
    (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
    (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
    (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))

Note: This API is not safe on 32-bit machines. 
Pre-release

absTimesWith g returns a stream of absolute timestamps using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

    Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ absTimesWith 0.01
    AbsTime (TimeSpec {sec = ..., nsec = ...})
    AbsTime (TimeSpec {sec = ..., nsec = ...})
    AbsTime (TimeSpec {sec = ..., nsec = ...})

Note: This API is not safe on 32-bit machines.

Pre-release

relTimesWith g returns a stream of relative time values starting from 0, using a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. Any granularity lower than 1 ms is treated as 1 ms.

    Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesWith 0.01
    RelTime64 (NanoSecond64 ...)
    RelTime64 (NanoSecond64 ...)
    RelTime64 (NanoSecond64 ...)

Note: This API is not safe on 32-bit machines.

Pre-release

We can create higher order folds using . We can fold a number of streams to a given fold efficiently with full stream fusion. For example, to fold a list of streams on the same sum fold:

    concatFold = Prelude.foldl Stream.foldContinue Fold.sum
    fold f = Fold.extractM . Stream.foldContinue f

Internal

Fold a stream using the supplied left  and reducing the resulting expression strictly at each step. The behavior is similar to foldl'. A  can terminate early without consuming the full stream. See the documentation of individual s for termination behavior.

    Stream.fold Fold.sum (Stream.enumerateFromTo 1 100)
    5050

Folds never fail, therefore, they produce a default value even when no input is provided. It means we can always fold an empty stream and get a valid result. For example:

    Stream.fold Fold.sum Stream.nil
    0

However, foldMany on an empty stream results in an empty stream. Therefore, Stream.fold f is not the same as Stream.head . Stream.foldMany f. 
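The difference between folding an empty stream and applying foldMany to it can be seen in a small list-based model. This is a sketch under assumptions and does not use the streamly API: FoldModel, foldModel, foldManyModel and headModel are hypothetical names, a fold is reduced to a step function plus an initial value, and foldManyModel applies the fold to consecutive chunks of n inputs (roughly what a fold limited with Fold.take n would do).

```haskell
import Data.List (foldl')
import Data.Maybe (listToMaybe)

-- Hypothetical stand-in for a fold: a strict left step and an initial value.
data FoldModel a b = FoldModel (b -> a -> b) b

-- Folding always yields a result; on empty input it is the initial value.
foldModel :: FoldModel a b -> [a] -> b
foldModel (FoldModel step z) = foldl' step z

-- Apply the fold repeatedly to chunks of n inputs.
-- Empty input produces no chunks, hence no results at all.
foldManyModel :: Int -> FoldModel a b -> [a] -> [b]
foldManyModel _ _ [] = []
foldManyModel n f xs =
    let (chunk, rest) = splitAt (max 1 n) xs  -- guard against n <= 0
    in foldModel f chunk : foldManyModel n f rest

-- Model of taking the head of the foldMany output.
headModel :: Int -> FoldModel a b -> [a] -> Maybe b
headModel n f = listToMaybe . foldManyModel n f

sumModel :: FoldModel Int Int
sumModel = FoldModel (+) 0
```

In this model foldModel sumModel [] evaluates to 0, while headModel 2 sumModel [] evaluates to Nothing, which is the distinction the text draws between Stream.fold f and Stream.head . Stream.foldMany f.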
)fold f = Stream.parse (Parser.fromFold f)streamly  map = fmap Same as  . 5> D.toList $ D.map (+1) $ D.fromList [1,2,3] [2,3,4] streamly+scanlMAfter' accumulate initial done stream is like scanlM'( except that it provides an additional done function to be applied on the accumulator when the stream stops. The result of done is also emitted in the stream.This function can be used to allocate a resource in the beginning of the scan and release it when the stream ends or to flush the internal state of the scan at the end. Pre-releasestreamlyLike  postscanl'5 but with a monadic step function and a monadic seed.:postscanlM' f z xs = Stream.drop 1 $ Stream.scanlM' f z xs Since: 0.7.0Since: 0.8.0 (signature change)streamly A stateful  , equivalent to a left scan, more like mapAccumL. Hopefully, this is a better alternative to scan. Separation of state from the output makes it easier to think in terms of a shared state, and also makes it easier to keep the state fully strict and the output lazy. See also: scanlM' Pre-releasestreamly Take first n/ elements from the stream and discard the rest.streamly> return ',') $ Stream.fromList "hello"h.,e.,l.,l.,o"h,e,l,l,o"Be careful about the order of effects. In the above example we used trace after the intersperse, if we use it before the intersperse the output would be he.l.l.o."h,e,l,l,o".Stream.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar $ Stream.fromList "hello"he.l.l.o."h,e,l,l,o"streamly?Intersperse a monadic action into the input stream after every n seconds. > import Control.Concurrent (threadDelay) > Stream.drain $ Stream.interjectSuffix 1 (putChar ',') $ Stream.mapM (x -> threadDelay 1000000 >> putChar x) $ Stream.fromList "hello" h,e,l,l,o  Pre-releasestreamlyReturns the elements of the stream in reverse order. The stream must be finite. 
Note that this necessarily buffers the entire stream in memory.5reverse = Stream.foldlT (flip Stream.cons) Stream.nil Since 0.7.0 (Monad m constraint) Since: 0.1.1streamlyLike & but several times faster, requires a   instance. Pre-releasestreamlyMake the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it terminates if the buffer is full and a worker thread is kicked off again to evaluate the remaining stream when there is space in the buffer. The consumer consumes the stream lazily from the buffer.Since: 0.2.0 (Streamly)streamlyMake the stream producer and consumer run concurrently by introducing a buffer between them. The producer thread evaluates the input stream until the buffer fills, it blocks if the buffer is full until there is space in the buffer. The consumer consumes the stream lazily from the buffer. mkParallel = IsStream.fromStreamD . mkParallelD . IsStream.toStreamD Pre-releasestreamlyLike parallel8 but stops the output as soon as the first stream stops. Pre-releasestreamlyMap a stream producing monadic function on each element of the stream and then flatten the results into a single stream. Since the stream generation function is monadic, unlike , it can produce an effect at the beginning of each iteration of the inner loop.streamlyMap a stream producing function on each element of the stream and then flatten the results into a single stream.,concatMap f = Stream.concatMapM (return . f)2concatMap f = Stream.concatMapWith Stream.serial f*concatMap f = Stream.concat . Stream.map fstreamlyGiven a stream value in the underlying monad, lift and join the underlying monad with the stream monad.+concatM = Stream.concat . 
Stream.fromEffectStream.toList $ Stream.foldManyPost f $ Stream.fromList [1..9] [3,7,11,15,9]?Stream.toList $ Stream.foldManyPost f $ Stream.fromList [1..10][3,7,11,15,19,0] Pre-releasestreamlyLike splitOn but the separator is a sequence of elements instead of a single element.For illustration, let's define a function that operates on pure lists:splitOnSeq' pat xs = Stream.toList $ Stream.splitOnSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)splitOnSeq' "" "hello"["h","e","l","l","o"]splitOnSeq' "hello" ""[""]splitOnSeq' "hello" "hello"["",""]splitOnSeq' "x" "hello" ["hello"]splitOnSeq' "h" "hello" ["","ello"]splitOnSeq' "o" "hello" ["hell",""]splitOnSeq' "e" "hello" ["h","llo"]splitOnSeq' "l" "hello" ["he","","o"]splitOnSeq' "ll" "hello" ["he","o"] is an inverse of  intercalate!. The following law always holds: intercalate . splitOnSeq == idThe following law holds when the separator is non-empty and contains none of the elements present in the input lists: splitOnSeq . intercalate == idsplitOnSeq pat f = Stream.foldManyPost (Fold.takeEndBySeq_ pat f) Pre-releasestreamlyLike & but using a monadic zipping function.streamlyStream a( is evaluated first, followed by stream b, the resulting elements a and b are then zipped using the supplied zip function and the result c is yielded to the consumer. If stream a or stream b) ends, the zipped stream ends. If stream b ends first, the element a$ from previous evaluation of stream a is discarded. > D.toList $ D.zipWith (+) (D.fromList [1,2,3]) (D.fromList [4,5,6]) [5,7,9] "S!(c) 2017 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:streamlyUse a   to transform a stream. Pre-releasestreamly Right fold to a streaming monad. $foldrS Stream.cons Stream.nil === id can be used to perform stateless stream to stream transformations like map and filter in general. It can be coupled with a scan to perform stateful transformations. 
However, note that the custom map and filter routines can be much more efficient than this due to better stream fusion. Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5] [1,2,3,4,5] Find if any element in the stream is : Stream.toList $ Stream.foldrS (\x xs -> if odd x then (Stream.fromPure True) else xs) (Stream.fromPure False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int) [True] Map (+2) on odd elements and filter out the even elements: Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int) [3,5,7] foldrM can also be represented in terms of , however, the former is much more efficient: foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s Pre-release

Right fold to a transformer monad. This is the most general right fold function.  is a special case of  , however ' implementation can be more efficient: foldrS = foldrT foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s  can be used to translate streamly streams to other transformer monads e.g. to a different streaming type. Pre-release

mapM f = sequence . map f Apply a monadic function to each element of the stream and replace it with the output of the resulting action. >>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"] abc >>> :{ drain $ Stream.replicateM 10 (return 1) & (fromSerial . Stream.mapM (\x -> threadDelay 1000000 >> print x)) :} 1 ... 1 > drain $ Stream.replicateM 10 (return 1) & (fromAsync . Stream.mapM (\x -> threadDelay 1000000 >> print x)) Concurrent (do not use with fromParallel on infinite streams)

sequence = mapM id Replace the elements of a stream of monadic actions with the outputs of those actions. 
>>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"] abc >>> :{ drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1) & (fromSerial . Stream.sequence) :} 1 1 1 >>> :{ drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1) & (fromAsync . Stream.sequence) :} 1 1 1 Concurrent (do not use with  fromParallel on infinite streams)streamly-Tap the data flowing through a stream into a . For example, you may add a tap to log the contents flowing through the stream. The fold is used only for effects, its result is discarded.  Fold m a b | -----stream m a ---------------stream m a----- Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)12 Compare with .streamlytapOffsetEvery offset n taps every n&th element in the stream starting at offset. offset can be between 0 and n - 1. Offset 0 means start at the first element in the stream. If the offset is outside this range then offset   n is used as offset.Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10[0,2,4,6,8,10]streamlyRedirect a copy of the stream to a supplied fold and run it concurrently in an independent thread. The fold may buffer some elements. The buffer size is determined by the prevailing  setting.  Stream m a -> m b | -----stream m a ---------------stream m a-----  >>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2) 1 2 Exceptions from the concurrently running fold are propagated to the current computation. Note that, because of buffering in the fold, exceptions may be delayed and may not correspond to the current element being processed in the parent stream, but we guarantee that before the parent stream stops the tap finishes and all exceptions from it are drained. 
Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2) 1 2 1 2 distributeAsync_ = flip (foldr tapAsync) Pre-release

pollCounts predicate transform fold stream counts those elements in the stream that pass the predicate. The resulting count stream is sent to another thread which transforms it using transform and then folds it using fold. The thread is automatically cleaned up if the stream stops or aborts due to an exception. For example, to print the count of elements processed every second: > Stream.drain $ Stream.pollCounts (const True) (Stream.rollingMap (-) . Stream.delayPost 1) (Fold.drainBy print) $ Stream.enumerateFrom 0 Note: This may not work correctly on 32-bit machines. Pre-release

Apply a monadic function to each element flowing through the stream and discard the results. Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2) 1 2 Compare with .

Perform a side effect before yielding each element of the stream and discard the results. >>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2) "got here" "got here" Same as  but always serial. See also:  Pre-release

Scan a stream using the given monadic fold. Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10]) [0,1,3,6]

Like  but restarts scanning afresh when the scanning fold terminates. Pre-release

Postscan a stream using the given monadic fold. The following example extracts the input stream up to a point where the running average of elements is no more than 10: import Data.Maybe (fromJust) let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length) :{ Stream.toList $ Stream.map (fromJust . 
fst) $ Stream.takeWhile (\(_,x) -> x <= 10) $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0) :} [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]

Strict left scan with an extraction function. Like , but applies a user supplied extraction function (the third argument) at each step. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction. Since 0.2.0 Since: 0.7.0 (Monad m constraint)

Like  but with a monadic step function and a monadic seed. Since: 0.4.0 Since: 0.8.0 (signature change)

Strict left scan. Like ,  too is a one to one transformation, however it adds an extra element. >>> Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4] [0,1,3,6,10] >>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4] [[],[1],[2,1],[3,2,1],[4,3,2,1]] The output of  is the initial value of the accumulator followed by all the intermediate steps and the final result of foldl'. By streaming the accumulated state after each fold step, we can share the state across multiple stages of stream composition. Each stage can modify or extend the state, do some processing with it and emit it for the next stage, thus modularizing the stream processing. This can be useful in stateful or event-driven programming. Consider the following monolithic example, computing the sum and the product of the elements in a stream in one go using a foldl': >>> Stream.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList [1,2,3,4] (10,24) Using scanl' we can make it modular by computing the sum in the first stage and passing it down to the next stage for computing the product: >>> :{ Stream.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1) $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1) $ Stream.fromList [1,2,3,4] :} (10,24) IMPORTANT:  evaluates the accumulator to WHNF. 
To avoid building lazy expressions inside the accumulator, it is recommended that a strict data structure is used for accumulator.)scanl' step z = scan (Fold.foldl' step z)>scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs1scanl' f z xs = z `Stream.cons` postscanl' f z xs See also:  usingStateTstreamlyLike : but does not stream the initial value of the accumulator.1postscanl' step z = postscan (Fold.foldl' step z)postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)8postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xsstreamlyLike scanl' but does not stream the final value of the accumulator. Pre-releasestreamlyLike prescanl' but with a monadic step function and a monadic seed. Pre-releasestreamlyLike " but with a monadic step function.streamlyLike  but for a non-empty stream. The first element of the stream is used as the initial value of the accumulator. Does nothing if the stream is empty. >>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4] [1,3,6,10] streamly Modify a t m a -> t m a1 stream transformation that accepts a predicate (a -> b) to accept  ((s, a) -> b)$ instead, provided a transformation t m a -> t m (s, a)*. Convenient to filter with index or time. filterWithIndex = with indexed filter filterWithAbsTime = with timestamped filter filterWithRelTime = with timeIndexed filter  Pre-releasestreamly2Include only those elements that pass a predicate.streamlySame as  but with a monadic predicate.streamlyDrop repeated elements that are adjacent to each other using the supplied comparison function.@uniq = uniqBy (==)#To strip duplicate path separators:  f x y = x ==  ? && x == y Stream.toList $ Stream.uniqBy f $ Stream.fromList "/a/b" "ab" Space: O(1) See also: . Pre-releasestreamly7Drop repeated elements that are adjacent to each other.streamlyStrip all leading and trailing occurrences of an element passing a predicate and make all other consecutive occurrences uniq. 
prune p = dropWhileAround p $ uniqBy (\x y -> p x && p y) > Stream.prune isSpace (Stream.fromList " hello world! ") "hello world!" Space: O(1) Unimplemented

Emit only repeated elements, once. Unimplemented

Drop repeated elements anywhere in the stream. Caution: not scalable for infinite streams. See also: nubWindowBy Unimplemented

Deletes the first occurrence of the element in the stream that satisfies the given equality predicate. >>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5] [1,3,5]

Evaluate the input stream continuously and keep only the oldest n elements in the buffer, discard the new ones when the buffer is full. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner. Unimplemented

Evaluate the input stream continuously and keep only the latest n elements in a ring buffer, keep discarding the older ones to make space for the new ones. When the output stream is evaluated it consumes the values from the buffer in a FIFO manner. Unimplemented

Like  but samples at uniform intervals to match the consumer rate. Note that  leads to non-uniform sampling depending on the consumer pattern. Unimplemented

Same as  but with a monadic predicate.

Take n elements at the end of the stream. O(n) space, where n is the number of elements taken. Unimplemented

Take time interval i seconds at the end of the stream. O(n) space, where n is the number of elements taken. Unimplemented

Take all consecutive elements at the end of the stream for which the predicate is true. O(n) space, where n is the number of elements taken. Unimplemented

Like  and  combined. O(n) space, where n is the number of elements taken from the end. 
UnimplementedstreamlyDrop elements in the stream as long as the predicate succeeds and then take the rest of the stream.streamlySame as  but with a monadic predicate.streamlyDrop n# elements at the end of the stream.3O(n) space, where n is the number elements dropped. UnimplementedstreamlyDrop time interval i" seconds at the end of the stream.3O(n) space, where n is the number elements dropped. UnimplementedstreamlyDrop all consecutive elements at the end of the stream for which the predicate is true.3O(n) space, where n is the number elements dropped. UnimplementedstreamlyLike  and  combined.O(n) space, where n is the number elements dropped from the end. UnimplementedstreamlyinsertBy cmp elem stream inserts elem before the first element in stream that is less than elem when compared using cmp. insertBy cmp x = mergeBy cmp (fromPure x) >>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5] [1,2,3,5] streamly Stream.toList $ Stream.intersperseMWith 2 (return ',') $ Stream.fromList "hello" "he,ll,o"  UnimplementedstreamlyInsert an effect and its output after consuming an element of a stream.Stream.toList $ Stream.trace putChar $ intersperseMSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"h.,e.,l.,l.,o.,"h,e,l,l,o," Pre-releasestreamly>> Stream.mapM_ putChar $ Stream.intersperseMSuffix_ (threadDelay 1000000) $ Stream.fromList "hello" hello  Pre-releasestreamlyLike  but intersperses an effectful action into the input stream after every n% elements and after the last element.Stream.toList $ Stream.intersperseMSuffixWith 2 (return ',') $ Stream.fromList "hello" "he,ll,o," Pre-releasestreamly=Insert a side effect before consuming an element of a stream.Stream.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello".h.e.l.l.o"hello"Same as  but may be concurrent. 
Concurrent Pre-releasestreamlyIntroduce a delay of specified seconds before consuming an element of the stream except the first one.Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3.(AbsTime (TimeSpec {sec = ..., nsec = ...}),1).(AbsTime (TimeSpec {sec = ..., nsec = ...}),2).(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)streamlyIntroduce a delay of specified seconds after consuming an element of a stream.Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3.(AbsTime (TimeSpec {sec = ..., nsec = ...}),1).(AbsTime (TimeSpec {sec = ..., nsec = ...}),2).(AbsTime (TimeSpec {sec = ..., nsec = ...}),3) Pre-releasestreamlyIntroduce a delay of specified seconds before consuming an element of a stream.Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3.(AbsTime (TimeSpec {sec = ..., nsec = ...}),1).(AbsTime (TimeSpec {sec = ..., nsec = ...}),2).(AbsTime (TimeSpec {sec = ..., nsec = ...}),3) Pre-releasestreamlyBuffer until the next element in sequence arrives. The function argument determines the difference in sequence numbers. This could be useful in implementing sequenced streams, for example, TCP reassembly. Unimplementedstreamly indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined) indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)Pair each element in a stream with its index, starting from index 0.8Stream.toList $ Stream.indexed $ Stream.fromList "hello")[(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]streamly indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined) indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))Pair each element in a stream with its index, starting from the given index n and counting down. 
t m b to a stream t m a concurrently; the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the transformation function runs in another thread consuming the input from the buffer. 5 is just like regular function application operator   except that it is concurrent.If you read the signature as $(t m a -> t m b) -> (t m a -> t m b) you can look at it as a transformation that converts a transform function to a buffered concurrent transform function.The following code prints a value every second even though each stage adds a 1 second delay.:{Stream.drain $5 Stream.mapM (\x -> threadDelay 1000000 >> print x)= |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1):}111 ConcurrentSince: 0.3.0 (Streamly)streamlySame as .InternalstreamlySame as  but with arguments reversed.(|&) = flip (|$) ConcurrentSince: 0.3.0 (Streamly)$+,-./01T!(c) 2017 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:Astreamly Convert an  - into a stream by supplying it an input seed.Stream.drain $ Stream.unfold Unfold.replicateM (3, putStrLn "hello")hellohellohello Since: 0.7.0streamly Convert an  ' with a closed input end into a stream. Pre-releasestreamly:{unfoldr step s = case step s of Nothing -> Stream.nil5 Just (a, b) -> a `Stream.cons` unfoldr step b:}Build a stream by unfolding a pure step function step starting from a seed s. The step function returns the next element in the stream and the next seed value. When it is done it returns # and the stream ends. For example,:{ let f b = if b > 2 then Nothing else Just (b, b + 1)%in Stream.toList $ Stream.unfoldr f 0:}[0,1,2]streamlyBuild a stream by unfolding a monadic step function starting from a seed. The step function returns the next element in the stream and the next seed value. When it is done it returns # and the stream ends. 
For example,:{ let f b = if b > 2 then return Nothing% else return (Just (b, b + 1))&in Stream.toList $ Stream.unfoldrM f 0:}[0,1,2]When run concurrently, the next unfold step can run concurrently with the processing of the output of the previous step. Note that more than one step cannot run concurrently as the next step depends on the output of the previous step.:{ let f b = if b > 2 then return Nothing< else threadDelay 1000000 >> return (Just (b, b + 1))in Stream.toList $ Stream.delay 1 $ Stream.fromAsync $ Stream.unfoldrM f 0:}[0,1,2] Concurrent Since: 0.1.0streamly6Generate an infinite stream by repeating a pure value.streamly+replicate n = Stream.take n . Stream.repeatGenerate a stream of length n by repeating a value n times.streamly-replicateM n = Stream.take n . Stream.repeatM1Generate a stream by performing a monadic action n times. Same as:%pr n = threadDelay 1000000 >> print n'This runs serially and takes 3 seconds:=Stream.drain $ Stream.fromSerial $ Stream.replicateM 3 $ pr 1111/This runs concurrently and takes just 1 second:=Stream.drain $ Stream.fromAsync $ Stream.replicateM 3 $ pr 1111 Concurrentstreamlytimes returns a stream of time value tuples with clock of 10 ms granularity. The first component of the tuple is an absolute time reference (epoch) denoting the start of the stream and the second component is a time relative to the reference.Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))(AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...)).Note: This API is not safe on 32-bit machines. 
Pre-releasestreamlyabsTimes returns a stream of absolute timestamps using a clock of 10 ms granularity.Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes*AbsTime (TimeSpec {sec = ..., nsec = ...})*AbsTime (TimeSpec {sec = ..., nsec = ...})*AbsTime (TimeSpec {sec = ..., nsec = ...}).Note: This API is not safe on 32-bit machines. Pre-releasestreamlyrelTimes returns a stream of relative time values starting from 0, using a clock of granularity 10 ms.Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimesRelTime64 (NanoSecond64 ...)RelTime64 (NanoSecond64 ...)RelTime64 (NanoSecond64 ...).Note: This API is not safe on 32-bit machines. Pre-releasestreamly durations g returns a stream of relative time values measuring the time elapsed since the immediate predecessor element of the stream was generated. The first element of the stream is always 0.  durations uses a clock of granularity g specified in seconds. A low granularity clock is more expensive in terms of CPU usage. The minimum granularity is 1 millisecond. Durations lower than 1 ms will be 0..Note: This API is not safe on 32-bit machines. UnimplementedstreamlyGenerate ticks at the specified rate. The rate is adaptive, the tick generation speed can be increased or decreased at different times to achieve the specified rate. The specific behavior for different styles of $% specifications is documented under $. The effective maximum rate achieved by a stream is governed by the processor speed. UnimplementedstreamlyGenerate a singleton event at or after the specified absolute time. Note that this is different from a threadDelay, a threadDelay starts from the time when the action is evaluated, whereas if we use AbsTime based timeout it will immediately expire if the action is evaluated too late. 
Unimplementedstreamly/fromIndices f = fmap f $ Stream.enumerateFrom 0fromIndicesM f = let g i = f i `Stream.consM` g (i + 1) in g 0Generate an infinite stream, whose values are the output of a monadic function f7 applied on the corresponding index. Index starts at 0. Concurrentstreamly)iterate f x = x `Stream.cons` iterate f x!Generate an infinite stream with x as the first element and each successive element derived by applying the function f on the previous element.5Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1 [1,2,3,4,5]streamlyiterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)Generate an infinite stream with the first element generated by the action m and each successive element derived by applying the monadic function f on the previous element.%pr n = threadDelay 1000000 >> print n:{9Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0) & Stream.take 3 & Stream.fromSerial & Stream.toList:}01[0,1,2]When run concurrently, the next iteration can run concurrently with the processing of the previous iteration. Note that more than one iteration cannot run concurrently as the next iteration depends on the output of the previous iteration.:{ 9Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0) & Stream.delay 1 & Stream.take 3 & Stream.fromAsync & Stream.toList:}01... Concurrent Since: 0.1.2Since: 0.7.0 (signature change)streamly&We can define cyclic structures using let:'let (a, b) = ([1, b], head a) in (a, b) ([1,1],1) The function fix defined as:fix f = let x = f x in xensures that the argument of a function and its output refer to the same lazy value x* i.e. the same location in memory. Thus x can be defined in terms of itself, creating structures with cyclic references.f ~(a, b) = ([1, b], head a)fix f ([1,1],1)UV is essentially the same as fix but for monadic values.Using  for streams we can construct a stream in which each element of the stream is defined in a cyclic fashion. 
The argument of the function being fixed represents the current element of the stream which is being returned by the stream monad. Thus, we can use the argument to construct itself. Pre-releasestreamly3fromFoldable = Prelude.foldr Stream.cons Stream.nilConstruct a stream from a   containing pure values:streamly5fromFoldableM = Prelude.foldr Stream.consM Stream.nilConstruct a stream from a   containing monadic actions.%pr n = threadDelay 1000000 >> print nStream.drain $ Stream.fromSerial $ Stream.fromFoldableM $ map pr [1,2,3]123Stream.drain $ Stream.fromAsync $ Stream.fromFoldableM $ map pr [1,2,3].........Concurrent (do not use with  fromParallel on infinite containers)streamly fromListM = Stream.fromFoldableM-fromListM = Stream.sequence . Stream.fromList,fromListM = Stream.mapM id . Stream.fromList1fromListM = Prelude.foldr Stream.consM Stream.nilConstruct a stream from a list of monadic actions. This is more efficient than  for serial streams.streamly6Read lines from an IO Handle into a stream of Strings.streamlyTakes a callback setter function and provides it with a callback. The callback when invoked adds a value at the tail of the stream. Returns a stream of values generated by the callback. Pre-releasestreamly!Construct a stream by reading an Unboxed IORef repeatedly. Pre-release.W!(c) 2017 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:wj?streamlyDecompose a stream into its head and tail. If the stream is empty, returns &. If the stream is non-empty, returns  Just (a, ma), where a is the head of the stream and ma its tail.This can be used to do pretty much anything in an imperative manner, as it just breaks down the stream into individual elements and we can loop over them as we deem fit. 
For example, this can be used to convert a streamly stream into other stream types. All the folds in this module can be expressed in terms of , however, this is generally less efficient than specific folds because it takes apart the stream one element at a time and therefore does not take advantage of stream fusion.

Right associative/lazy pull fold. foldrM build final stream constructs an output structure using the step function build. build is invoked with the next input element and the remaining (lazy) tail of the output structure. It builds a lazy output expression using the two. When the "tail structure" in the output expression is evaluated it calls build again, thus lazily consuming the input stream, until either the output expression built by build is free of the "tail" or the input is exhausted, in which case final is used as the terminating case for the output structure. For more details see the description in the previous section. Example, determine if any element is   in a stream: Stream.foldrM (\x xs -> if odd x then return True else xs) (return False) $ Stream.fromList (2:4:5:undefined) True Since: 0.7.0 (signature changed) Since: 0.2.0 (signature changed) Since: 0.1.0

Right fold, lazy for lazy monads and pure streams, and strict for strict monads. Please avoid using this routine in strict monads like IO unless you need a strict right fold. This is provided only for use in lazy monads (e.g. Identity) or pure streams. Note that with this signature it is not possible to implement a lazy foldr when the monad m is strict. In that case it would be strict in its accumulator and therefore would necessarily consume all its input.

Lazy right fold for non-empty streams, using first element as the starting value. 
Returns  if the stream is empty.streamlyLazy left fold to a stream.streamly&Lazy left fold to a transformer monad.!For example, to reverse a stream: D.toList $ D.foldlT (flip D.cons) D.nil $ (D.fromList [1..5] :: SerialT IO Int)streamlyStrict left fold with an extraction function. Like the standard strict left fold, but applies a user supplied extraction function (the third argument) to the folded value at the end. This is designed to work with the foldl library. The suffix x is a mnemonic for extraction.streamly#Left associative/strict push fold. foldl' reduce initial stream invokes reduce with the accumulator and the next input in the input stream, using initial as the initial value of the current value of the accumulator. When the input is exhausted the current value of the accumulator is returned. Make sure to use a strict data structure for accumulator to not build unnecessary lazy expressions unless that's what you want. See the previous section for more details.streamlyStrict left fold, for non-empty streams, using first element as the starting value. Returns  if the stream is empty.streamlyLike #, but with a monadic step function.streamlyLike " but with a monadic step function. Since: 0.2.0Since: 0.8.0 (signature change)streamly*Parse a stream using the supplied ParserD  .Internalstreamly"Parse a stream using the supplied  .Unlike folds, parsers may not always result in a valid output, they may result in an error. For example:4Stream.parse (Parser.takeEQ 1 Fold.drain) Stream.nilLeft (ParseError "takeEQ: Expecting exactly 1 elements, input terminated on 0")Note: *fold f = Stream.parse (Parser.fromFold f) parse p is not the same as head . parseMany p on an empty stream. Pre-releasestreamly "mapM_ = Stream.drain . Stream.mapMApply a monadic action to each element of the stream and discard the output of the action. 
This is not really a pure transformation operation but a transformation followed by fold.

drain = mapM_ (\_ -> return ())
drain = Stream.fold Fold.drain

Run a stream, discarding the results. By default it interprets the stream as , to run other types of streams use the type adapting combinators, for example Stream.drain . fromAsync.

drainN n = Stream.drain . Stream.take n
drainN n = Stream.fold (Fold.take n Fold.drain)

Run maximum up to n iterations of a stream.

runN n = runStream . take n

Run maximum up to n iterations of a stream.

drainWhile p = Stream.drain . Stream.takeWhile p

Run a stream as long as the predicate holds true.

runWhile p = runStream . takeWhile p

Run a stream as long as the predicate holds true.

Run a stream, discarding the results. By default it interprets the stream as , to run other types of streams use the type adapting combinators, for example runStream . fromAsync.

Determine whether the stream is empty.

null = Stream.fold Fold.null

Extract the first element of the stream, if any.

head = (!! 0)
head = Stream.fold Fold.one

Extract the first element of the stream, if any, otherwise use the supplied default value. It can help avoid one branch in high performance code.

Pre-release

tail = fmap (fmap snd) . Stream.uncons

Extract all but the first element of the stream, if any.

Extract all but the last element of the stream, if any.

Extract the last element of the stream, if any.

last xs = xs !! (Stream.length xs - 1)
last = Stream.fold Fold.last

Determine whether an element is present in the stream.

elem = Stream.fold Fold.elem

Determine whether an element is not present in the stream.

notElem = Stream.fold Fold.notElem

Determine the length of the stream.

Determine whether all elements of a stream satisfy a predicate.
all = Stream.fold Fold.allstreamlyDetermine whether any of the elements of a stream satisfy a predicate. any = Stream.fold Fold.anystreamly8Determines if all elements of a boolean stream are True. and = Stream.fold Fold.andstreamlyDetermines whether at least one element of a boolean stream is True. or = Stream.fold Fold.orstreamlyDetermine the sum of all elements of a stream of numbers. Returns 0 when the stream is empty. Note that this is not numerically stable for floating point numbers. sum = Stream.fold Fold.sumstreamlyDetermine the product of all elements of a stream of numbers. Returns 1 when the stream is empty. "product = Stream.fold Fold.productstreamly3Fold a stream of monoid elements by appending them. "mconcat = Stream.fold Fold.mconcat Pre-releasestreamly  minimum = , compare minimum = Stream.fold Fold.minimum *Determine the minimum element in a stream.streamlyDetermine the minimum element in a stream using the supplied comparison function. &minimumBy = Stream.fold Fold.minimumBystreamly  maximum = , compare maximum = Stream.fold Fold.maximum *Determine the maximum element in a stream.streamlyDetermine the maximum element in a stream using the supplied comparison function. &maximumBy = Stream.fold Fold.maximumBystreamlyEnsures that all the elements of the stream are identical and then returns that unique element.streamly&Lookup the element at the given index.streamly!In a stream of (key-value) pairs (a, b), return the value b9 of the first pair where the key equals the given value a. lookup = snd <$> Stream.find ((==) . fst) lookup = Stream.fold Fold.lookupstreamlyLike " but with a non-monadic predicate. 8find p = findM (return . p) find = Stream.fold Fold.findstreamly=Returns the first element that satisfies the given predicate. findM = Stream.fold Fold.findMstreamly;Returns the first index that satisfies the given predicate. &findIndex = Stream.fold Fold.findIndexstreamlyReturns the first index where a given value is found in the stream. 
elemIndex a = Stream.findIndex (== a)

toList = Stream.foldr (:) []

Convert a stream into a list in the underlying monad. The list can be consumed lazily in a lazy monad (e.g. Identity). In a strict monad (e.g. IO) the whole list is generated and buffered before it can be consumed.

Warning! Working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

toListRev = Stream.foldl' (flip (:)) []

Convert a stream into a list in reverse order in the underlying monad.

Warning! Working on large lists accumulated as buffers in memory could be very inefficient, consider using Streamly.Array instead.

Pre-release

toHandle h = D.mapM_ $ hPutStrLn h

Write a stream of Strings to an IO Handle.

Convert a stream to a pure stream.

toStream = Stream.foldr Stream.cons Stream.nil

Pre-release

Convert a stream to a pure stream in reverse order.

toStreamRev = Stream.foldl' (flip Stream.cons) Stream.nil

Pre-release

 m b to a stream t m a concurrently; the input stream is evaluated asynchronously in an independent thread yielding elements to a buffer and the folding action runs in another thread consuming the input from the buffer.

If you read the signature as (t m a -> m b) -> (t m a -> m b) you can look at it as a transformation that converts a fold function to a buffered concurrent fold function.

The . at the end of the operator is a mnemonic for termination of the stream.

In the example below, each stage introduces a delay of 1 sec but output is printed every second because both stages are concurrent.

>>> import Control.Concurrent (threadDelay)
>>> import Streamly.Prelude ((|$.))
>>> :{
 Stream.foldlM' (\_ a -> threadDelay 1000000 >> print a) (return ())
     |$. Stream.replicateM 3 (threadDelay 1000000 >> return 1)
:}
1
1
1

Concurrent

Since: 0.3.0 (Streamly)

Same as .

Internal

Same as  but with arguments reversed.

(|&.) = flip (|$.)
ConcurrentSince: 0.3.0 (Streamly)streamlyReturns  if the first stream is the same as or a prefix of the second. A stream is a prefix of itself.Stream.isPrefixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)TruestreamlyReturns  if the first stream is an infix of the second. A stream is considered an infix of itself. Stream.isInfixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)TrueSpace: O(n) worst case where n is the length of the infix. Pre-release Requires   constraintstreamlyReturns  if the first stream is a suffix of the second. A stream is considered a suffix of itself.Stream.isSuffixOf (Stream.fromList "hello") (Stream.fromList "hello" :: SerialT IO Char)TrueSpace: O(n)-, buffers entire input stream and the suffix. Pre-release Suboptimal - Help wanted.streamlyReturns  if all the elements of the first stream occur, in order, in the second stream. The elements do not have to occur consecutively. A stream is a subsequence of itself.Stream.isSubsequenceOf (Stream.fromList "hlo") (Stream.fromList "hello" :: SerialT IO Char)TruestreamlystripPrefix prefix stream strips prefix from stream' if it is a prefix of stream. Returns  if the stream does not start with the given prefix, stripped stream otherwise. Returns Just nil, when the prefix is the same as the stream.See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropPrefix".Space: O(1)streamly.Drops the given suffix from a stream. Returns < if the stream does not end with the given suffix. Returns Just nil, when the suffix is the same as the stream.It may be more efficient to convert the stream to an Array and use stripSuffix on that especially if the elements have a Storable or Prim instance.See also "Streamly.Internal.Data.Stream.IsStream.Nesting.dropSuffix".Space: O(n)7, buffers the entire input stream as well as the suffix Pre-releasestreamly for appending a few (say 100) streams because it can fuse via stream fusion. 
However, it does not scale for a large number of streams (say 1000s) and becomes quadratically slow. Therefore use this for custom appending of a few streams but use  or 'concatMapWith serial' for appending n streams or infinite containers of streams.

Pre-release

Appends two streams sequentially, yielding all elements from the first stream, and then all elements from the second stream.

>>> import Streamly.Prelude (serial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ stream1 `serial` stream2
[1,2,3,4]

This operation can be used to fold an infinite lazy container of streams.

Since: 0.2.0 (Streamly)

Interleaves two streams, yielding one element from each stream alternately. When one stream stops the rest of the other stream is used in the output stream.

>>> import Streamly.Prelude (wSerial)
>>> stream1 = Stream.fromList [1,2]
>>> stream2 = Stream.fromList [3,4]
>>> Stream.toList $ Stream.fromWSerial $ stream1 `wSerial` stream2
[1,3,2,4]

Note, for singleton streams  and  are identical.

Note that this operation cannot be used to fold a container of infinite streams but it can be used for very large streams as the state that it needs to maintain is proportional to the logarithm of the number of streams.

Since: 0.2.0 (Streamly)

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. If any of the streams finishes early the other stream continues alone until it too finishes.

>>> :set -XOverloadedStrings
>>> import Data.Functor.Identity (Identity)
>>> Stream.interleave "ab" ",,,," :: Stream.SerialT Identity Char
fromList "a,b,,,"
>>> Stream.interleave "abcd" ",," :: Stream.SerialT Identity Char
fromList "a,b,cd"

 is dual to , it can be called interleaveMax.

Do not use at scale in concatMapWith.

Pre-release

Interleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream.
As soon as the first stream finishes, the output stops, discarding the remaining part of the second stream. In this case, the last element in the resulting stream would be from the second stream. If the second stream finishes early then the first stream still continues to yield elements until it finishes.:set -XOverloadedStrings'import Data.Functor.Identity (Identity)Stream.interleaveSuffix "abc" ",,,," :: Stream.SerialT Identity CharfromList "a,b,c,"Stream.interleaveSuffix "abc" "," :: Stream.SerialT Identity CharfromList "a,bc" is a dual of .%Do not use at scale in concatMapWith. Pre-releasestreamlyInterleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream and ending at the first stream. If the second stream is longer than the first, elements from the second stream are infixed with elements from the first stream. If the first stream is longer then it continues yielding elements even after the second stream has finished.:set -XOverloadedStrings'import Data.Functor.Identity (Identity)Stream.interleaveInfix "abc" ",,,," :: Stream.SerialT Identity CharfromList "a,b,c"Stream.interleaveInfix "abc" "," :: Stream.SerialT Identity CharfromList "a,bc" is a dual of .%Do not use at scale in concatMapWith. Pre-releasestreamlyInterleaves the outputs of two streams, yielding elements from each stream alternately, starting from the first stream. The output stops as soon as any of the two streams finishes, discarding the remaining part of the other stream. The last element of the resulting stream would be from the longer stream.:set -XOverloadedStrings'import Data.Functor.Identity (Identity)Stream.interleaveMin "ab" ",,,," :: Stream.SerialT Identity CharfromList "a,b,"Stream.interleaveMin "abcd" ",," :: Stream.SerialT Identity CharfromList "a,b,c" is dual to .%Do not use at scale in concatMapWith. 
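
The difference between letting the longer stream continue and stopping at the shorter one can be seen on plain lists. The following is a minimal sketch using only the Prelude; `interleaveList` and `interleaveMinList` are hypothetical helper names chosen for illustration and are not part of streamly. They only mimic the element order that the examples above show for `interleave` and `interleaveMin`.

```haskell
-- List analogues of two of the interleaving rules described above.
-- These are illustrative helpers, not streamly functions.

-- Alternate elements, starting with the first list; when one list
-- ends, the remainder of the other list is appended unchanged.
interleaveList :: [a] -> [a] -> [a]
interleaveList (x:xs) ys = x : interleaveList ys xs
interleaveList []     ys = ys

-- Alternate elements, starting with the first list; stop as soon as
-- the list whose turn it is has no more elements.
interleaveMinList :: [a] -> [a] -> [a]
interleaveMinList (x:xs) ys = x : interleaveMinList ys xs
interleaveMinList []     _  = []

main :: IO ()
main = do
  putStrLn (interleaveList    "ab"   ",,,,")  -- a,b,,,
  putStrLn (interleaveList    "abcd" ",,")    -- a,b,cd
  putStrLn (interleaveMinList "ab"   ",,,,")  -- a,b,
  putStrLn (interleaveMinList "abcd" ",,")    -- a,b,c
```

Swapping the two arguments on every step is what makes the lists take turns; the two helpers differ only in what happens when the list whose turn it is turns out to be empty.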
Pre-release

Schedule the execution of two streams in a fair round-robin manner, executing each stream once, alternately. Execution of a stream may not necessarily result in an output, a stream may choose to Skip producing an element until later, giving the other stream a chance to run. Therefore, this combinator fairly interleaves the execution of two streams rather than fairly interleaving the output of the two streams. This can be useful in co-operative multitasking without using explicit threads. This can be used as an alternative to .

Do not use at scale in concatMapWith.

Pre-release

Merges two streams, both the streams may be evaluated concurrently, outputs from both are used as they arrive:

>>> import Streamly.Prelude (async)
>>> stream1 = Stream.fromEffect (delay 4)
>>> stream2 = Stream.fromEffect (delay 2)
>>> Stream.toList $ stream1 `async` stream2
2 sec
4 sec
[2,4]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `async` stream2 `async` stream3
...
[1,2,4]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3
...
[2,1,4]

With a single thread, it becomes serial:

>>> Stream.toList $ Stream.maxThreads 1 $ stream1 `async` stream2 `async` stream3
...
[4,2,1]

Only streams are scheduled for async evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently.

In the following example, both the streams are scheduled for concurrent evaluation but each individual stream is evaluated serially:

>>> stream1 = Stream.fromListM $ Prelude.map delay [3,3] -- SerialT IO Int
>>> stream2 = Stream.fromListM $ Prelude.map delay [1,1] -- SerialT IO Int
>>> Stream.toList $ stream1 `async` stream2 -- IO [Int]
...
[1,1,3,3]If total threads are 2, the third stream is scheduled only after one of the first two has finished: stream3 = Stream.fromListM $ Prelude.map delay [2,2] -- SerialT IO Int Stream.toList $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3 -- IO [Int]... [1,1,3,2,3,2]Thus  goes deep in first few streams rather than going wide in all streams. It prefers to evaluate the leftmost streams as much as possible. Because of this behavior,  can be safely used to fold an infinite lazy container of streams.Since: 0.2.0 (Streamly)streamlyFor singleton streams,  is the same as . See  for singleton stream behavior. For multi-element streams, while  is left biased i.e. it tries to evaluate the left side stream as much as possible, 5 tries to schedule them both fairly. In other words,  goes deep while = goes wide. However, outputs are always used as they arrive.With a single thread,  starts behaving like  while  starts behaving like .'import Streamly.Prelude (async, wAsync)!stream1 = Stream.fromList [1,2,3]!stream2 = Stream.fromList [4,5,6]Stream.toList $ Stream.fromAsync $ Stream.maxThreads 1 $ stream1 `async` stream2 [1,2,3,4,5,6]Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 1 $ stream1 `wAsync` stream2 [1,4,2,5,3,6]8With two threads available, and combining three streams:!stream3 = Stream.fromList [7,8,9]Stream.toList $ Stream.fromAsync $ Stream.maxThreads 2 $ stream1 `async` stream2 `async` stream3[1,2,3,4,5,6,7,8,9]Stream.toList $ Stream.fromWAsync $ Stream.maxThreads 2 $ stream1 `wAsync` stream2 `wAsync` stream3[1,4,2,7,5,3,8,6,9]This operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams in a round robin manner. 
Note that WSerialT and single threaded WAsyncT both interleave streams but the exact scheduling is slightly different in both cases.

Since: 0.2.0 (Streamly)

Appends two streams, both the streams may be evaluated concurrently but the outputs are used in the same order as the corresponding actions in the original streams, side effects will happen in the order in which the streams are evaluated:

>>> import Streamly.Prelude (ahead, SerialT)
>>> stream1 = Stream.fromEffect (delay 4) :: SerialT IO Int
>>> stream2 = Stream.fromEffect (delay 2) :: SerialT IO Int
>>> Stream.toList $ stream1 `ahead` stream2 :: IO [Int]
2 sec
4 sec
[4,2]

Multiple streams can be combined. With enough threads, all of them can be scheduled simultaneously:

>>> stream3 = Stream.fromEffect (delay 1)
>>> Stream.toList $ stream1 `ahead` stream2 `ahead` stream3
1 sec
2 sec
4 sec
[4,2,1]

With 2 threads, only two can be scheduled at a time, when one of those finishes, the third one gets scheduled:

>>> Stream.toList $ Stream.maxThreads 2 $ stream1 `ahead` stream2 `ahead` stream3
2 sec
1 sec
4 sec
[4,2,1]

Only streams are scheduled for ahead evaluation, how actions within a stream are evaluated depends on the stream type. If it is a concurrent stream they will be evaluated concurrently. It may not make much sense combining serial streams using .

 can be safely used to fold an infinite lazy container of streams.

Since: 0.3.0 (Streamly)

Like  except that the execution is much more strict. There is no limit on the number of threads. While  may not schedule a stream if there is no demand from the consumer,  always evaluates both the streams immediately. The only limit that applies to  is .
Evaluation may block if the output buffer becomes full."import Streamly.Prelude (parallel)stream = Stream.fromEffect (delay 2) `parallel` Stream.fromEffect (delay 1) Stream.toList stream -- IO [Int]1 sec2 sec[1,2] guarantees that all the streams are scheduled for execution immediately, therefore, we could use things like starting timers inside the streams and relying on the fact that all timers were started at the same time.Unlike  this operation cannot be used to fold an infinite lazy container of streams, because it schedules all the streams strictly concurrently.Since: 0.2.0 (Streamly)streamlyLike ? but stops the output as soon as any of the two streams stops. Pre-releasestreamlyLike % but with a monadic zipping function.streamlyLike  but zips concurrently i.e. both the streams being zipped are evaluated concurrently using the  ParallelT concurrent evaluation style. The maximum number of elements of each stream evaluated in advance can be controlled by  maxBuffer.The stream ends if stream a or stream b ends. However, if stream b, ends while we are still evaluating stream a and waiting for a result then stream will not end until after the evaluation of stream a finishes. This behavior can potentially be changed in future to end the stream immediately as soon as any of the stream end is detected.streamlyMerge two streams using a comparison function. The head elements of both the streams are compared and the smaller of the two elements is emitted, if both elements are equal then the element from the first stream is used first.If the streams are sorted in ascending order, the resulting stream would also remain sorted in ascending order. 
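
The merge rule just described (compare the two head elements, emit the smaller, prefer the first stream on a tie) can be sketched on plain lists. This is only a list analogue written with the Prelude; `mergeByList` is a hypothetical name used for illustration and is not the streamly implementation.

```haskell
-- A list analogue of the merge rule described above.
-- Illustrative only; not a streamly function.
mergeByList :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeByList _   xs [] = xs
mergeByList _   [] ys = ys
mergeByList cmp (x:xs) (y:ys) =
  case cmp x y of
    -- The head of the second list is strictly smaller: emit it first.
    GT -> y : mergeByList cmp (x:xs) ys
    -- Smaller or equal: the element from the first list goes first.
    _  -> x : mergeByList cmp xs (y:ys)

main :: IO ()
main = do
  print (mergeByList compare [1,3,5] [2,4,6,8 :: Int])  -- [1,2,3,4,5,6,8]
  -- On a tie the element from the first list is emitted first.
  print (mergeByList (\a b -> compare (fst a) (fst b))
                     [(1 :: Int, "left")] [(1, "right")])
```

Because ties favour the first argument, the merge is stable: equal elements keep their left-before-right order.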
>>> Stream.toList $ Stream.mergeBy compare (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]

See also: 

Like  but with a monadic comparison function.

Merge two streams randomly:

> randomly _ _ = randomIO >>= \x -> return $ if x then LT else GT
> Stream.toList $ Stream.mergeByM randomly (Stream.fromList [1,1,1,1]) (Stream.fromList [2,2,2,2])
[2,1,2,2,2,1,1,1]

Merge two streams in a proportion of 2:1:

>>> :{
do
 let proportionately m n = do
      ref <- newIORef $ cycle $ Prelude.concat [Prelude.replicate m LT, Prelude.replicate n GT]
      return $ \_ _ -> do
         r <- readIORef ref
         writeIORef ref $ Prelude.tail r
         return $ Prelude.head r
 f <- proportionately 2 1
 xs <- Stream.toList $ Stream.mergeByM f (Stream.fromList [1,1,1,1,1,1]) (Stream.fromList [2,2,2])
 print xs
:}
[1,1,2,1,1,2,1,1,2]

See also: 

Like  but much faster, works best when merging a statically known number of streams. When merging more than two streams try to merge pairs and pairs of pairs in a tree-like structure.  works better with a variable number of streams being merged using .

Internal

Like  but stops merging as soon as any of the two streams stops.

Unimplemented

Like  but stops merging as soon as the first stream stops.

Unimplemented

Same as   .

>>> Stream.toList $ Stream.merge (Stream.fromList [1,3,5]) (Stream.fromList [2,4,6,8])
[1,2,3,4,5,6,8]

Internal

Like  but merges concurrently (i.e. both the elements being merged are generated concurrently).

Like  but merges concurrently (i.e. both the elements being merged are generated concurrently).

Like  but uses an   for stream generation. Unlike  this can fuse the   code with the inner loop and therefore provide many times better performance.

Like  but interleaves the streams in the same way as  behaves instead of appending them.

Pre-release

Like  but executes the streams in the same way as .
Pre-releasestreamlyUnfold the elements of a stream, intersperse the given element between the unfolded streams and then concat them into a single stream. unwords = S.interpose ' ' Pre-releasestreamlyUnfold the elements of a stream, append the given element after each unfolded stream and then concat them into a single stream.  unlines = S.interposeSuffix '\n' Pre-releasestreamly followed by unfold and concat. Pre-releasestreamly intersperse followed by unfold and concat. intercalate unf a str = unfoldMany unf $ intersperse a str intersperse = intercalate (Unfold.function id) unwords = intercalate Unfold.fromList " "Stream.toList $ Stream.intercalate Unfold.fromList " " $ Stream.fromList ["abc", "def", "ghi"] "abc def ghi"streamly followed by unfold and concat. Pre-releasestreamlyintersperseMSuffix followed by unfold and concat. intercalateSuffix unf a str = unfoldMany unf $ intersperseMSuffix a str intersperseMSuffix = intercalateSuffix (Unfold.function id) unlines = intercalateSuffix Unfold.fromList "\n"Stream.toList $ Stream.intercalateSuffix Unfold.fromList "\n" $ Stream.fromList ["abc", "def", "ghi"]"abc\ndef\nghi\n"streamly/Flatten a stream of streams to a single stream. concat = concatMap id  Pre-releasestreamlyLike  concatMapWith but carries a state which can be used to share information across multiple steps of concat. concatSmapMWith combine f initial = concatMapWith combine id . 
smapM f initial  Pre-releasestreamlyCombine streams in pairs using a binary stream combinator, then combine the resulting streams in pairs recursively until we get to a single combined stream.>For example, you can sort a stream using merge sort like this:Stream.toList $ Stream.concatPairsWith (Stream.mergeBy compare) Stream.fromPure $ Stream.fromList [5,1,7,9,2] [1,2,5,7,9]-Caution: the stream of streams must be finite Pre-releasestreamlyLike iterateM> but iterates after mapping a stream generator on the output.Yield an input element in the output stream, map a stream generator on it and then do the same on the resulting stream. This can be used for a depth first traversal of a tree like structure. Note that iterateM is a special case of : iterateM f = iterateMapWith serial (fromEffect . f) . fromEffect It can be used to traverse a tree structure. For example, to list a directory tree: Stream.iterateMapWith Stream.serial (either Dir.toEither (const nil)) (fromPure (Left "tmp"))  Pre-releasestreamlySame as iterateMapWith Stream.serial* but more efficient due to stream fusion. UnimplementedstreamlyLike  iterateMap but carries a state in the stream generation function. This can be used to traverse graph like structures, we can remember the visited nodes in the state to avoid cycles.Note that a combination of  iterateMap and  usingState can also be used to traverse graphs. However, this function provides a more localized state instead of using a global state. See also: mfix Pre-releasestreamlyIn an   stream iterate on  s. This is a special case of : iterateMapLeftsWith combine f = iterateMapWith combine (either f (const nil)) To traverse a directory tree: iterateMapLeftsWith serial Dir.toEither (fromPure (Left "tmp"))  Pre-release5666666Y!(c) 2017 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHC Safe-Inferred! 
$%'.15789:f* streamlytime since last event streamlytime as per last event We can use the Map size instead of maintaining a count, but if we have to switch to HashMap then it can be useful. streamly$total number of sessions in progress streamlyheap for timeouts streamlyStored sessions for keys streamlyCompleted sessionsstreamly-Drop prefix from the input stream if present.Space: O(1) UnimplementedstreamlyDrop all matching infix from the input stream if present. Infix stream may be consumed multiple times.Space: O(n)$ where n is the length of the infix. UnimplementedstreamlyDrop suffix from the input stream if present. Suffix stream may be consumed multiple times.Space: O(n)% where n is the length of the suffix. UnimplementedstreamlyApply a  repeatedly on a stream and emit the fold outputs in the output stream.1To sum every two contiguous elements in a stream:f = Fold.take 2 Fold.sum;Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10][3,7,11,15,19]'On an empty stream the output is empty:6Stream.toList $ Stream.foldMany f $ Stream.fromList [][]Note Stream.foldMany (Fold.take 0)9 would result in an infinite loop in a non-empty stream.streamlyLike  but using the   type instead of . Pre-releasestreamlyApply a stream of folds to an input stream and emit the results in the output stream. Unimplementedstreamly8Iterate a fold generator on a stream. The initial value b is used to generate the first fold, the fold is applied on the stream and the result of the fold is used to generate the next fold and so on. >>> import Data.Monoid (Sum(..)) >>> f x = return (Fold.take 2 (Fold.sconcat x)) >>> s = Stream.map Sum $ Stream.fromList [1..10] >>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f (pure 0) s [3,10,21,36,55,55] This is the streaming equivalent of monad like sequenced application of folds where next fold is dependent on the previous fold. Pre-releasestreamlyLike  but using the   type instead. 
This could be much more efficient due to stream fusion.InternalstreamlyApply a   repeatedly on a stream and emit the parsed values in the output stream.(This is the streaming equivalent of the Z[ parse combinator.Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10],[Right 3,Right 7,Right 11,Right 15,Right 19] > Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\nworld" ["hello\n","world"]  $foldMany f = parseMany (fromFold f) Known Issues: When the parser fails there is no way to get the remaining stream. Pre-releasestreamly*Same as parseMany but for StreamD streams.InternalstreamlyApply a stream of parsers to an input stream and emit the results in the output stream. Unimplementedstreamly!parseManyTill collect test stream tries the parser test on the input, if test fails it backtracks and tries collect, after collect succeeds test1 is tried again and so on. The parser stops when test succeeds. The output of test is discarded and the output of collect7 is emitted in the output stream. The parser fails if collect fails. UnimplementedstreamlyIterate a parser generating function on a stream. The initial value b is used to generate the first parser, the parser is applied on the stream and the result is used to generate the next parser and so on.import Data.Monoid (Sum(..))Stream.toList $ fmap getSum $ Stream.rights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum $ Stream.fromList [1..10][3,10,21,36,55,55]This is the streaming equivalent of monad like sequenced application of parsers where next parser is dependent on the previous parser. Pre-releasestreamly'groupsBy cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if  b `cmp` a is  then b* is also assigned to the same group. If  c `cmp` a is  then c is also assigned to the same group and so on. When the comparison fails a new group is started. 
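
The grouping rule for groupsBy, where every later element is compared against the first element of the current group, can be sketched on plain lists. This is a list analogue using only the Prelude; `groupsByList` is a hypothetical name for illustration, and it collects each group into a list instead of running a fold on it.

```haskell
-- A list analogue of the groupsBy rule described above.
-- Illustrative only; not a streamly function. Each group is returned
-- as a list rather than being folded.
groupsByList :: (a -> a -> Bool) -> [a] -> [[a]]
groupsByList _   []     = []
groupsByList cmp (a:xs) = (a : same) : groupsByList cmp rest
  where
    -- Keep taking elements b while b `cmp` a holds, where a is the
    -- first element of the current group; the first failure starts
    -- a new group.
    (same, rest) = span (\b -> b `cmp` a) xs

main :: IO ()
main = print (groupsByList (>) [1,3,7,0,2,5 :: Int])  -- [[1,3,7],[0,2,5]]
```

Here 3 and 7 are both compared against 1, not against each other; 0 fails the comparison with 1 and therefore becomes the first element of the next group.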
Each group is folded using the fold f= and the result of the fold is emitted in the output stream.Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5][[1,3,7],[0,2,5]]streamlyUnlike groupsBy this function performs a rolling comparison of two successive elements in the input stream. /groupsByRolling cmp f $ S.fromList [a,b,c,...] assigns the element a to the first group, if  a `cmp` b is  then b) is also assigned to the same group. If  b `cmp` c is  then c is also assigned to the same group and so on. When the comparison fails a new group is started. Each group is folded using the fold f.Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9][[1,2,3],[7,8,9]]streamly 4groups = groupsBy (==) groups = groupsByRolling (==)Groups contiguous spans of equal elements together in individual groups.Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2] [[1,1],[2,2]]streamlySplit on an infixed separator element, dropping the separator. The supplied  is applied on the split segments. Splits the stream on separator elements determined by the supplied predicate, separator is considered as infixed between two segments:splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)splitOn' (== '.') "a.b" ["a","b"];An empty stream is folded to the default value of the fold:splitOn' (== '.') ""[""]If one or both sides of the separator are missing then the empty segment on that side is folded to the default output of the fold:splitOn' (== '.') "."["",""]splitOn' (== '.') ".a"["","a"]splitOn' (== '.') "a."["a",""]splitOn' (== '.') "a..b" ["a","","b"]6splitOn is an inverse of intercalating single element: Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id9Assuming the input stream does not contain the separator: Stream.splitOn (== '.') Fold.toList . 
Stream.intercalate (Stream.fromPure '.') Unfold.fromList === idstreamlySplit on a suffixed separator element, dropping the separator. The supplied " is applied on the split segments.splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)splitOnSuffix' (== '.') "a.b." ["a","b"]splitOnSuffix' (== '.') "a."["a"]2An empty stream results in an empty output stream:splitOnSuffix' (== '.') ""[]An empty segment consisting of only a suffix is folded to the default output of the fold:splitOnSuffix' (== '.') "."[""] splitOnSuffix' (== '.') "a..b.."["a","","b",""].A suffix is optional at the end of the stream:splitOnSuffix' (== '.') "a"["a"]splitOnSuffix' (== '.') ".a"["","a"]splitOnSuffix' (== '.') "a.b" ["a","b"] lines = splitOnSuffix (== '\n') is an inverse of intercalateSuffix with a single element: Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id9Assuming the input stream does not contain the separator: Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === idstreamlySplit on a prefixed separator element, dropping the separator. The supplied " is applied on the split segments. > splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs) > splitOnPrefix' (==  ) ".a.b" ["a","b"] 4An empty stream results in an empty output stream:  > splitOnPrefix' (==   ) "" [] An empty segment consisting of only a prefix is folded to the default output of the fold: > splitOnPrefix' (==  !) "." [""] > splitOnPrefix' (==  -) ".a.b." ["a","b",""] > splitOnPrefix' (==  ) ".a..b" ["a","","b"] 4A prefix is optional at the beginning of the stream: > splitOnPrefix' (==  ") "a" ["a"] > splitOnPrefix' (==  ) "a.b" ["a","b"]  is an inverse of intercalatePrefix with a single element: Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . 
Stream.splitOnPrefix (== '.') Fold.toList === id9Assuming the input stream does not contain the separator: Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id UnimplementedstreamlyLike  after stripping leading, trailing, and repeated separators. Therefore, ".a..b." with  & as the separator would be parsed as  ["a","b"]. In other words, its like parsing words from whitespace separated text.wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)wordsBy' (== ',') ""[]wordsBy' (== ',') ","[]wordsBy' (== ',') ",a,,b," ["a","b"] words = wordsBy isSpacestreamlyLike 8 but keeps the suffix attached to the resulting splits.splitWithSuffix' p xs = Stream.toList $ splitWithSuffix p Fold.toList (Stream.fromList xs)splitWithSuffix' (== '.') ""[]splitWithSuffix' (== '.') "."["."]splitWithSuffix' (== '.') "a"["a"]splitWithSuffix' (== '.') ".a" [".","a"]splitWithSuffix' (== '.') "a."["a."]splitWithSuffix' (== '.') "a.b" ["a.","b"] splitWithSuffix' (== '.') "a.b." ["a.","b."]"splitWithSuffix' (== '.') "a..b.."["a.",".","b.","."]streamly'Split on any one of the given patterns. UnimplementedstreamlyLike 5 but splits the separator as well, as an infix token.splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)splitOn'_ "" "hello"!["h","","e","","l","","l","","o"]splitOn'_ "hello" ""[""]splitOn'_ "hello" "hello"["","hello",""]splitOn'_ "x" "hello" ["hello"]splitOn'_ "h" "hello"["","h","ello"]splitOn'_ "o" "hello"["hell","o",""]splitOn'_ "e" "hello"["h","e","llo"]splitOn'_ "l" "hello"["he","l","","l","o"]splitOn'_ "ll" "hello"["he","ll","o"] Pre-releasestreamlyLike  splitSuffixBy but the separator is a sequence of elements, instead of a predicate for a single element.splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)splitOnSuffixSeq_ "." ""[]splitOnSuffixSeq_ "." 
"."[""]splitOnSuffixSeq_ "." "a"["a"]splitOnSuffixSeq_ "." ".a"["","a"]splitOnSuffixSeq_ "." "a."["a"]splitOnSuffixSeq_ "." "a.b" ["a","b"]splitOnSuffixSeq_ "." "a.b." ["a","b"]splitOnSuffixSeq_ "." "a..b.."["a","","b",""] lines = splitOnSuffixSeq "\n" is an inverse of intercalateSuffix". The following law always holds: *intercalateSuffix . splitOnSuffixSeq == idThe following law holds when the separator is non-empty and contains none of the elements present in the input lists: 'splitSuffixOn . intercalateSuffix == idsplitOnSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq_ pat f) Pre-releasestreamlyLike  but drops any empty splits. UnimplementedstreamlyLike + but keeps the suffix intact in the splits.splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)splitWithSuffixSeq' "." ""[]splitWithSuffixSeq' "." "."["."]splitWithSuffixSeq' "." "a"["a"]splitWithSuffixSeq' "." ".a" [".","a"]splitWithSuffixSeq' "." "a."["a."]splitWithSuffixSeq' "." "a.b" ["a.","b"]splitWithSuffixSeq' "." "a.b." ["a.","b."] splitWithSuffixSeq' "." "a..b.."["a.",".","b.","."]splitWithSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq pat f) Pre-releasestreamly)Split post any one of the given patterns. Unimplementedstreamly&Group the input stream into groups of n elements each and then fold each group using the provided fold function.Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)[3,7,11,15,19]/This can be considered as an n-fold version of   where we apply  = repeatedly on the leftover stream until the stream exhausts. 
%chunksOf n f = foldMany (FL.take n f)streamlyarraysOf n stream9 groups the elements in the input stream into arrays of n elements each.0Same as the following but may be more efficient: )arraysOf n = Stream.foldMany (A.writeN n) Pre-releasestreamly'Group the input stream into windows of n second each and then fold each group using the provided fold function.Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1[...,...,...,...,...]streamlyLike  but if the chunk is not completed within the specified time interval then emit whatever we have collected till now. The chunk timeout is reset whenever a chunk is emitted. The granularity of the clock is 100 ms.4s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]f = Stream.mapM_ print $ Stream.chunksOfTimeout 5 1 Fold.toList s Pre-releasestreamly?classifySessionsBy tick keepalive predicate timeout fold stream classifies an input event stream consisting of (timestamp, (key, value)) into sessions based on the key, folding all the values corresponding to the same key into a session using the supplied fold.When the fold terminates or a timeout occurs, a tuple consisting of the session key and the folded value is emitted in the output stream. The timeout is measured from the first event in the session. If the  keepalive option is set to : the timeout is reset to 0 whenever an event is received.The  timestamp in the input stream is an absolute time from some epoch, characterizing the time when the input event was generated. The notion of current time is maintained by a monotonic event time clock using the timestamps seen in the input stream. The latest timestamp seen till now is used as the base for the current time. When no new events are seen, a timer is started with a clock resolution of tick seconds. This timer is used to detect session timeouts in the absence of new events.To ensure an upper bound on the memory used the number of sessions can be limited to an upper bound. 
If the ejection  predicate returns , the oldest session is ejected before inserting a new session.When the stream ends any buffered sessions are ejected immediately.If a session key is received even after a session has finished, another session is created for that key.:{ Stream.mapM_ print $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList) $ Stream.timestamped $ Stream.delay 0.19 $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c']):} (1,"abc") (2,"abc") (3,"abc") Pre-releasestreamlySame as < with a timer tick of 1 second and keepalive option set to . 6classifyKeepAliveSessions = classifySessionsBy 1 True  Pre-releasestreamlySame as < with a timer tick of 1 second and keepalive option set to . 0classifySessionsOf = classifySessionsBy 1 False  Pre-releasestreamly#splitInnerBy splitter joiner stream splits the inner containers f a of an input stream  t m (f a) using the splitter function. Container elements f a are collected until a split occurs, then all the elements before the split are joined using the joiner function.$For example, if we have a stream of  Array Word8, we may want to split the stream into arrays representing lines separated by 'n' byte such that the resulting stream after a split would be one array for each line.CAUTION! This is not a true streaming function as the container size after the split and merge may not be bounded. Pre-releasestreamlyLike  but splits assuming the separator joins the segment in a suffix style. 
Pre-release

timer tick in seconds
reset the timer when an event is received
predicate to eject sessions based on session count
session timeout in seconds
Fold to be applied to session data
timestamp, (session key, session data)
session key, fold result

timer tick in seconds
reset the timer when an event is received
predicate to eject sessions based on session count
session timeout in seconds
Fold to be applied to session data
timestamp, (session key, session data)
session key, fold result

predicate to eject sessions on session count
session inactive timeout
Fold to be applied to session payload data
timestamp, (session key, session data)

predicate to eject sessions on session count
time window size
Fold to be applied to session data
timestamp, (session key, session data)

(c) 2020 Composewell Technologies, BSD-3-Clause, streamly@composewell.com, experimental, GHC, Safe-Inferred

sampleFromThen offset stride samples the element at offset index and then every element at strides of stride.
Stream.toList $ Stream.sampleFromThen 2 3 $ Stream.enumerateFromTo 0 10
[2,5,8]
Pre-release

Continuously evaluate the input stream and sample the last event in each time window of n seconds. This is also known as throttle in some libraries.
sampleIntervalEnd n = Stream.catMaybes . Stream.intervalsOf n Fold.last
Pre-release

Like sampleInterval but samples at the beginning of the time window.
sampleIntervalStart n = Stream.catMaybes . Stream.intervalsOf n Fold.one
Pre-release

Sample one event at the end of each burst of events. A burst is a group of events close together in time; it ends when an event is spaced by more than the specified time interval (in seconds) from the previous event. This is known as debounce in some libraries. The clock granularity is 10 ms.
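The sampleFromThen behaviour documented above can be modelled on plain lists. The following is only a sketch using base, not streamly's implementation; sampleFromThenList is a hypothetical helper name introduced for illustration.

```haskell
module Main (main) where

-- List model of the documented behaviour: skip @offset@ elements, then keep
-- one element and skip the next @stride - 1@ elements, repeatedly.
-- 'sampleFromThenList' is a hypothetical name, not part of streamly.
sampleFromThenList :: Int -> Int -> [a] -> [a]
sampleFromThenList offset stride = go . drop offset
  where
    go [] = []
    go (y : ys) = y : go (drop (stride - 1) ys)

main :: IO ()
main = print (sampleFromThenList 2 3 [0 .. 10 :: Int]) -- prints [2,5,8]
```

The printed result matches the documented example for offset 2 and stride 3 over 0 to 10.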
Pre-release

Like  but samples the event at the beginning of the burst instead of at the end of it. Pre-release

Sort the input stream using a supplied comparison function. O(n) space. Note: this is not the fastest possible implementation as of now. Pre-release

This is the same as ]^ but less efficient. The second stream is evaluated multiple times. If the second stream is a consume-once stream then it can be cached in an _` before calling this function. Caching may also improve performance if the stream is expensive to evaluate. Time: O(m x n) Pre-release

For all elements in t m a, for all elements in t m b, if a and b are equal by the given equality predicate then return the tuple (a, b). The second stream is evaluated multiple times. If the stream is a consume-once stream then the caller should cache it (e.g. in a _`) before calling this function. Caching may also improve performance if the stream is expensive to evaluate. For space efficiency use the smaller stream as the second stream. You should almost always use joinInnerMap instead of joinInner. joinInnerMap is an order of magnitude faster. joinInner may be used when the second stream is generated from a seed, and therefore need not be stored in memory, and the amount of memory used is a concern. Space: O(n) assuming the second stream is cached in memory. Time: O(m x n) Pre-release

Like  but uses a Map for efficiency. If the input streams have duplicate keys, the behavior is undefined. For space efficiency use the smaller stream as the second stream. Space: O(n) Time: O(m + n) Pre-release

Like  but works only on sorted streams. Space: O(1) Time: O(m + n) Unimplemented

Like joinLeft but uses a hashmap for efficiency. Space: O(n) Time: O(m + n) Pre-release

Like joinLeft but works only on sorted streams. 
Space: O(1)Time: O(m + n) UnimplementedstreamlyLike  joinOuter but uses a Map for efficiency.Space: O(m + n)Time: O(m + n) Pre-releasestreamlyLike  joinOuter" but works only on sorted streams. Space: O(1)Time: O(m + n) Unimplementedstreamly is essentially a filtering operation that retains only those elements in the first stream that are present in the second stream.Stream.toList $ Stream.intersectBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3])[1,2,2]Stream.toList $ Stream.intersectBy (==) (Stream.fromList [2,1,1,3]) (Stream.fromList [1,2,2,4])[2,1,1]# is similar to but not the same as :Stream.toList $ fmap fst $ Stream.joinInner (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [2,1,1,3]) [1,1,2,2]Space: O(n) where n0 is the number of elements in the second stream.Time: O(m x n) where m4 is the number of elements in the first stream and n0 is the number of elements in the second stream. Pre-releasestreamlyLike 5 but works only on streams sorted in ascending order. Space: O(1) Time: O(m+n) Pre-releasestreamlyDelete first occurrences of those elements from the first stream that are present in the second stream. If an element occurs multiple times in the second stream as many occurrences of it are deleted from the first stream.Stream.toList $ Stream.differenceBy (==) (Stream.fromList [1,2,2]) (Stream.fromList [1,2,3])[2]The following laws hold: (s1 serial% s2) `differenceBy eq` s1 === s2 (s1 wSerial! s2) `differenceBy eq` s1 === s2 Same as the list ab operation.Space: O(m) where m/ is the number of elements in the first stream.Time: O(m x n) where m4 is the number of elements in the first stream and n0 is the number of elements in the second stream. Pre-releasestreamlyLike " but works only on sorted streams. 
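The intersectBy, joinInner and differenceBy examples above can be checked against list analogues. The sketch below uses only Data.List from base and assumes the stream operations behave like their list counterparts on these inputs; joinInnerList is a hypothetical helper written for illustration, not a streamly function.

```haskell
module Main (main) where

import Data.List (intersectBy, (\\))

-- Nested-loop inner join on lists: O(m x n) comparisons, mirroring the
-- documented time complexity. 'joinInnerList' is a hypothetical name.
joinInnerList :: (a -> b -> Bool) -> [a] -> [b] -> [(a, b)]
joinInnerList eq xs ys = [(x, y) | x <- xs, y <- ys, eq x y]

main :: IO ()
main = do
    -- Keeps the elements of the first list that occur in the second.
    print (intersectBy (==) [1, 2, 2, 4] [2, 1, 1, 3 :: Int]) -- [1,2,2]
    print (intersectBy (==) [2, 1, 1, 3] [1, 2, 2, 4 :: Int]) -- [2,1,1]
    -- The join emits one pair per matching combination, so it differs
    -- from the intersection when the second list has duplicates.
    print (map fst (joinInnerList (==) [1, 2, 2, 4] [2, 1, 1, 3 :: Int])) -- [1,1,2,2]
    -- List difference removes one occurrence per element of the second list.
    print ([1, 2, 2] \\ [1, 2, 3 :: Int]) -- [2]
```

The join output shows why the intersection is described as similar to, but not the same as, an inner join: duplicates in the second input multiply the matches.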
Space: O(1) UnimplementedstreamlyThis is essentially an append operation that appends all the extra occurrences of elements from the second stream that are not already present in the first stream.Stream.toList $ Stream.unionBy (==) (Stream.fromList [1,2,2,4]) (Stream.fromList [1,1,2,3]) [1,2,2,4,3](Equivalent to the following except that s1 is evaluated only once: 9unionBy eq s1 s2 = s1 `serial` (s2 `differenceBy eq` s1)  Similar to  joinOuter but not the same. Space: O(n)Time: O(m x n) Pre-releasestreamlyLike " but works only on sorted streams. Space: O(1) Unimplemented!(c) 2017 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:#@$+/.,-#@$+,-./c!(c) 2019 Composewell TechnologiesBSD3streamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:09streamlyRun the alloc action a -> m c with async exceptions disabled but keeping blocking operations interruptible (see 78). Use the output c as input to  Unfold m c b to generate an output stream. When unfolding use the supplied try operation forall s. m s -> m (Either e s) to catch synchronous exceptions. If an exception occurs run the exception handling unfold Unfold m (c, e) b.The cleanup action c -> m d, runs whenever the stream ends normally, due to a sync or async exception or if it gets garbage collected after a partial lazy evaluation. See ) for the semantics of the cleanup action.6 can express all other exception handling combinators.Inhibits stream fusion Pre-releasestreamlyUnfold the input a using  Unfold m a b, run an action on a whenever the unfold stops normally, or if it is garbage collected after a partial lazy evaluation.The semantics of the action a -> m c1 are similar to the cleanup action semantics in . 
See also   Pre-releasestreamlyUnfold the input a using  Unfold m a b, run an action on a whenever the unfold stops normally, aborts due to an exception or if it is garbage collected after a partial lazy evaluation.The semantics of the action a -> m c1 are similar to the cleanup action semantics in . )finally release = bracket return release  See also  Inhibits stream fusion Pre-releasestreamlyRun the alloc action a -> m c with async exceptions disabled but keeping blocking operations interruptible (see 78). Use the output c as input to  Unfold m c b to generate an output stream.c0 is usually a resource under the state of monad m, e.g. a file handle, that requires a cleanup after use. The cleanup action c -> m d, runs whenever the stream ends normally, due to a sync or async exception or if it gets garbage collected after a partial lazy evaluation. only guarantees that the cleanup action runs, and it runs with async exceptions enabled. The action must ensure that it can successfully cleanup the resource in the face of sync or async exceptions.When the stream ends normally or on a sync exception, cleanup action runs immediately in the current thread context, whereas in other cases it runs in the GC context, therefore, cleanup may be delayed until the GC gets to run. See also:  , Inhibits stream fusion Pre-releasestreamlybeforestreamlyafter, on normal stop, or GCstreamly on exceptionstreamlytry (exception handling)streamly unfold to rund!(c) 2019 Composewell TechnologiesBSD3streamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:1streamlyInternalstreamlyInternal!(c) 2022 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:1!(c) 2020 Composewell Technologies BSD-3-Clausestreamly@composewell.com experimentalGHCNone! $%'.15789:^!<streamlyAn Event generated by the file system. Use the accessor functions to examine the event. 
Pre-release streamlyA handle for a watch.streamly*What to do if a watch already exists when   or  is called for a path. Pre-releasestreamly"Do not set an existing setting to  only set to streamly/Replace the existing settings with new settingsstreamly Fail the APIstreamlyWatch configuration, used to specify the events of interest and the behavior of the watch. Pre-releasestreamlyWatch the whole directory tree recursively instead of watching just one level of directory.default: False Pre-releasestreamlyIf the pathname to be watched is a symbolic link then watch the target of the symbolic link instead of the symbolic link itself.Note that the path location in the events is through the original symbolic link path rather than the resolved path. default: True Pre-releasestreamlyIf an object moves out of the directory being watched then stop watching it. default: True Pre-releasestreamlyWhen adding a new path to the watch, specify what to do if a watch already exists on that path.default: FailIfExists Pre-releasestreamlyWatch the object only for one event and then remove it from the watch.default: False Pre-releasestreamlyWatch the object only if it is a directory. This provides a race-free way to ensure that the watched object is a directory.default: False Pre-releasestreamly1Report when the watched path itself gets deleted. default: True Pre-releasestreamly6Report when the watched root path itself gets renamed. default: True Pre-releasestreamlyReport when the watched root path itself gets deleted or renamed. default: True Pre-releasestreamlyReport when the metadata e.g. owner, permission modes, modifications times of an object changes. default: True Pre-releasestreamlyReport when a file is accessed. default: True Pre-releasestreamlyReport when a file is opened. default: True Pre-releasestreamly8Report when a file that was opened for writes is closed. default: True Pre-releasestreamly=Report when a file that was opened for not writing is closed. 
default: True Pre-releasestreamlyReport when a file is created. default: True Pre-releasestreamlyReport when a file is deleted. default: True Pre-releasestreamlyReport the source of a move. default: True Pre-releasestreamlyReport the target of a move. default: True Pre-releasestreamlyReport when a file is modified. default: True Pre-releasestreamlySet all tunable events  or . Equivalent to setting: setRootDeleted setRootMovedsetAttrsModified setAccessed setOpenedsetWriteClosedsetNonWriteClosed setCreated setDeleted setMovedFrom setMovedTo setModified Pre-releasestreamly'The default configuration settings are:     *The tunable events enabled by default are:setCreated TruesetDeleted TruesetMovedFrom TruesetMovedTo TruesetModified True Pre-release streamly Create a   handle. 9 can be used to add paths being monitored by this watch. Pre-release streamlyAdd a trailing "/" at the end of the path if there is none. Do not add a "/" if the path is empty.streamly!addToWatch cfg watch root subpath adds subpath- to the list of paths being monitored under root via the watch handle watch. root must be an absolute path and subpath must be relative to root. Pre-releasestreamly$Remove an absolute root path from a  , if a path was moved after adding you need to provide the original path which was used to add the Watch. Pre-release streamlyGiven a  and list of paths ("/" separated byte arrays) start monitoring the paths for file system events. Returns a   handle which can then be used to read the event stream or to close the watch. Pre-release streamlyClose a   handle. Pre-releasestreamlyStart monitoring a list of file system paths for file system events with the supplied configuration operation over the . The paths could be files or directories. When recursive mode is set and the path is a directory, the whole directory tree under it is watched recursively. Monitoring starts from the current time onwards. 
The paths are specified as UTF-8 encoded " of  .

Non-existing Paths: the API fails if a watch is started on a non-existing path.

Performance: Note that a recursive watch on a large directory tree could be expensive. When starting a watch, the whole tree must be read and watches are started on each directory in the tree. The initial time to start the watch as well as the memory required is proportional to the number of directories in the tree.

Bugs: When new directories are created under the tree they are added to the watch on receiving the directory create event. However, the creation of a dir and adding a watch for it is not atomic. The implementation takes care of this and makes sure that watches are added for all directories. However, in the meantime, the directory may have received more events which may get lost. Handling of any such lost events is yet to be implemented.

See the Linux inotify man page for more details.

watchWith ( True . $ False) [Array.fromList "dir"]

Pre-release

Same as  using  and recursive mode.
watchRecursive = watchWith (setRecursiveMode True)
See  for pitfalls and bugs when using recursive watch on Linux. Pre-release

Same as , using defaultConfig and non-recursive mode.
watch = watchWith id
Pre-release

Get the watch root corresponding to the . Note that if a path was moved after adding to the watch, this will give the original path and not the new path after moving. TBD: we can possibly update the watch root on a move self event. Pre-release

Get the file system object path for which the event is generated, relative to the watched root. The path is a "/" separated array of bytes. Pre-release

Get the absolute file system object path for which the event is generated. When the watch root is a symlink, the absolute path returned is via the original symlink and not through the resolved path. Pre-release

Cookie is set when a rename occurs. 
The cookie value can be used to connect the  and  events, if both the events belong to the same move operation then they will have the same cookie value. Pre-releasestreamlyEvent queue overflowed (WD is invalid for this event) and we may have lost some events.. The user application must scan everything under the watched paths to know the current state. Pre-releasestreamly3A path was removed from the watch explicitly using  or automatically (file was deleted, or filesystem was unmounted).Note that in recursive watch mode all the subdirectories are watch roots, therefore, they will all generate this event.Occurs only for a watched path Pre-releasestreamlyWatched file/directory was itself deleted. (This event also occurs if an object is moved to another filesystem, since mv(1) in effect copies the file to the other filesystem and then deletes it from the original filesystem.) In addition, an  event will subsequently be generated for the watch descriptor.Note that in recursive watch mode all the subdirectories are watch roots, therefore, they will all generate this event.Occurs only for a watched path Pre-releasestreamly?Watched file/directory was itself moved within the file system.Note that in recursive watch mode all the subdirectories are watch roots, therefore, they will all generate this event.Occurs only for a watched path Pre-releasestreamlyFilesystem containing watched object was unmounted. In addition, an  event will subsequently be generated for the watch descriptor.Occurs only for a watched path Pre-releasestreamlyDetermine whether the event indicates a change of path of the monitored object itself. 
Note that the object may become unreachable or deleted after a change of path.Occurs only for a watched path Pre-releasestreamlyDetermine whether the event indicates inode metadata change for an object contained within the monitored path.Metadata change may include, permissions (e.g., chmod(2)), timestamps (e.g., utimensat(2)), extended attributes (setxattr(2)), link count (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)), and user/group ID (e.g., chown(2))..Can occur for watched path or a file inside it Pre-releasestreamly&File was accessed (e.g. read, execve).3Occurs only for a file inside the watched directory Pre-releasestreamlyFile or directory was opened.3Occurs only for a file inside the watched directory Pre-releasestreamly#File opened for writing was closed.3Occurs only for a file inside the watched directory Pre-releasestreamly;File or directory opened for read but not write was closed..Can occur for watched path or a file inside it Pre-releasestreamlyFile/directory created in watched directory (e.g., open(2) O_CREAT, mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket).6Occurs only for an object inside the watched directory Pre-releasestreamly.File/directory deleted from watched directory.6Occurs only for an object inside the watched directory Pre-releasestreamlyGenerated for the original path when an object is moved from under a monitored directory.6Occurs only for an object inside the watched directory Pre-releasestreamlyGenerated for the new path when an object is moved under a monitored directory.6Occurs only for an object inside the watched directory Pre-releasestreamlyGenerated for a path that is moved from or moved to the monitored directory.+isMoved ev = isMovedFrom ev || isMovedTo ev6Occurs only for an object inside the watched directory Pre-releasestreamlyDetermine whether the event indicates modification of an object within the monitored path. 
This event is generated only for files and not directories.6Occurs only for an object inside the watched directory Pre-releasestreamly4Determine whether the event is for a directory path. Pre-releasestreamly Convert an # record to a String representation.!(c) 2020 Composewell Technologies BSD-3-Clausestreamly@composewell.com pre-releaseGHC Safe-Inferred! $%'.15789:m streamlyStart monitoring a list of directories or symbolic links to directories for file system events. Monitoring starts from the current time onwards. The paths are specified as UTF-8 encoded " of  .If a watch root is a symbolic link then the target of the link is watched. Fails if the watched path does not exist. If the user does not have permissions (read and execute?) on the watch root then no events are generated. No events are generated if the watch root itself is renamed or deleted.This API watches for changes in the watch root directory only, any changes in the subdirectories of the watch root are not watched. However, on macOS the watch is always recursive, but do not rely on that behavior, it may change without notice in future. If you want to use recursive watch please use platform specific modules. Pre-release streamlyLike  except that if a watched path is a directory the whole directory tree under it is watched recursively. On Linux watchRecursive may be more expensive than . Pre-releasestreamlyGet the absolute path of the file system object for which the event is generated. The path is a UTF-8 encoded array of bytes.When the watch root is a symlink the behavior is different on different platforms:On Linux and Windows, the absolute path returned is via the original symlink.On macOS the absolute path returned is via the real path of the root after resolving the symlink.This API is subject to removal in future, to be replaced by a platform independent  getRelPath. Pre-releasestreamlyDetermine whether the event indicates creation of an object within the monitored path. 
This event is generated when any file system object is created.For hard links the behavior is different on different operating systems. On macOS hard linking does not generate a create event, it generates an isInodeAttrsChanged event on the directory instead (see the Darwin module). On Linux and Windows hard linking generates a create event. Pre-releasestreamlyDetermine whether the event indicates deletion of an object within the monitored path. On Linux and Windows hard link deletion generates a delete event.On Linux and Windows, this event does not occur when the watch root itself is deleted. On macOS it occurs on deleting the watch root when it is not a symbolic link. See also  isRootDeleted event for Linux. Pre-releasestreamlyDetermine whether the event indicates rename of an object within the monitored path. This event is generated when an object is renamed within the watched directory or if it is moved out of or in the watched directory. Moving hard links is no different than other types of objects. Pre-releasestreamlyDetermine whether the event indicates modification of an object within the monitored path. This event is generated on file modification on all platforms.On Linux and macOS this event is never generated for directories. On Windows (in recursive watch mode) this event is generated for directories as well when an object is created in or deleted from the directory. Pre-releasestreamlyAn event that indicates that some events before this may have been lost, therefore, we need to take some recovery action. Pre-releasestreamly Convert an  record to a String representation. Note that the output of this function may be different on different platforms because it may contain platform specific details.Internal  !(c) 2018 Composewell TechnologiesBSD3streamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:streamly$Specify the socket protocol details.streamly action socket runs the monadic computation action passing the socket handle to it. 
The handle will be closed on exit from , whether by normal termination or by raising an exception. If closing the handle raises an exception, then this exception will be raised by % rather than any exception raised by action.streamlyLike  but runs a streaming computation instead of a monadic computation.Inhibits stream fusionInternalstreamlyUnfold a three tuple (listenQLen, spec, addr) into a stream of connected protocol sockets corresponding to incoming connections.  listenQLen? is the maximum number of pending connections in the backlog. spec7 is the socket protocol and options specification and addr is the protocol address where the server listens for incoming connections.streamlyConnect to a remote host using the given socket specification and remote address. Returns a connected socket or throws an exception. Pre-releasestreamlyConnect to a remote host using the given socket specification, a local address to bind to and a remote address to connect to. Returns a connected socket or throws an exception. Pre-releasestreamlyStart a TCP stream server that listens for connections on the supplied server address specification (address family, local interface IP address and port). The server generates a stream of connected sockets. The first argument is the maximum number of pending connections in the backlog. Pre-releasestreamlyRead a byte array from a file handle up to a maximum of the requested size. If no data is available on the handle it blocks until some data becomes available. If data is available then it immediately returns that data without blocking.streamlyWrite an Array to a socket.streamlyreadChunksWith bufsize socket reads a stream of arrays from socket4. The maximum size of a single array is limited to bufsize. Pre-releasestreamlyRead a stream of byte arrays from a socket. 
The maximum size of a single array is limited to defaultChunkSize.
readChunks = Socket.readChunksWith defaultChunkSize
Pre-release

Unfold the tuple (bufsize, socket) into a stream of  arrays. Read requests to the socket are performed using a buffer of size bufsize. The size of an array in the resulting stream is always less than or equal to bufsize.

Same as 

Unfolds a socket into a stream of  arrays. Requests to the socket are performed using a buffer of size ef. The size of each array in the resulting stream is therefore less than or equal to ef.

Generate a byte stream from a socket using a buffer of the given size. Pre-release

Generate a byte stream from a socket.
read = Socket.readWith defaultChunkSize
Pre-release

Unfolds the tuple (bufsize, socket) into a byte stream. Read requests to the socket are performed using buffers of bufsize.

Same as 

Unfolds a  into a byte stream. IO requests to the socket are performed in sizes of ef.

Write a stream of arrays to a handle.

Write a stream of arrays to a socket. Each array in the stream is written to the socket as a separate IO request.

writeChunksWith bufsize socket writes a stream of arrays to socket after coalescing the adjacent arrays in chunks of bufsize. Multiple arrays are coalesced as long as the total size remains below the specified size. It never splits an array; if a single array is bigger than the specified size it is emitted as is.

Same as 

Like  but provides control over the write buffer. Output will be written to the IO device as soon as we collect the specified number of input elements.

Write a byte stream to a socket. Accumulates the input in chunks of the specified number of bytes before writing.

Same as 

Write a stream of  values. Keep buffering the  values in an array. Write the array to the Handle as soon as a  is encountered or the buffer size exceeds the specified limit. 
Pre-releasestreamlyWrite a byte stream to a file handle. Combines the bytes in chunks of size up to  ? before writing. Note that the write behavior depends on the IOMode- and the current seek position of the handle.streamlyWrite a byte stream to a socket. Accumulates the input in chunks of up to   bytes before writing.)write = Socket.writeWith defaultChunkSize""!(c) 2019 Composewell TechnologiesBSD3streamly@composewell.com experimentalGHC Safe-Inferred! $%'.15789:streamlyUnfold a tuple (ipAddr, port)* into a stream of connected TCP sockets. ipAddr is the local IP address and port6 is the local port on which connections are accepted.streamlyLike  but binds on the IPv4 address 0.0.0.0 i.e. on all IPv4 addresses/interfaces of the machine and listens for TCP connections on the specified port.4acceptor = Unfold.first (0,0,0,0) TCP.acceptorOnAddrstreamlyLike ) but binds on the localhost IPv4 address  127.0.0.1. The server can only be accessed from the local host, it cannot be accessed from other hosts on the network.;acceptorLocal = Unfold.first (127,0,0,1) TCP.acceptorOnAddrstreamlyLike ; but with the ability to specify a list of socket options. Pre-releasestreamlyLike ) but binds on the specified IPv4 address.&acceptOnAddr = TCP.acceptOnAddrWith [] Pre-releasestreamly9Start a TCP stream server that binds on the IPV4 address 0.0.0.0 and listens for TCP connections from remote hosts on the specified server port. The server generates a stream of connected sockets.#accept = TCP.acceptOnAddr (0,0,0,0) Pre-releasestreamlyLike ) but binds on the localhost IPv4 address  127.0.0.1. The server can only be accessed from the local host, it cannot be accessed from other hosts on the network.*acceptLocal = TCP.acceptOnAddr (127,0,0,1) Pre-releasestreamlyConnect to the specified IP address and port number. Returns a connected socket or throws an exception.streamlyConnect to a remote host using IP address and port and run the supplied action on the resulting socket.  
makes sure that the socket is closed on normal termination or in case of an exception. If closing the socket raises an exception, then this exception will be raised by .
Pre-release

Transform an   from a   to an unfold from a remote IP address and port. The resulting unfold opens a socket, uses it with the supplied unfold, and then makes sure that the socket is closed on normal termination or in case of an exception. If closing the socket raises an exception, then this exception will be raised by .
Pre-release

 addr port act opens a connection to the specified IPv4 host address and port and passes the resulting socket handle to the computation act. The handle will be closed on exit from , whether by normal termination or by raising an exception. If closing the handle raises an exception, then this exception will be raised by  rather than any exception raised by act.
Pre-release

Read a stream from the supplied IPv4 host address and port number.

Read a stream from the supplied IPv4 host address and port number.
Pre-release

Write a stream of arrays to the supplied IPv4 host address and port number.
Pre-release

Write a stream of arrays to the supplied IPv4 host address and port number.

Like  but provides control over the write buffer. Output will be written to the IO device as soon as we collect the specified number of input elements.
Pre-release

Like  but provides control over the write buffer. Output will be written to the IO device as soon as we collect the specified number of input elements.

Write a stream to the supplied IPv4 host address and port number.
Pre-release

Write a stream to the supplied IPv4 host address and port number.

Send an input stream to a remote host and produce the output stream from the host. The server host just acts as a transformation function on the input stream. Both sending and receiving happen asynchronously.
Pre-release

(c) 2018 Composewell Technologies BSD-3-Clause streamly@composewell.com experimental GHC Safe-Inferred

Canonical decomposition.

Compatibility decomposition.

Canonical decomposition followed by canonical composition.

Compatibility decomposition followed by canonical composition.

Select alphabetic characters in the ASCII character set.
Pre-release

(c) 2021 Composewell Technologies BSD-3-Clause streamly@composewell.com experimental GHC Safe-Inferred

A space-efficient, packed, unboxed Unicode container.

(c) 2019 Composewell Technologies BSD3 streamly@composewell.com released GHC Safe-Inferred

(c) 2018 Composewell Technologies BSD-3-Clause streamly@composewell.com released GHC Safe-Inferred

(c) 2017 Composewell Technologies BSD3 streamly@composewell.com released GHC Safe-Inferred
@@    G G G G G G e    K   - k                  l                                                                    IIhJ-IlIhJ IIIIKIII    h  h h  h  h  h h h h  h h Q h h h  hJh   h h h h h h h  h  h  hz   h   W Y Y Y Y Y Y   hz h ]]]     h     f streamly-0.10.1-inplaceStreamly.Data.Stream.MkType$Streamly.Internal.Data.Stream.Serial$Streamly.Internal.Data.Stream.MkType$Streamly.Internal.Control.ConcurrentStreamly.Data.Array.Foreign&Streamly.Internal.Data.Stream.IsStreamStreamly.Data.Fold.Tee$Streamly.Internal.Control.ForkLiftedStreamly.Internal.Data.AtomicsStreamly.Internal.Data.ChannelStreamly.Internal.Data.Cont#Streamly.Internal.Data.Fold.PreludeStreamly.Data.Fold.Prelude)Streamly.Internal.Data.IOFinalizer.Lifted$Streamly.Internal.Data.IsMap.HashMapStreamly.Internal.Data.SVar%Streamly.Internal.Data.Stream.Prelude"Streamly.Internal.Data.Stream.SVar&Streamly.Internal.Data.Stream.Parallel#Streamly.Internal.Data.Stream.Async#Streamly.Internal.Data.Stream.Ahead!Streamly.Internal.Data.Stream.Zip&Streamly.Internal.Data.Stream.ZipAsync%Streamly.Internal.Data.Unfold.Prelude(Streamly.Internal.FileSystem.Event.Linux"Streamly.Internal.FileSystem.Event Streamly.Internal.Network.Socket"Streamly.Internal.Network.Inet.TCPStreamly.Internal.Unicode.CharStreamly.Internal.Unicode.Utf8Streamly.Network.Socket$Streamly.Internal.Data.Channel.Types)Streamly.Internal.Data.Channel.Dispatcher%Streamly.Internal.Data.Channel.Worker!Streamly.Internal.Data.Fold.Async3Streamly.Internal.Data.Fold.Concurrent.Channel.Type.Streamly.Internal.Data.Fold.Concurrent.Channel&Streamly.Internal.Data.Fold.Concurrent"Streamly.Internal.Data.SVar.Worker$Streamly.Internal.Data.SVar.Dispatch Streamly.Internal.Data.SVar.Pull 
Streamly.Internal.Data.Fold.SVar$Streamly.Internal.Data.Stream.CommonPreludefoldr5Streamly.Internal.Data.Stream.Concurrent.Channel.Type;Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher;Streamly.Internal.Data.Stream.Concurrent.Channel.Operations9Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer;Streamly.Internal.Data.Stream.Concurrent.Channel.Interleave7Streamly.Internal.Data.Stream.Concurrent.Channel.Append0Streamly.Internal.Data.Stream.Concurrent.Channel(Streamly.Internal.Data.Stream.Concurrent$Streamly.Internal.Data.Stream.LiftedControl.ExceptionmaskStreamly.Internal.Data.StreamretryStreamly.PreludewSerial concatMapWithserial+Streamly.Internal.Data.Stream.SVar.Generate,Streamly.Internal.Data.Stream.SVar.EliminateparallelAsyncT maxBufferwAsyncasyncahead"Streamly.Internal.Data.Stream.TimeStreamly.Data.Stream.Prelude+Streamly.Internal.Data.Stream.IsStream.Type Data.Foldablefold+Streamly.Internal.Data.Stream.IsStream.LiftSerialmap0Streamly.Internal.Data.Stream.IsStream.Exception2Streamly.Internal.Data.Stream.IsStream.Enumeration2Streamly.Internal.Data.Stream.IsStream.Combinators-Streamly.Internal.Data.Stream.IsStream.Common0Streamly.Internal.Data.Stream.IsStream.Transform/Streamly.Internal.Data.Stream.IsStream.Generate Control.Monadmfix0Streamly.Internal.Data.Stream.IsStream.Eliminate-Streamly.Internal.Data.Stream.IsStream.Expand-Streamly.Internal.Data.Stream.IsStream.ReduceStreamly.Internal.Data.Parsermany*Streamly.Internal.Data.Stream.IsStream.TopStreamly.Internal.Data.Unfold outerProduct Data.ArrayArray Data.List//'Streamly.Internal.Data.Unfold.Exception"Streamly.Internal.Data.Unfold.SVar!Streamly.Internal.Data.Array.TypedefaultChunkSizeStreamly.Network.Inet.TCPbaseGHC.ExtsIsListfromListtoListGHC.ReadRead Data.StringIsStringControl.Monad.IO.ClassliftIOMonadIOData.Functor.IdentityIdentity readsPrecreadPrec readListPrecreadListGHC.Baseapexceptions-0.10.4Control.Monad.CatchthrowM MonadThrowtransformers-0.5.6.2Control.Monad.Trans.Class 
MonadTrans,monad-control-1.0.3.1-1TP8KzWPOEZKjMb7Hfj4ZjControl.Monad.Trans.ControlrestoreMlift mtl-2.2.2Control.Monad.Reader.Classreaderlocalask MonadReaderstreamly-core-0.2.2-inplaceStreamly.Internal.Data.ArrayasBytescastgetIndex writeLastN fromListNlengthreadreadRevwritewriteN#Streamly.Internal.Data.StreamK.TypeStreamK Streamly.Internal.Data.SVar.TypeRate$fNFData1Array $fNFDataArrayStreamly.Internal.Data.Fold.TeetoFoldTeeunTee rateBufferrateGoalrateHighrateLowRunInIOrunInIO MonadAsync MonadRunInIO askRunInIO withRunInIOwithRunInIONoRestoredoFork doForkWithfork forkManagedatomicModifyIORefCASatomicModifyIORefCAS_ writeBarrierstoreLoadBarrierMkStreamConfigStopWhen FirstStopsAllStopAnyStops SVarStatstotalDispatches maxWorkers maxOutQSize maxHeapSize maxWorkQSizeavgWorkerLatencyminWorkerLatencymaxWorkerLatency svarStopTime YieldRateInfosvarLatencyTargetsvarLatencyRangesvarRateBuffersvarGainedLostYieldssvarAllTimeLatencyworkerBootstrapLatencyworkerPollingIntervalworkerPendingLatencyworkerCollectedLatencyworkerMeasuredLatency LatencyRange minLatency maxLatency WorkerInfoworkerYieldMaxworkerYieldCountworkerLatencyStart ChildEvent ChildYieldChildStopChannel ChildStop ThreadAbortLimit UnlimitedLimitedCountmagicMaxBuffer defaultConfig maxYields getYieldLimit maxThreads getMaxThreads getMaxBufferrate getStreamRatesetStreamLatencygetStreamLatencyinspectgetInspectModeeagergetEagerDispatchstopWhen getStopWhenordered getOrdered interleavedgetInterleaved boundThreadsgetBound newRateInfo newSVarStatsavgRateminRatemaxRate constRatedecrementYieldLimitincrementYieldLimitreadOutputQBasicreadOutputQRaw ringDoorBell dumpCreator dumpOutputQ dumpDoorBelldumpNeedDoorBelldumpRunningThreadsdumpWorkerCount withDiagMVar printSVar cleanupSVarminThreadDelaycollectLatency dumpSVarStats addThread delThread modifyThreadallThreadsDonerecordMaxWorkersWork BlockWait PartialWorker ManyWorkerssendWithDoorBellestimateWorkersisBeyondMaxRateworkerRateControl sendYieldsendStophandleChildException 
contListMap takeInterval intervalsOfChannel outputQueuemaxBufferLimitoutputDoorBell readOutputQoutputQueueFromConsumeroutputDoorBellFromConsumerbufferSpaceDoorBellsvarRef svarStatssvarInspectMode svarCreatordumpSVar newChannelcheckFoldStatus sendToWorkerparEval"Streamly.Internal.Data.IOFinalizerrunIOFinalizer IOFinalizernewIOFinalizerclearingIOFinalizer$fIsMapHashMap toHashMapIOdecrementBufferLimitincrementBufferLimitresetBufferLimitupdateYieldCountsendsendToProducerworkerUpdateLatencysendStopToProducerhandleFoldException pushWorker pushWorkerPardispatchWorkerdispatchWorkerPacedsendWorkerWaitsendFirstWorkersendWorkerDelayPacedsendWorkerDelayreadOutputQBoundedreadOutputQPacedpostProcessPacedpostProcessBoundedcleanupSVarFromWorkerHeapDequeueResultClearingWaitingReadygetYieldRateInfo enqueueAheadreEnqueueAheadqueueEmptyAhead dequeueAhead withIORefdequeueFromHeapdequeueFromHeapSeq heapIsSanerequeueOnHeapTop updateHeapSeq newAheadVarnewParallelVar writeLimitedsvarMrun postProcessmaxWorkerLimit remainingWork yieldRateInfoenqueue eagerDispatch isWorkDone isQueueDonedoorBellOnWorkQworkLoop workerThreads workerCount accountThreadworkerStopMVaryieldstop stopChannel startChannel toChannelK toChannel fromChannelK fromChannelnewInterleaveChannelnewAppendChannel withChannelK withChannelparTwo parConcatMap parConcatparList parListLazyparListInterleavedparListOrdered parListEagerparListEagerFstparListEagerMinparApplyparMapM parSequence parZipWithM parZipWith parMergeByM parMergeByparConcatIterate parRepeatM parReplicateM fromCallback parTapCounttapCount bracket3Dbracket3bracketfinallyafterDafterretryD mkZipType mkCrossTypeWSerialWSerialT getWSerialTSerialT getSerialT toStreamK fromStreamKconsconsMrepeatmapM consMWSerial wSerialFst wSerialMinunfoldrM$fMonadBasebSerialT$fTraversableSerialT$fFoldableSerialT$fNFData1SerialT$fNFDataSerialT$fIsStringSerialT $fReadSerialT $fShowSerialT $fOrdSerialT 
$fEqSerialT$fIsListSerialT$fMonadStatesSerialT$fMonadReaderrSerialT$fMonadThrowSerialT$fMonadIOSerialT$fFunctorSerialT$fApplicativeSerialT$fMonadTransSerialT$fMonadSerialT$fTraversableWSerialT$fFoldableWSerialT$fNFData1WSerialT$fNFDataWSerialT$fIsStringWSerialT$fReadWSerialT$fShowWSerialT $fOrdWSerialT $fEqWSerialT$fIsListWSerialT$fMonadStatesWSerialT$fMonadReaderrWSerialT$fMonadThrowWSerialT$fMonadIOWSerialT$fFunctorWSerialT$fMonadWSerialT$fApplicativeWSerialT$fMonoidWSerialT$fSemigroupWSerialT$fMonadTransWSerialT$fSemigroupSerialT$fMonoidSerialTtoSVarfromSVar fromSVarDtoSVarParallel newFoldSVar newFoldSVarF fromConsumer pushToFold teeToSVarParallel ParallelT getParallelT parallelK parallelFstK parallelMinK mkParallelK mkParallelD tapAsyncK tapAsyncFnewCallbackStream$fMonadStatesParallelT$fMonadReaderrParallelT$fMonadThrowParallelT$fMonadIOParallelT$fFunctorParallelT$fMonadBasebParallelT$fMonadParallelT$fApplicativeParallelT$fMonoidParallelT$fSemigroupParallelT$fMonadTransParallelTWAsyncWAsyncT getWAsyncTAsync getAsyncTmkAsyncKmkAsyncDasyncK consMAsyncwAsyncK consMWAsync$fMonadStatesAsyncT$fMonadReaderrAsyncT$fMonadThrowAsyncT$fMonadIOAsyncT$fFunctorAsyncT$fMonadBasebAsyncT $fMonadAsyncT$fApplicativeAsyncT$fMonoidAsyncT$fSemigroupAsyncT$fMonadTransAsyncT$fMonadStatesWAsyncT$fMonadReaderrWAsyncT$fMonadThrowWAsyncT$fMonadIOWAsyncT$fFunctorWAsyncT$fMonadBasebWAsyncT$fMonadWAsyncT$fApplicativeWAsyncT$fMonoidWAsyncT$fSemigroupWAsyncT$fMonadTransWAsyncTAheadAheadT getAheadTaheadK$fMonadStatesAheadT$fMonadReaderrAheadT$fMonadThrowAheadT$fMonadIOAheadT$fFunctorAheadT$fMonadBasebAheadT $fMonadAheadT$fApplicativeAheadT$fMonoidAheadT$fSemigroupAheadT$fMonadTransAheadTperiodicticks ticksRate interjecttakeLastInterval dropIntervaldropLastIntervalgroupsOfTimeoutclassifySessionsByGenericclassifySessionsByclassifyKeepAliveSessionsclassifySessionsOfsampleIntervalEndsampleIntervalStart sampleBurstsampleBurstEndsampleBurstStart bufferOldestN bufferLatestN bufferLatest 
ZipConcurrentgetZipConcurrent ZipSerial ZipStream ZipSerialM getZipSerialM zipWithMKzipWithKconsMZip$fTraversableZipSerialM$fFoldableZipSerialM$fApplicativeZipSerialM$fFunctorZipSerialM$fNFData1ZipSerialM$fNFDataZipSerialM$fIsStringZipSerialM$fReadZipSerialM$fShowZipSerialM$fOrdZipSerialM$fEqZipSerialM$fIsListZipSerialM$fApplicativeZipConcurrent$fFunctorZipConcurrent$fSemigroupZipSerialM$fMonoidZipSerialMZipAsync ZipAsyncM getZipAsyncMzipAsyncWithMK zipAsyncWithK consMZipAsync$fApplicativeZipAsyncM$fFunctorZipAsyncM$fSemigroupZipAsyncM$fMonoidZipAsyncMIsStream|:adapt fromStreamDtoConsK toStreamDmkStreamfoldrMxfoldlMx'foldlx'foldStreamShared foldStream fromSerial fromWSerial fromAsync fromWAsync fromAhead fromParallel fromZipSerial fromZipAsync.:nilnilMbindWithconcatMapFoldableWithconcatForFoldableWithconcatFoldableWithhoist generally liftInner runReaderT usingReaderT evalStateT usingStateT runStateTbeforeafter_ onExceptionfinally_bracket_bracket'ghandlehandle Enumerable enumerateFromenumerateFromToenumerateFromThenenumerateFromThenToenumerateFromStepIntegralenumerateFromIntegralenumerateFromThenIntegralenumerateFromToIntegralenumerateFromThenToIntegralenumerateFromFractionalenumerateFromThenFractionalenumerateFromToFractionalenumerateFromThenToFractionalenumerateFromToSmallenumerateFromThenToSmallenumerateFromThenSmallBounded enumerate enumerateToenumerateFromBounded printState inspectModefromPure fromEffectyieldMrepeatM timesWith absTimesWith relTimesWith foldContinue scanlMAfter'postscanlMAfter' postscanlM'smapMtake takeWhile takeEndBydrop findIndices intersperseMinterjectSuffixreversereverse'mkAsync mkParallel parallelFst concatMapM concatMapconcatM foldManyPost splitOnSeqzipWithMzipWith transformfoldrS foldrSSharedfoldrTsequencetaptapOffsetEverytapAsyncdistributeAsync_ pollCountstracetrace_scanscanManypostscanscanxscanlM'scanl' postscanl' prescanl' prescanlM'scanl1M'scanl1'withfilterfilterMuniqByuniqprunerepeatednubBydeleteBy sampleOld sampleNew sampleRate 
takeWhileMtakeLast takeWhileLasttakeWhileAround dropWhile dropWhileMdropLast dropWhileLastdropWhileAroundinsertBy intersperse intersperseM_intersperseMWithintersperseMSuffixintersperseMSuffix_intersperseMSuffixWithintersperseMPrefix_delay delayPostdelayPre reassembleByindexedindexedR timestampWith timestamped timeIndexWith timeIndexed elemIndices rollingMap rollingMapM rollingMap2mapMaybe mapMaybeM catMaybesleftsrightsboth|$ applyAsync|&unfoldunfold0unfoldr replicate replicateMtimesabsTimes currentTimerelTimes durationstimeout fromIndices fromIndicesMiterateiterateM fromFoldable fromFoldableM fromListM fromHandle fromPrimIORefunconsfoldrMfoldr1foldlSfoldlTfoldxfoldl'foldl1'foldxMfoldlM'parseDparsemapM_draindrainNrunN drainWhilerunWhile runStreamnullheadheadElsetailinitlastelemnotElemallanyandorsumproductmconcatminimum minimumBymaximum maximumBythe!!lookupfindfindM findIndex elemIndex toListRevtoHandle toStreamRev|$. foldAsync|&. isPrefixOf isInfixOf isSuffixOfisSubsequenceOf stripPrefix stripSuffixeqBycmpByappend interleaveinterleaveSuffixinterleaveInfix interleaveMin roundrobin parallelMin zipAsyncWithM zipAsyncWithmergeBymergeByM mergeByMFused mergeMinBy mergeFstBymerge mergeAsyncBy mergeAsyncByM concatUnfold unfoldManyunfoldManyInterleaveunfoldManyRoundRobin interposeinterposeSuffix gintercalate intercalategintercalateSuffixintercalateSuffixconcatconcatSmapMWithconcatPairsWithiterateMapWith iterateUnfolditerateSmapMWithiterateMapLeftsWith dropPrefix dropInfix dropSuffixfoldMany refoldMany foldSequence foldIterateMrefoldIterateM parseMany parseManyD parseSequence parseManyTill parseIterategroupsBygroupsByRollinggroupssplitOn splitOnSuffix splitOnPrefixwordsBysplitWithSuffix splitOnAny splitBySeqsplitOnSuffixSeqwordsOnsplitWithSuffixSeqsplitOnSuffixSeqAnychunksOfarraysOfchunksOfTimeout splitInnerBysplitInnerBySuffixsampleFromThensortBy crossJoin joinInner joinInnerMapjoinInnerMerge joinLeftMap mergeLeftJoin joinOuterMapmergeOuterJoin intersectByintersectBySorted 
differenceBymergeDifferenceByunionBy mergeUnionBy fromStreamtoStreamgbracket fromProducerEventeventWd eventFlags eventCookie eventRelPatheventMap WhenExists AddIfExistsReplaceIfExists FailIfExistswatchRec createFlagssetRecursiveModesetFollowSymLinkssetUnwatchMoved setWhenExists setOneShot setOnlyDirsetRootDeleted setRootMovedsetRootPathEventssetAttrsModified setAccessed setOpenedsetWriteClosedsetNonWriteClosed setCreated setDeleted setMovedFrom setMovedTo setModified setAllEvents addToWatchremoveFromWatch watchWithwatchRecursivewatchgetRoot getRelPath getAbsPath getCookie isEventsLostisRootUnwatched isRootDeleted isRootMovedisRootUnmountedisRootPathEventisAttrsModified isAccessedisOpened isWriteClosedisNonWriteClosed isCreated isDeleted isMovedFrom isMovedToisMoved isModifiedisDir showEvent $fShowEvent $fOrdEvent $fEqEvent $fShowCookie $fEqCookie$fShowWDSockSpec sockFamilysockType sockProtosockOpts forSocketM withSocketacceptorconnect connectFromacceptgetChunkputChunkreadChunksWith readChunkschunkReaderWithreadChunksWithBufferOf chunkReaderreadWith readerWithreadWithBufferOf putChunks writeChunkswriteChunksWithwriteChunksWithBufferOf putBytesWith writeWithwriteWithBufferOfwriteMaybesWithputBytesacceptorOnAddrWithacceptorOnAddr acceptorWithacceptorOnPort acceptorLocalacceptorOnPortLocalacceptOnAddrWith acceptOnAddr acceptLocalwithConnectionMusingConnectionwithConnectionputBytesWithBufferOf pipeBytesNormalizationModeNFDNFKDNFCNFKC isAsciiAlpha normalize$fEqNormalizationMode$fShowNormalizationMode$fEnumNormalizationModeUtf8toArraypackunpack readChunk writeChunkControl.ConcurrentforkOS Streamly.Internal.Control.ForkIO rawForkIOpure GHC.MaybeNothingghc-prim GHC.TypesTrueFalserateRecoveryTimegetWorkerLatency mkIOFinalizerrunFinalizerGC Streamly.Internal.Data.Fold.TypeFoldaddOne catEitherscatLefts catRights duplicatefoldlM1'foldr'groupsOflmaplmapM morphInnerrmapM scanMaybe serialWith splitWith takeEndBy_teeWith'Streamly.Internal.Data.Fold.Combinators addStream 
distributedrainBy drainMapMdrivefoldMapfoldMapMindexlatestmeanone partition rollingHashrollingHashWithSaltsconcatstdDevteetopByunzipvariance%Streamly.Internal.Data.Fold.Containerclassify classifyIO countDistinctcountDistinctIntdemuxdemuxIO demuxToMap demuxToMapIO frequencynubnubInttoIntSettoMaptoMapIOtoSetStreamly.Data.FoldState streamVarSVar svarStyle svarStopStyle svarStopBypushBufferSpacepushBufferPolicypushBufferMVar outputHeap needDoorBellaheadWorkQueuedefState adaptStatesetInspectMode setMaxBuffer setMaxThreads setStreamRate setYieldLimitAheadHeapEntryAheadEntryStreamAheadEntryNullAheadEntryPurePushBufferPolicyPushBufferDropOldPushBufferBlockPushBufferDropNew SVarStopStyleStopNoneStopAnyStopBy SVarStyle WAsyncVar ParallelVarAheadVarAsyncVargetEffectiveWorkerLimitcheckMaxThreadscheckMaxBufferfromChannelRaw _fromChannelD appendWithK_appendWithChanK concatMapDivK mkEnqueueparConcatMapChanK parConcatMapK SemigroupMonad<>fmap fromStreamVar fromProducerD newWAsyncVar forkSVarAsyncjoinStreamVarAsyncsessionCurTimesessionEventTime sessionCountsessionTimerHeapsessionKeyValueMapsessionOutputStream"Streamly.Internal.Data.Stream.TypeStream concatEffect crossWith foldBreak'Streamly.Internal.Data.Stream.Eliminate'Streamly.Internal.Data.Stream.ExceptionafterIO bracketIO bracketIO3 finallyIO&Streamly.Internal.Data.Stream.Generate"Streamly.Internal.Data.Stream.Lift%Streamly.Internal.Data.Stream.Nesting'Streamly.Internal.Data.Stream.Transform)Streamly.Internal.Data.Stream.TransformerFoldableControl.Monad.Trans.ReaderReaderT Control.Monad.Trans.State.StrictStateTGHC.EnumEnummaxBoundBoundedGHC.Real FractionalIntegralenumFrom enumFromThen enumFromToenumFromThenToInttoEnumminBound_serialLatencyGHC.ListData.TraversableForeign.StorableStorable Streamly.Internal.Data.Pipe.TypePipemod/MaybeJust Data.MaybefromJustisJust Data.EitherRightLeftEither$"Streamly.Internal.Data.Unfold.TypeUnfoldodd"Streamly.Internal.Data.Parser.TypeParser 
GHC.Classescompare"Streamly.Internal.Data.Refold.TypeRefold.Watch openWatch createWatchensureTrailingSlash closeWatchGHC.WordWord8_watchRecursive&network-3.1.2.7-C10aP9ji1JSEkrMzxI8sEfNetwork.Socket.TypesSocketStreamly.Internal.System.IO