{-# OPTIONS_GHC -Wno-orphans #-}
module Streamly.Internal.Data.Stream.IsStream.Generate
(
K.nil
, K.nilM
, K.cons
, (K..:)
, consM
, (|:)
, unfold
, unfold0
, unfoldr
, unfoldrM
, fromPure
, fromEffect
, repeat
, repeatM
, replicate
, replicateM
, Enumerable (..)
, enumerate
, enumerateTo
, times
, absTimes
, absTimesWith
, relTimes
, relTimesWith
, durations
, ticks
, timeout
, fromIndices
, fromIndicesM
, iterate
, iterateM
, K.mfix
, P.fromList
, fromListM
, K.fromFoldable
, fromFoldableM
, fromCallback
, K.once
, yield
, yieldM
, each
, fromHandle
, currentTime
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(..))
import Data.Void (Void)
import Streamly.Internal.Data.Unfold.Type (Unfold)
import Streamly.Internal.Data.SVar (MonadAsync, Rate (..))
import Streamly.Internal.Data.Stream.IsStream.Enumeration
(Enumerable(..), enumerate, enumerateTo)
import Streamly.Internal.Data.Stream.IsStream.Common
( absTimesWith, concatM, relTimesWith, timesWith, fromPure, fromEffect
, yield, yieldM, repeatM)
import Streamly.Internal.Data.Stream.Prelude (fromStreamS)
import Streamly.Internal.Data.Stream.StreamD (fromStreamD)
import Streamly.Internal.Data.Stream.StreamK (IsStream((|:), consM))
import Streamly.Internal.Data.Stream.Serial (SerialT, WSerialT)
import Streamly.Internal.Data.Stream.Zip (ZipSerialM)
import Streamly.Internal.Data.Time.Units (AbsTime , RelTime64, addToAbsTime64)
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Prelude as P
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as D
import qualified Streamly.Internal.Data.Stream.StreamK as K
#ifdef USE_STREAMK_ONLY
import qualified Streamly.Internal.Data.Stream.StreamK as S
#else
import qualified Streamly.Internal.Data.Stream.StreamD.Generate as S
#endif
import qualified System.IO as IO
import Prelude hiding (iterate, replicate, repeat)
{-# INLINE unfold #-}
unfold :: (IsStream t, Monad m) => Unfold m a b -> a -> t m b
unfold :: Unfold m a b -> a -> t m b
unfold Unfold m a b
unf a
x = Stream m b -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m b -> t m b) -> Stream m b -> t m b
forall a b. (a -> b) -> a -> b
$ Unfold m a b -> a -> Stream m b
forall (m :: * -> *) a b.
Monad m =>
Unfold m a b -> a -> Stream m b
D.unfold Unfold m a b
unf a
x
{-# INLINE unfold0 #-}
unfold0 :: (IsStream t, Monad m) => Unfold m Void b -> t m b
unfold0 :: Unfold m Void b -> t m b
unfold0 Unfold m Void b
unf = Unfold m Void b -> Void -> t m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Unfold m a b -> a -> t m b
unfold Unfold m Void b
unf ([Char] -> Void
forall a. HasCallStack => [Char] -> a
error [Char]
"unfold0: unexpected void evaluation")
{-# INLINE_EARLY unfoldr #-}
unfoldr :: (Monad m, IsStream t) => (b -> Maybe (a, b)) -> b -> t m a
unfoldr :: (b -> Maybe (a, b)) -> b -> t m a
unfoldr b -> Maybe (a, b)
step b
seed = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS ((b -> Maybe (a, b)) -> b -> Stream m a
forall (m :: * -> *) s a.
Monad m =>
(s -> Maybe (a, s)) -> s -> Stream m a
S.unfoldr b -> Maybe (a, b)
step b
seed)
{-# RULES "unfoldr fallback to StreamK" [1]
forall a b. S.toStreamK (S.unfoldr a b) = K.unfoldr a b #-}
{-# INLINE_EARLY unfoldrM #-}
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM :: (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM = (b -> m (Maybe (a, b))) -> b -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, MonadAsync m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
K.unfoldrM
{-# RULES "unfoldrM serial" unfoldrM = unfoldrMSerial #-}
{-# INLINE_EARLY unfoldrMSerial #-}
unfoldrMSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrMSerial :: (b -> m (Maybe (a, b))) -> b -> SerialT m a
unfoldrMSerial = (b -> m (Maybe (a, b))) -> b -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
Serial.unfoldrM
{-# RULES "unfoldrM wSerial" unfoldrM = unfoldrMWSerial #-}
{-# INLINE_EARLY unfoldrMWSerial #-}
unfoldrMWSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> WSerialT m a
unfoldrMWSerial :: (b -> m (Maybe (a, b))) -> b -> WSerialT m a
unfoldrMWSerial = (b -> m (Maybe (a, b))) -> b -> WSerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
Serial.unfoldrM
{-# RULES "unfoldrM zipSerial" unfoldrM = unfoldrMZipSerial #-}
{-# INLINE_EARLY unfoldrMZipSerial #-}
unfoldrMZipSerial :: MonadAsync m => (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
unfoldrMZipSerial :: (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
unfoldrMZipSerial = (b -> m (Maybe (a, b))) -> b -> ZipSerialM m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Maybe (a, b))) -> b -> t m a
Serial.unfoldrM
{-# INLINE_NORMAL repeat #-}
repeat :: (IsStream t, Monad m) => a -> t m a
repeat :: a -> t m a
repeat = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall (m :: * -> *) a. Monad m => a -> Stream m a
S.repeat
{-# INLINE_NORMAL replicate #-}
replicate :: (IsStream t, Monad m) => Int -> a -> t m a
replicate :: Int -> a -> t m a
replicate Int
n = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> a -> Stream m a
S.replicate Int
n
{-# INLINE_EARLY replicateM #-}
replicateM :: (IsStream t, MonadAsync m) => Int -> m a -> t m a
replicateM :: Int -> m a -> t m a
replicateM = Int -> m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Int -> m a -> t m a
K.replicateM
{-# RULES "replicateM serial" replicateM = replicateMSerial #-}
{-# INLINE replicateMSerial #-}
replicateMSerial :: MonadAsync m => Int -> m a -> SerialT m a
replicateMSerial :: Int -> m a -> SerialT m a
replicateMSerial Int
n = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> m a -> Stream m a
forall (m :: * -> *) a. Monad m => Int -> m a -> Stream m a
S.replicateM Int
n
{-# INLINE times #-}
times :: (IsStream t, MonadAsync m) => t m (AbsTime, RelTime64)
times :: t m (AbsTime, RelTime64)
times = Double -> t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
Double -> t m (AbsTime, RelTime64)
timesWith Double
0.01
{-# INLINE absTimes #-}
absTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m AbsTime
absTimes :: t m AbsTime
absTimes = ((AbsTime, RelTime64) -> AbsTime)
-> t m (AbsTime, RelTime64) -> t m AbsTime
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((AbsTime -> RelTime64 -> AbsTime)
-> (AbsTime, RelTime64) -> AbsTime
forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry AbsTime -> RelTime64 -> AbsTime
addToAbsTime64) t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
t m (AbsTime, RelTime64)
times
{-# DEPRECATED currentTime "Please use absTimes instead" #-}
{-# INLINE currentTime #-}
currentTime :: (IsStream t, MonadAsync m, Functor (t m))
=> Double -> t m AbsTime
currentTime :: Double -> t m AbsTime
currentTime = Double -> t m AbsTime
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m, Functor (t m)) =>
Double -> t m AbsTime
absTimesWith
{-# INLINE relTimes #-}
relTimes :: (IsStream t, MonadAsync m, Functor (t m)) => t m RelTime64
relTimes :: t m RelTime64
relTimes = ((AbsTime, RelTime64) -> RelTime64)
-> t m (AbsTime, RelTime64) -> t m RelTime64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (AbsTime, RelTime64) -> RelTime64
forall a b. (a, b) -> b
snd t m (AbsTime, RelTime64)
forall (t :: (* -> *) -> * -> *) (m :: * -> *).
(IsStream t, MonadAsync m) =>
t m (AbsTime, RelTime64)
times
{-# INLINE durations #-}
durations ::
Double -> t m RelTime64
durations :: Double -> t m RelTime64
durations = Double -> t m RelTime64
forall a. HasCallStack => a
undefined
{-# INLINE ticks #-}
ticks ::
Rate -> t m ()
ticks :: Rate -> t m ()
ticks = Rate -> t m ()
forall a. HasCallStack => a
undefined
{-# INLINE timeout #-}
timeout ::
AbsTime -> t m ()
timeout :: AbsTime -> t m ()
timeout = AbsTime -> t m ()
forall a. HasCallStack => a
undefined
{-# INLINE fromIndices #-}
fromIndices :: (IsStream t, Monad m) => (Int -> a) -> t m a
fromIndices :: (Int -> a) -> t m a
fromIndices = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a)
-> ((Int -> a) -> Stream m a) -> (Int -> a) -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int -> a) -> Stream m a
forall (m :: * -> *) a. Monad m => (Int -> a) -> Stream m a
S.fromIndices
{-# INLINE_EARLY fromIndicesM #-}
fromIndicesM :: (IsStream t, MonadAsync m) => (Int -> m a) -> t m a
fromIndicesM :: (Int -> m a) -> t m a
fromIndicesM = (Int -> m a) -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
(Int -> m a) -> t m a
K.fromIndicesM
{-# RULES "fromIndicesM serial" fromIndicesM = fromIndicesMSerial #-}
{-# INLINE fromIndicesMSerial #-}
fromIndicesMSerial :: MonadAsync m => (Int -> m a) -> SerialT m a
fromIndicesMSerial :: (Int -> m a) -> SerialT m a
fromIndicesMSerial = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> ((Int -> m a) -> Stream m a) -> (Int -> m a) -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int -> m a) -> Stream m a
forall (m :: * -> *) a. Monad m => (Int -> m a) -> Stream m a
S.fromIndicesM
{-# INLINE_NORMAL iterate #-}
iterate :: (IsStream t, Monad m) => (a -> a) -> a -> t m a
iterate :: (a -> a) -> a -> t m a
iterate a -> a
step = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> t m a) -> (a -> Stream m a) -> a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> a) -> a -> Stream m a
forall (m :: * -> *) a. Monad m => (a -> a) -> a -> Stream m a
S.iterate a -> a
step
{-# INLINE_EARLY iterateM #-}
iterateM :: (IsStream t, MonadAsync m) => (a -> m a) -> m a -> t m a
iterateM :: (a -> m a) -> m a -> t m a
iterateM = (a -> m a) -> m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
(a -> m a) -> m a -> t m a
K.iterateM
{-# RULES "iterateM serial" iterateM = iterateMSerial #-}
{-# INLINE iterateMSerial #-}
iterateMSerial :: MonadAsync m => (a -> m a) -> m a -> SerialT m a
iterateMSerial :: (a -> m a) -> m a -> SerialT m a
iterateMSerial a -> m a
step = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamS (Stream m a -> SerialT m a)
-> (m a -> Stream m a) -> m a -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> m a) -> m a -> Stream m a
forall (m :: * -> *) a. Monad m => (a -> m a) -> m a -> Stream m a
S.iterateM a -> m a
step
{-# INLINE fromFoldableM #-}
fromFoldableM :: (IsStream t, MonadAsync m, Foldable f) => f (m a) -> t m a
fromFoldableM :: f (m a) -> t m a
fromFoldableM = (m a -> t m a -> t m a) -> t m a -> f (m a) -> t m a
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
Prelude.foldr m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
consM t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
{-# INLINE_EARLY fromListM #-}
fromListM :: (MonadAsync m, IsStream t) => [m a] -> t m a
fromListM :: [m a] -> t m a
fromListM = [m a] -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) a.
(IsStream t, MonadAsync m, Foldable f) =>
f (m a) -> t m a
fromFoldableM
{-# RULES "fromListM fallback to StreamK" [1]
forall a. D.toStreamK (D.fromListM a) = fromFoldableM a #-}
{-# RULES "fromListM serial" fromListM = fromListMSerial #-}
{-# INLINE_EARLY fromListMSerial #-}
fromListMSerial :: MonadAsync m => [m a] -> SerialT m a
fromListMSerial :: [m a] -> SerialT m a
fromListMSerial = Stream m a -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD (Stream m a -> SerialT m a)
-> ([m a] -> Stream m a) -> [m a] -> SerialT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [m a] -> Stream m a
forall (m :: * -> *) a. MonadAsync m => [m a] -> Stream m a
D.fromListM
{-# DEPRECATED each "Please use fromFoldable instead." #-}
{-# INLINE each #-}
each :: (IsStream t, Foldable f) => f a -> t m a
each :: f a -> t m a
each = f a -> t m a
forall (t :: (* -> *) -> * -> *) (f :: * -> *) a (m :: * -> *).
(IsStream t, Foldable f) =>
f a -> t m a
K.fromFoldable
{-# DEPRECATED fromHandle
"Please use Streamly.FileSystem.Handle module (see the changelog)" #-}
fromHandle :: (IsStream t, MonadIO m) => IO.Handle -> t m String
fromHandle :: Handle -> t m [Char]
fromHandle Handle
h = t m [Char]
go
where
go :: t m [Char]
go = (forall r.
State Stream m [Char]
-> ([Char] -> t m [Char] -> m r) -> ([Char] -> m r) -> m r -> m r)
-> t m [Char]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m [Char]
-> ([Char] -> t m [Char] -> m r) -> ([Char] -> m r) -> m r -> m r)
-> t m [Char])
-> (forall r.
State Stream m [Char]
-> ([Char] -> t m [Char] -> m r) -> ([Char] -> m r) -> m r -> m r)
-> t m [Char]
forall a b. (a -> b) -> a -> b
$ \State Stream m [Char]
_ [Char] -> t m [Char] -> m r
yld [Char] -> m r
_ m r
stp -> do
Bool
eof <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Handle -> IO Bool
IO.hIsEOF Handle
h
if Bool
eof
then m r
stp
else do
[Char]
str <- IO [Char] -> m [Char]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Char] -> m [Char]) -> IO [Char] -> m [Char]
forall a b. (a -> b) -> a -> b
$ Handle -> IO [Char]
IO.hGetLine Handle
h
[Char] -> t m [Char] -> m r
yld [Char]
str t m [Char]
go
{-# INLINE fromCallback #-}
fromCallback :: MonadAsync m => ((a -> m ()) -> m ()) -> SerialT m a
fromCallback :: ((a -> m ()) -> m ()) -> SerialT m a
fromCallback (a -> m ()) -> m ()
setCallback = m (SerialT m a) -> SerialT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
m (t m a) -> t m a
concatM (m (SerialT m a) -> SerialT m a) -> m (SerialT m a) -> SerialT m a
forall a b. (a -> b) -> a -> b
$ do
(a -> m ()
callback, SerialT m a
stream) <- m (a -> m (), SerialT m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m (a -> m (), t m a)
Par.newCallbackStream
(a -> m ()) -> m ()
setCallback a -> m ()
callback
SerialT m a -> m (SerialT m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SerialT m a
stream