{-# OPTIONS_GHC -Wno-deprecations #-}
{-# OPTIONS_GHC -Wno-orphans #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Generate
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Most of the combinators in this module can be implemented as unfolds. Some
-- of them however can only be expressed in terms of StreamK e.g. cons/consM,
-- fromFoldable, mfix. We can possibly remove those from this module which can
-- be expressed as unfolds. Unless we want to use rewrite rules to rewrite them
-- as StreamK when StreamK is used, avoiding conversion to StreamD. Will that
-- help? Are there any other reasons to keep these and not use unfolds?

module Streamly.Internal.Data.Stream.IsStream.Generate {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
    (
    -- * Primitives
      IsStream.nil
    , IsStream.nilM
    , IsStream.cons
    , (IsStream..:)

    , consM
    , (|:)

    -- * From 'Unfold'
    , unfold
    , unfold0

    -- * Unfolding
    , unfoldr
    , unfoldrM

    -- * From Values
    , fromPure
    , fromEffect
    , repeat
    , repeatM
    , replicate
    , replicateM

    -- * Enumeration
    , Enumerable (..)
    , enumerate
    , enumerateTo

    -- * Time Enumeration
    , times
    , absTimes
    , absTimesWith
    , relTimes
    , relTimesWith
    , durations
    , ticks
    , timeout

    -- * From Generators
    , fromIndices
    , fromIndicesM
    -- , generate
    -- , generateM

    -- * Iteration
    , iterate
    , iterateM

    -- * Cyclic Elements
    , mfix

    -- * From Containers
    , IsStream.fromList
    , fromListM
    , fromFoldable
    , fromFoldableM
    , fromCallback
    , fromPrimIORef

    -- * Deprecated
    , yield
    , yieldM
    , fromHandle
    , currentTime
    )
where

#include "inline.hs"

import Control.Monad.IO.Class (MonadIO(..))
import Data.Void (Void)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Unfold (Unfold)
import Streamly.Internal.Data.SVar (Rate (..))
import Streamly.Internal.Data.Stream.IsStream.Enumeration
    (Enumerable(..), enumerate, enumerateTo)
import Streamly.Internal.Data.Stream.IsStream.Common
    ( absTimesWith, concatM, relTimesWith, timesWith, fromPure, fromEffect
    , yield, yieldM, repeatM)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream (..), fromSerial, consM, fromStreamD)
import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
import Streamly.Internal.Data.Time.Units (AbsTime , RelTime64, addToAbsTime64)
import Streamly.Data.MutByteArray (Unbox)

import qualified Streamly.Internal.Data.MutArray as Unboxed
    (pollIntIORef, IORef)
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream as D
    (unfold, unfoldr, toStreamK, unfoldr, repeat, replicate, replicateM
    , fromIndices, fromIndicesM, iterate, iterateM, toStreamK, fromListM
    , fromListM)
import qualified Streamly.Internal.Data.StreamK as K
    (unfoldr, unfoldrMWith, replicateMWith, fromIndicesMWith, iterateMWith
    , mfix, fromFoldable, fromFoldableM)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (fromStreamK)
import qualified System.IO as IO

import Prelude hiding (iterate, replicate, repeat)

-- $setup
-- >>> :m
-- >>> :set -fno-warn-deprecations
-- >>> import Data.Function ((&))
-- >>> import Prelude hiding (iterate, replicate, repeat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import Control.Concurrent (threadDelay)
-- >>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
--
-- >>> hSetBuffering stdout LineBuffering

------------------------------------------------------------------------------
-- From Unfold
------------------------------------------------------------------------------

-- | Convert an 'Unfold' into a stream by supplying it an input seed.
--
-- >>> Stream.drain $ Stream.unfold Unfold.replicateM (3, putStrLn "hello")
-- hello
-- hello
-- hello
--
-- /Since: 0.7.0/
{-# INLINE unfold #-}
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
-- The stream is produced by 'D.unfold' and then converted to the requested
-- stream type with 'fromStreamD'.
unfold unf x = fromStreamD $ D.unfold unf x

-- | Convert an 'Unfold' with a closed input end into a stream.
--
-- The input type of the unfold is 'Void', so there is no real seed to
-- supply. A placeholder is passed instead; it raises an error with the
-- message @unfold0: unexpected void evaluation@ if it is ever evaluated.
--
-- /Pre-release/
{-# INLINE unfold0 #-}
unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b
unfold0 unf = unfold unf (error "unfold0: unexpected void evaluation")

------------------------------------------------------------------------------
-- Generation by Unfolding
------------------------------------------------------------------------------

-- |
-- >>> :{
-- unfoldr step s =
--     case step s of
--         Nothing -> Stream.nil
--         Just (a, b) -> a `Stream.cons` unfoldr step b
-- :}
--
-- Build a stream by unfolding a /pure/ step function @step@ starting from a
-- seed @s@.  The step function returns the next element in the stream and the
-- next seed value. When it is done it returns 'Nothing' and the stream ends.
-- For example,
--
-- >>> :{
-- let f b =
--         if b > 2
--         then Nothing
--         else Just (b, b + 1)
-- in Stream.toList $ Stream.unfoldr f 0
-- :}
-- [0,1,2]
--
-- @since 0.1.0
{-# INLINE_EARLY unfoldr #-}
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
unfoldr step seed = fromStreamD (D.unfoldr step seed)
-- If the 'D.unfoldr' stream ends up being converted to StreamK, use the
-- StreamK implementation directly instead of converting.
{-# RULES "unfoldr fallback to StreamK" [1]
    forall a b. D.toStreamK (D.unfoldr a b) = K.unfoldr a b #-}

-- | Build a stream by unfolding a /monadic/ step function starting from a
-- seed.  The step function returns the next element in the stream and the next
-- seed value. When it is done it returns 'Nothing' and the stream ends. For
-- example,
--
-- >>> :{
-- let f b =
--         if b > 2
--         then return Nothing
--         else return (Just (b, b + 1))
-- in Stream.toList $ Stream.unfoldrM f 0
-- :}
-- [0,1,2]
--
-- When run concurrently, the next unfold step can run concurrently with the
-- processing of the output of the previous step.  Note that more than one step
-- cannot run concurrently as the next step depends on the output of the
-- previous step.
--
-- >>> :{
-- let f b =
--         if b > 2
--         then return Nothing
--         else threadDelay 1000000 >> return (Just (b, b + 1))
-- in Stream.toList $ Stream.delay 1 $ Stream.fromAsync $ Stream.unfoldrM f 0
-- :}
-- [0,1,2]
--
-- /Concurrent/
--
-- /Since: 0.1.0/
{-# INLINE_EARLY unfoldrM #-}
unfoldrM :: forall t m b a. (IsStream t, MonadAsync m) =>
    (b -> m (Maybe (a, b))) -> b -> t m a
-- The stream is assembled with the 'consM' of the target stream type @t@
-- (selected via the type application).
unfoldrM step = fromStream . K.unfoldrMWith (IsStream.toConsK (consM @t)) step

-- 'unfoldrM' specialized to 'SerialT': the rewrite rule below replaces
-- 'unfoldrM' with this definition, which delegates to 'Serial.unfoldrM'.
{-# RULES "unfoldrM serial" unfoldrM = unfoldrMSerial #-}
{-# INLINE_EARLY unfoldrMSerial #-}
unfoldrMSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrMSerial = Serial.unfoldrM

-- 'unfoldrM' specialized to 'WSerialT': the rewrite rule below replaces
-- 'unfoldrM' with this definition, which builds the stream with
-- 'Serial.unfoldrM' and converts it using 'fromSerial'.
{-# RULES "unfoldrM wSerial" unfoldrM = unfoldrMWSerial #-}
{-# INLINE_EARLY unfoldrMWSerial #-}
unfoldrMWSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> WSerialT m a
unfoldrMWSerial f = fromSerial . Serial.unfoldrM f

-- 'unfoldrM' specialized to 'IsStream.ZipSerialM': the rewrite rule below
-- replaces 'unfoldrM' with this definition, which builds the stream with
-- 'Serial.unfoldrM' and converts it using 'fromSerial'.
{-# RULES "unfoldrM zipSerial" unfoldrM = unfoldrMZipSerial #-}
{-# INLINE_EARLY unfoldrMZipSerial #-}
unfoldrMZipSerial :: MonadAsync m =>
    (b -> m (Maybe (a, b))) -> b -> IsStream.ZipSerialM m a
unfoldrMZipSerial f = fromSerial . Serial.unfoldrM f

------------------------------------------------------------------------------
-- From Values
------------------------------------------------------------------------------

-- |
-- Generate an infinite stream by repeating a pure value.
--
-- @since 0.4.0
{-# INLINE_NORMAL repeat #-}
repeat :: (IsStream t, Monad m) => a -> t m a
repeat = fromStreamD . D.repeat

-- |
-- >>> replicate n = Stream.take n . Stream.repeat
--
-- Generate a stream of length @n@ by repeating a value @n@ times.
--
-- @since 0.6.0
{-# INLINE_NORMAL replicate #-}
replicate :: (IsStream t, Monad m) => Int -> a -> t m a
replicate n = fromStreamD . D.replicate n

-- |
-- >>> replicateM n = Stream.take n . Stream.repeatM
--
-- Generate a stream by performing a monadic action @n@ times.
--
-- >>> pr n = threadDelay 1000000 >> print n
--
-- This runs serially and takes 3 seconds:
--
-- >>> Stream.drain $ Stream.fromSerial $ Stream.replicateM 3 $ pr 1
-- 1
-- 1
-- 1
--
-- This runs concurrently and takes just 1 second:
--
-- >>> Stream.drain $ Stream.fromAsync  $ Stream.replicateM 3 $ pr 1
-- 1
-- 1
-- 1
--
-- /Concurrent/
--
-- @since 0.1.1
{-# INLINE_EARLY replicateM #-}
replicateM :: forall t m a. (IsStream t, MonadAsync m) => Int -> m a -> t m a
-- The stream is assembled with the 'consM' of the target stream type @t@
-- (selected via the type application).
replicateM count =
    fromStream . K.replicateMWith (IsStream.toConsK (consM @t)) count

-- 'replicateM' specialized to 'SerialT': the rewrite rule below replaces
-- 'replicateM' with this definition, which is built on 'D.replicateM'.
{-# RULES "replicateM serial" replicateM = replicateMSerial #-}
{-# INLINE replicateMSerial #-}
replicateMSerial :: MonadAsync m => Int -> m a -> SerialT m a
replicateMSerial n = fromStreamD . D.replicateM n

------------------------------------------------------------------------------
-- Time Enumeration
------------------------------------------------------------------------------

-- | @times@ returns a stream of time value tuples with clock of 10 ms
-- granularity. The first component of the tuple is an absolute time reference
-- (epoch) denoting the start of the stream and the second component is a time
-- relative to the reference.
--
-- >>> Stream.mapM_ (\x -> print x >> threadDelay 1000000) $ Stream.take 3 $ Stream.times
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),RelTime64 (NanoSecond64 ...))
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE times #-}
times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64)
-- 0.01 is the clock granularity handed to 'timesWith' (the 10 ms above).
times = timesWith 0.01

-- | @absTimes@ returns a stream of absolute timestamps using a clock of 10 ms
-- granularity.
--
-- >>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.absTimes
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
-- AbsTime (TimeSpec {sec = ..., nsec = ...})
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE absTimes #-}
absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime
-- Each element of 'times' is an (epoch, offset) pair; adding the offset to
-- the epoch yields the absolute timestamp.
absTimes = fmap (uncurry addToAbsTime64) times

{-# DEPRECATED currentTime "Please use absTimes instead" #-}
-- | Deprecated alias of 'absTimesWith'; the argument is passed through
-- unchanged. Callers that need to keep supplying the 'Double' argument can
-- switch to 'absTimesWith' directly.
{-# INLINE currentTime #-}
currentTime :: (IsStream t, MonadAsync m, Functor (t m))
    => Double -> t m AbsTime
currentTime = absTimesWith

-- | @relTimes@ returns a stream of relative time values starting from 0,
-- using a clock of granularity 10 ms.
--
-- >>> Stream.mapM_ print $ Stream.delayPre 1 $ Stream.take 3 $ Stream.relTimes
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
-- RelTime64 (NanoSecond64 ...)
--
-- Note: This API is not safe on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE relTimes #-}
relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64
-- Keep only the relative component of each (epoch, offset) pair from 'times'.
relTimes = fmap snd times

-- | @durations g@ returns a stream of relative time values measuring the time
-- elapsed since the immediate predecessor element of the stream was generated.
-- The first element of the stream is always 0. @durations@ uses a clock of
-- granularity @g@ specified in seconds. A low granularity clock is more
-- expensive in terms of CPU usage. The minimum granularity is 1 millisecond.
-- Durations lower than 1 ms will be 0.
--
-- Note: This API is not safe on 32-bit machines.
--
-- The description above is the intended behavior; the function is currently
-- a placeholder that raises an error when evaluated.
--
-- /Unimplemented/
--
{-# INLINE durations #-}
durations :: -- (IsStream t, MonadAsync m) =>
    Double -> t m RelTime64
durations = error "durations: not implemented"

-- | Generate ticks at the specified rate. The rate is adaptive, the tick
-- generation speed can be increased or decreased at different times to achieve
-- the specified rate.  The specific behavior for different styles of 'Rate'
-- specifications is documented under 'Rate'.  The effective maximum rate
-- achieved by a stream is governed by the processor speed.
--
-- The description above is the intended behavior; the function is currently
-- a placeholder that raises an error when evaluated.
--
-- /Unimplemented/
--
{-# INLINE ticks #-}
ticks :: -- (IsStream t, MonadAsync m) =>
    Rate -> t m ()
ticks = error "ticks: not implemented"

-- | Generate a singleton event at or after the specified absolute time. Note
-- that this is different from a threadDelay, a threadDelay starts from the
-- time when the action is evaluated, whereas if we use AbsTime based timeout
-- it will immediately expire if the action is evaluated too late.
--
-- The description above is the intended behavior; the function is currently
-- a placeholder that raises an error when evaluated.
--
-- /Unimplemented/
--
{-# INLINE timeout #-}
timeout :: -- (IsStream t, MonadAsync m) =>
    AbsTime -> t m ()
timeout = error "timeout: not implemented"

------------------------------------------------------------------------------
-- From Generator functions
------------------------------------------------------------------------------

-- XXX we can remove it and recommend the definition in terms of enumerate and
-- map. Check performance equivalence.
--
-- |
-- >>> fromIndices f = fmap f $ Stream.enumerateFrom 0
-- >>> fromIndices f = let g i = f i `Stream.cons` g (i + 1) in g 0
--
-- Generate an infinite stream, whose values are the output of a function @f@
-- applied on the corresponding index.  Index starts at 0.
--
-- >>> Stream.toList $ Stream.take 5 $ Stream.fromIndices id
-- [0,1,2,3,4]
--
-- @since 0.6.0
{-# INLINE fromIndices #-}
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
fromIndices = fromStreamD . D.fromIndices

--
-- |
-- >>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
-- >>> fromIndicesM f = let g i = f i `Stream.consM` g (i + 1) in g 0
--
-- Generate an infinite stream, whose values are the output of a monadic
-- function @f@ applied on the corresponding index. Index starts at 0.
--
-- /Concurrent/
--
-- @since 0.6.0
{-# INLINE_EARLY fromIndicesM #-}
fromIndicesM :: forall t m a. (IsStream t, MonadAsync m) =>
    (Int -> m a) -> t m a
-- The stream is assembled with the 'consM' of the target stream type @t@
-- (selected via the type application).
fromIndicesM = fromStream . K.fromIndicesMWith (IsStream.toConsK (consM @t))

-- 'fromIndicesM' specialized to 'SerialT': the rewrite rule below replaces
-- 'fromIndicesM' with this definition, which is built on 'D.fromIndicesM'.
{-# RULES "fromIndicesM serial" fromIndicesM = fromIndicesMSerial #-}
{-# INLINE fromIndicesMSerial #-}
fromIndicesMSerial :: MonadAsync m => (Int -> m a) -> SerialT m a
fromIndicesMSerial = fromStreamD . D.fromIndicesM

------------------------------------------------------------------------------
-- Iterating functions
------------------------------------------------------------------------------

-- |
-- >>> iterate f x = x `Stream.cons` iterate f (f x)
--
-- Generate an infinite stream with @x@ as the first element and each
-- successive element derived by applying the function @f@ on the previous
-- element.
--
-- >>> Stream.toList $ Stream.take 5 $ Stream.iterate (+1) 1
-- [1,2,3,4,5]
--
-- @since 0.1.2
{-# INLINE_NORMAL iterate #-}
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
iterate step = fromStreamD . D.iterate step

-- |
-- >>> iterateM f m = m >>= \a -> return a `Stream.consM` iterateM f (f a)
--
-- Generate an infinite stream with the first element generated by the action
-- @m@ and each successive element derived by applying the monadic function
-- @f@ on the previous element.
--
-- >>> pr n = threadDelay 1000000 >> print n
-- >>> :{
-- Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
--     & Stream.take 3
--     & Stream.fromSerial
--     & Stream.toList
-- :}
-- 0
-- 1
-- [0,1,2]
--
-- When run concurrently, the next iteration can run concurrently with the
-- processing of the previous iteration. Note that more than one iteration
-- cannot run concurrently as the next iteration depends on the output of the
-- previous iteration.
--
-- >>> :{
-- Stream.iterateM (\x -> pr x >> return (x + 1)) (return 0)
--     & Stream.delay 1
--     & Stream.take 3
--     & Stream.fromAsync
--     & Stream.toList
-- :}
-- 0
-- 1
-- ...
--
-- /Concurrent/
--
-- /Since: 0.1.2/
--
-- /Since: 0.7.0 (signature change)/
{-# INLINE_EARLY iterateM #-}
iterateM :: forall t m a. (IsStream t, MonadAsync m) =>
    (a -> m a) -> m a -> t m a
-- The stream is assembled with the 'consM' of the target stream type @t@
-- (selected via the type application).
iterateM f = fromStream . K.iterateMWith (IsStream.toConsK (consM @t)) f

-- 'iterateM' specialized to 'SerialT': the rewrite rule below replaces
-- 'iterateM' with this definition, which is built on 'D.iterateM'.
{-# RULES "iterateM serial" iterateM = iterateMSerial #-}
{-# INLINE iterateMSerial #-}
iterateMSerial :: MonadAsync m => (a -> m a) -> m a -> SerialT m a
iterateMSerial step = fromStreamD . D.iterateM step

-- | We can define cyclic structures using @let@:
--
-- >>> let (a, b) = ([1, b], head a) in (a, b)
-- ([1,1],1)
--
-- The function @fix@ defined as:
--
-- >>> fix f = let x = f x in x
--
-- ensures that the argument of a function and its output refer to the same
-- lazy value @x@ i.e.  the same location in memory.  Thus @x@ can be defined
-- in terms of itself, creating structures with cyclic references.
--
-- >>> f ~(a, b) = ([1, b], head a)
-- >>> fix f
-- ([1,1],1)
--
-- 'Control.Monad.mfix' is essentially the same as @fix@ but for monadic
-- values.
--
-- Using 'mfix' for streams we can construct a stream in which each element of
-- the stream is defined in a cyclic fashion. The argument of the function
-- being fixed represents the current element of the stream which is being
-- returned by the stream monad. Thus, we can use the argument to construct
-- itself.
--
-- /Pre-release/
{-# INLINE mfix #-}
mfix :: (IsStream t, Monad m) => (m a -> t m a) -> t m a
-- Delegates to 'K.mfix', converting the user function's result to StreamK
-- and the final stream back to the requested stream type.
mfix f = fromStream $ K.mfix (toStream . f)

------------------------------------------------------------------------------
-- Conversions
------------------------------------------------------------------------------

-- |
-- >>> fromFoldable = Prelude.foldr Stream.cons Stream.nil
--
-- Construct a stream from a 'Foldable' containing pure values.
--
-- @since 0.2.0
{-# INLINE fromFoldable #-}
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
fromFoldable = fromStream . K.fromFoldable

-- |
-- >>> fromFoldableM = Prelude.foldr Stream.consM Stream.nil
--
-- Construct a stream from a 'Foldable' containing monadic actions.
--
-- >>> pr n = threadDelay 1000000 >> print n
-- >>> Stream.drain $ Stream.fromSerial $ Stream.fromFoldableM $ map pr [1,2,3]
-- 1
-- 2
-- 3
--
-- >>> Stream.drain $ Stream.fromAsync $ Stream.fromFoldableM $ map pr [1,2,3]
-- ...
-- ...
-- ...
--
-- /Concurrent (do not use with 'fromParallel' on infinite containers)/
--
-- @since 0.3.0
{-# INLINE fromFoldableM #-}
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
fromFoldableM = Prelude.foldr consM IsStream.nil

-- |
-- >>> fromListM = Stream.fromFoldableM
-- >>> fromListM = Stream.sequence . Stream.fromList
-- >>> fromListM = Stream.mapM id . Stream.fromList
-- >>> fromListM = Prelude.foldr Stream.consM Stream.nil
--
-- Construct a stream from a list of monadic actions. This is more efficient
-- than 'fromFoldableM' for serial streams.
--
-- @since 0.4.0
{-# INLINE_EARLY fromListM #-}
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
fromListM = fromFoldableM
-- If a 'D.fromListM' stream ends up being converted to StreamK, use the
-- StreamK implementation directly instead of converting.
{-# RULES "fromListM fallback to StreamK" [1]
    forall a. D.toStreamK (D.fromListM a) = K.fromFoldableM a #-}

-- 'fromListM' specialized to 'SerialT': the rewrite rule below replaces
-- 'fromListM' with this definition, which is built on 'D.fromListM'.
{-# RULES "fromListM serial" fromListM = fromListMSerial #-}
{-# INLINE_EARLY fromListMSerial #-}
fromListMSerial :: MonadAsync m => [m a] -> SerialT m a
fromListMSerial = fromStreamD . D.fromListM

-- | Read lines from an IO Handle into a stream of Strings.
--
-- Before producing each element the handle is checked with 'IO.hIsEOF'; the
-- stream stops once end-of-file is reported, otherwise one line is read with
-- 'IO.hGetLine' and yielded.
--
-- @since 0.1.0
{-# DEPRECATED fromHandle
   "Please use Streamly.FileSystem.Handle module (see the changelog)" #-}
fromHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String
fromHandle h = go
  where
  -- The stream refers to itself as its own tail, so every step repeats the
  -- same "check EOF, then read a line" action on the handle. The state
  -- argument and the third continuation are not used.
  go = IsStream.mkStream $ \_ yld _ stp -> do
        eof <- liftIO $ IO.hIsEOF h
        if eof
        then stp
        else do
            str <- liftIO $ IO.hGetLine h
            -- Yield the line and continue with the same stream.
            yld str go

-- XXX This should perhaps be moved to Parallel
--
-- | Takes a callback setter function and provides it with a callback.  The
-- callback when invoked adds a value at the tail of the stream. Returns a
-- stream of values generated by the callback.
--
-- The callback and the stream it feeds are created together by
-- 'Par.newCallbackStream'; the setter is run with that callback before the
-- stream is returned.
--
-- /Pre-release/
--
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
fromCallback setCallback = concatM $ do
    -- Create the callback together with the stream it pushes into.
    (callback, stream) <- Par.newCallbackStream
    -- Hand the callback to the user supplied registration action.
    setCallback callback
    return $ Stream.fromStreamK stream

-- | Construct a stream by reading an 'Unboxed' 'IORef' repeatedly.
--
-- Implemented as 'Unboxed.pollIntIORef' converted to the requested stream
-- type with 'fromStreamD'.
--
-- /Pre-release/
--
{-# INLINE fromPrimIORef #-}
fromPrimIORef :: (IsStream t, MonadIO m, Unbox a) => Unboxed.IORef a -> t m a
fromPrimIORef = fromStreamD . Unboxed.pollIntIORef