module Streamly.Internal.Data.Stream.IsStream.Combinators
( maxThreads
, maxBuffer
, maxYields
, rate
, avgRate
, minRate
, maxRate
, constRate
, inspectMode
, printState
)
where
#include "inline.hs"
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Int (Int64)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream, mkStream, foldStreamShared)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import Streamly.Internal.Data.SVar
{-# INLINE_NORMAL maxThreads #-}
maxThreads :: IsStream t => Int -> t m a -> t m a
maxThreads :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Int -> t m a -> t m a
maxThreads Int
n t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxThreads Int
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m
{-# INLINE_NORMAL maxBuffer #-}
maxBuffer :: IsStream t => Int -> t m a -> t m a
maxBuffer :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Int -> t m a -> t m a
maxBuffer Int
n t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setMaxBuffer Int
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m
{-# INLINE_NORMAL rate #-}
rate :: IsStream t => Maybe Rate -> t m a -> t m a
rate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate Maybe Rate
r t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
case Maybe Rate
r of
Just (Rate Double
low Double
goal Double
_ Int
_) | Double
goal forall a. Ord a => a -> a -> Bool
< Double
low ->
forall a. HasCallStack => [Char] -> a
error [Char]
"rate: Target rate cannot be lower than minimum rate."
Just (Rate Double
_ Double
goal Double
high Int
_) | Double
goal forall a. Ord a => a -> a -> Bool
> Double
high ->
forall a. HasCallStack => [Char] -> a
error [Char]
"rate: Target rate cannot be greater than maximum rate."
Just (Rate Double
low Double
_ Double
high Int
_) | Double
low forall a. Ord a => a -> a -> Bool
> Double
high ->
forall a. HasCallStack => [Char] -> a
error [Char]
"rate: Minimum rate cannot be greater than maximum rate."
Maybe Rate
_ -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Rate -> State t m a -> State t m a
setStreamRate Maybe Rate
r State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m
avgRate :: IsStream t => Double -> t m a -> t m a
avgRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Double -> t m a -> t m a
avgRate Double
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rforall a. Fractional a => a -> a -> a
/Double
2) Double
r (Double
2forall a. Num a => a -> a -> a
*Double
r) forall a. Bounded a => a
maxBound)
minRate :: IsStream t => Double -> t m a -> t m a
minRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Double -> t m a -> t m a
minRate Double
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r (Double
2forall a. Num a => a -> a -> a
*Double
r) forall a. Bounded a => a
maxBound)
maxRate :: IsStream t => Double -> t m a -> t m a
maxRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Double -> t m a -> t m a
maxRate Double
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate (Double
rforall a. Fractional a => a -> a -> a
/Double
2) Double
r Double
r forall a. Bounded a => a
maxBound)
constRate :: IsStream t => Double -> t m a -> t m a
constRate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Double -> t m a -> t m a
constRate Double
r = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Rate -> t m a -> t m a
rate (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ Double -> Double -> Double -> Int -> Rate
Rate Double
r Double
r Double
r Int
0)
{-# INLINE_NORMAL _serialLatency #-}
_serialLatency :: IsStream t => Int -> t m a -> t m a
_serialLatency :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Int -> t m a -> t m a
_serialLatency Int
n t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> State t m a -> State t m a
setStreamLatency Int
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m
{-# INLINE_NORMAL maxYields #-}
maxYields :: IsStream t => Maybe Int64 -> t m a -> t m a
maxYields :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Maybe Int64 -> t m a -> t m a
maxYields Maybe Int64
n t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Maybe Int64 -> State t m a -> State t m a
setYieldLimit Maybe Int64
n State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m
{-# RULES "maxYields serial" maxYields = maxYieldsSerial #-}
maxYieldsSerial :: Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial :: forall (m :: * -> *) a. Maybe Int64 -> SerialT m a -> SerialT m a
maxYieldsSerial Maybe Int64
_ = forall a. a -> a
id
printState :: MonadIO m => State Stream m a -> m ()
printState :: forall (m :: * -> *) a. MonadIO m => State Stream m a -> m ()
printState State Stream m a
st = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
let msv :: Maybe (SVar Stream m a)
msv = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
case Maybe (SVar Stream m a)
msv of
Just SVar Stream m a
sv -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO [Char]
dumpSVar SVar Stream m a
sv forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= [Char] -> IO ()
putStrLn
Maybe (SVar Stream m a)
Nothing -> [Char] -> IO ()
putStrLn [Char]
"No SVar"
inspectMode :: IsStream t => t m a -> t m a
inspectMode :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> t m a
inspectMode t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
stp a -> m r
sng m r
yld ->
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> State t m a
setInspectMode State Stream m a
st) a -> t m a -> m r
stp a -> m r
sng m r
yld t m a
m