module Streamly.Internal.Data.Channel.Types
(
Count (..)
, Limit (..)
, ThreadAbort (..)
, ChildEvent (..)
, SVarStats (..)
, newSVarStats
, WorkerInfo (..)
, LatencyRange (..)
, YieldRateInfo (..)
, newRateInfo
, readOutputQRaw
, readOutputQBasic
, ringDoorBell
, decrementYieldLimit
, incrementYieldLimit
, Rate (..)
, StopWhen (..)
, Config
, magicMaxBuffer
, defaultConfig
, maxThreads
, maxBuffer
, maxYields
, inspect
, eager
, stopWhen
, ordered
, interleaved
, boundThreads
, rate
, avgRate
, minRate
, maxRate
, constRate
, getMaxThreads
, getMaxBuffer
, getStreamRate
, getStreamLatency
, setStreamLatency
, getYieldLimit
, getInspectMode
, getEagerDispatch
, getStopWhen
, getOrdered
, getInterleaved
, getBound
, cleanupSVar
, dumpCreator
, dumpOutputQ
, dumpDoorBell
, dumpNeedDoorBell
, dumpRunningThreads
, dumpWorkerCount
, withDiagMVar
, printSVar
)
where
import Control.Concurrent (ThreadId, throwTo, MVar, tryReadMVar)
import Control.Concurrent.MVar (tryPutMVar)
import Control.Exception
( SomeException(..), Exception, catches, throwIO, Handler(..)
, BlockedIndefinitelyOnMVar(..), BlockedIndefinitelyOnSTM(..))
import Control.Monad (void, when)
import Data.Int (Int64)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Set (Set)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, storeLoadBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))
import System.IO (hPutStrLn, stderr)
import qualified Data.Set as Set
-- | A counter backed by an 'Int64'.
--
-- The numeric classes are derived from the underlying 'Int64', so a 'Count'
-- can be used directly in arithmetic, comparisons and numeric literals.
newtype Count = Count Int64
    deriving
        ( Eq
        , Read
        , Show
        , Enum
        , Bounded
        , Num
        , Real
        , Integral
        , Ord
        )
-- | A bound that is either absent ('Unlimited') or a concrete 'Word' value
-- ('Limited').
data Limit
    = Unlimited
    | Limited Word
    deriving Show
-- | Two limits are equal when both are 'Unlimited', or when both are
-- 'Limited' with the same value.
instance Eq Limit where
    lhs == rhs =
        case (lhs, rhs) of
            (Unlimited, Unlimited) -> True
            (Limited x, Limited y) -> x == y
            _ -> False
-- | 'Unlimited' is the greatest element: every limit is @<=@ 'Unlimited',
-- and 'Unlimited' is @<=@ only itself. Two 'Limited' values compare by
-- their payload. Only '<=' is defined; the remaining 'Ord' methods come
-- from the class defaults.
instance Ord Limit where
    Limited x <= Limited y = x <= y
    Limited _ <= Unlimited = True
    Unlimited <= rhs =
        case rhs of
            Unlimited -> True
            Limited _ -> False
-- | Exception value that 'cleanupSVar' throws to every thread in the worker
-- set.
data ThreadAbort = ThreadAbort
    deriving Show

instance Exception ThreadAbort
-- | Events placed on a channel's output queue.
data ChildEvent a
    = ChildYield a
      -- ^ A value of the element type.
    | ChildStopChannel
      -- ^ NOTE(review): presumably a request to stop the whole channel;
      -- confirm against the consumer code.
    | ChildStop ThreadId (Maybe SomeException)
      -- ^ Carries a thread id and an optional exception; presumably the
      -- identified thread has stopped, with the exception if it failed.
-- | Per-worker bookkeeping record.
--
-- NOTE(review): the code that reads and writes these fields lives outside
-- this module; the field notes are inferred from names and types only.
data WorkerInfo = WorkerInfo
    { workerYieldMax :: Count
      -- ^ Presumably the maximum number of yields allowed for the worker.
    , workerYieldCount :: IORef Count
      -- ^ Mutable yield counter.
    , workerLatencyStart :: IORef (Count, AbsTime)
      -- ^ Mutable (count, timestamp) pair; presumably marks the start of
      -- the current latency measurement.
    }
-- | A pair of latency bounds in nanoseconds.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
      -- ^ Lower latency bound.
    , maxLatency :: NanoSecond64
      -- ^ Upper latency bound.
    }
    deriving Show
-- | State used for rate control; built by 'newRateInfo' when a stream rate
-- is configured.
--
-- NOTE(review): the initial values mentioned below are what 'newRateInfo'
-- stores; how the fields evolve afterwards is decided outside this module.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget :: NanoSecond64
      -- ^ Latency derived from the goal rate.
    , svarLatencyRange :: LatencyRange
      -- ^ Latencies derived from the high rate (minimum) and the low rate
      -- (maximum).
    , svarRateBuffer :: Int
      -- ^ Copied from the 'rateBuffer' of the configured 'Rate'.
    , svarGainedLostYields :: IORef Count
      -- ^ Starts at 0.
    , svarAllTimeLatency :: IORef (Count, AbsTime)
      -- ^ Starts as a zero count paired with the creation time.
    , workerBootstrapLatency :: Maybe NanoSecond64
      -- ^ The stream latency taken from the configuration, if any.
    , workerPollingInterval :: IORef Count
      -- ^ Starts at 1.
    , workerPendingLatency :: IORef (Count, Count, NanoSecond64)
      -- ^ Starts as all zeros.
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
      -- ^ Starts as all zeros.
    , workerMeasuredLatency :: IORef NanoSecond64
      -- ^ Starts at 0.
    }
-- | Mutable statistics counters for a channel; allocated by 'newSVarStats'.
--
-- NOTE(review): apart from 'maxOutQSize', the update sites are outside this
-- module, so the remaining field notes are inferred from names only.
data SVarStats = SVarStats
    { totalDispatches :: IORef Int
      -- ^ Presumably the number of worker dispatches.
    , maxWorkers :: IORef Int
      -- ^ Presumably the largest worker count observed.
    , maxOutQSize :: IORef Int
      -- ^ Largest output-queue length seen by 'readOutputQRaw'.
    , maxHeapSize :: IORef Int
      -- ^ Presumably the largest heap size observed.
    , maxWorkQSize :: IORef Int
      -- ^ Presumably the largest work-queue length observed.
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
      -- ^ A count paired with a time value; presumably the inputs for an
      -- average latency.
    , minWorkerLatency :: IORef NanoSecond64
      -- ^ Presumably the smallest worker latency observed.
    , maxWorkerLatency :: IORef NanoSecond64
      -- ^ Presumably the largest worker latency observed.
    , svarStopTime :: IORef (Maybe AbsTime)
      -- ^ 'Nothing' until a stop time is recorded.
    }
-- | A rate specification made of a low, goal and high rate plus a buffer
-- size.
--
-- 'newRateInfo' converts each rate @r@ to a latency of @1.0e9 / r@
-- nanoseconds, so rates are presumably expressed per second.
data Rate = Rate
    { rateLow :: Double
      -- ^ Lowest rate.
    , rateGoal :: Double
      -- ^ Target rate.
    , rateHigh :: Double
      -- ^ Highest rate.
    , rateBuffer :: Int
      -- ^ Buffer size associated with the rate.
    }
-- | Stop condition selector stored in the configuration.
--
-- NOTE(review): the constructor meanings are inferred from their names; the
-- code interpreting them is outside this module.
data StopWhen
    = FirstStops
      -- ^ Presumably: stop when the first stream stops.
    | AllStop
      -- ^ Presumably: stop when all streams have stopped.
    | AnyStops
      -- ^ Presumably: stop when any stream stops.
-- | Channel configuration record. Values are built from 'defaultConfig'
-- with the setter functions of this module and read back with the @get*@
-- accessors.
data Config = Config
    { _yieldLimit :: Maybe Count
      -- ^ Set by 'maxYields'; 'Nothing' means no limit was configured.
    , _threadsHigh :: Limit
      -- ^ Set by 'maxThreads'.
    , _bufferHigh :: Limit
      -- ^ Set by 'maxBuffer'.
    , _streamLatency :: Maybe NanoSecond64
      -- ^ Set by 'setStreamLatency'.
    , _maxStreamRate :: Maybe Rate
      -- ^ Set by 'rate' and the @*Rate@ helpers.
    , _inspect :: Bool
      -- ^ Set by 'inspect'.
    , _eagerDispatch :: Bool
      -- ^ Set by 'eager'.
    , _stopWhen :: StopWhen
      -- ^ Set by 'stopWhen'.
    , _ordered :: Bool
      -- ^ Set by 'ordered'.
    , _interleaved :: Bool
      -- ^ Set by 'interleaved'.
    , _bound :: Bool
      -- ^ Set by 'boundThreads'.
    }
-- | The numeric bound (1500) behind both the default thread limit and the
-- default buffer limit.
magicMaxBuffer :: Word
magicMaxBuffer = 1500
-- | Default thread limit: 'magicMaxBuffer' wrapped in 'Limited'.
defaultMaxThreads :: Limit
defaultMaxThreads = Limited magicMaxBuffer

-- | Default buffer limit: 'magicMaxBuffer' wrapped in 'Limited'.
defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited magicMaxBuffer
-- | The baseline configuration: no yield limit, no latency or rate setting,
-- default thread and buffer limits, the 'AllStop' stop condition, and every
-- boolean flag switched off.
defaultConfig :: Config
defaultConfig =
    Config
        { _yieldLimit = Nothing
        , _threadsHigh = defaultMaxThreads
        , _bufferHigh = defaultMaxBuffer
        , _streamLatency = Nothing
        , _maxStreamRate = Nothing
        , _inspect = False
        , _eagerDispatch = False
        , _stopWhen = AllStop
        , _ordered = False
        , _interleaved = False
        , _bound = False
        }
-- | Set the yield limit.
--
-- 'Nothing' removes the limit. @Just n@ stores @n@ as a 'Count', with zero
-- and negative values stored as 0.
maxYields :: Maybe Int64 -> Config -> Config
maxYields lim st = st { _yieldLimit = yieldLimit }

    where

    yieldLimit =
        case lim of
            Nothing -> Nothing
            Just n
                | n <= 0 -> Just 0
                | otherwise -> Just (fromIntegral n)
-- | Read the configured yield limit, if any.
getYieldLimit :: Config -> Maybe Count
getYieldLimit cfg = _yieldLimit cfg
-- | Set the thread limit.
--
-- A negative argument means 'Unlimited', zero restores the default limit,
-- and a positive argument becomes a 'Limited' bound of that size.
maxThreads :: Int -> Config -> Config
maxThreads n st = st { _threadsHigh = threadLimit }

    where

    threadLimit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxThreads
        | otherwise = Limited (fromIntegral n)
-- | Read the configured thread limit.
getMaxThreads :: Config -> Limit
getMaxThreads cfg = _threadsHigh cfg
-- | Set the buffer limit.
--
-- A negative argument means 'Unlimited', zero restores the default limit,
-- and a positive argument becomes a 'Limited' bound of that size.
maxBuffer :: Int -> Config -> Config
maxBuffer n st = st { _bufferHigh = bufferLimit }

    where

    bufferLimit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxBuffer
        | otherwise = Limited (fromIntegral n)
-- | Read the configured buffer limit.
getMaxBuffer :: Config -> Limit
getMaxBuffer cfg = _bufferHigh cfg
-- | Store the given rate specification in the configuration; 'Nothing'
-- clears it.
rate :: Maybe Rate -> Config -> Config
rate newRate cfg = cfg { _maxStreamRate = newRate }
-- | Read the configured rate specification, if any.
getStreamRate :: Config -> Maybe Rate
getStreamRate cfg = _maxStreamRate cfg
-- | Set the stream latency.
--
-- Zero and negative arguments clear the setting; a positive argument is
-- converted to 'NanoSecond64' and stored.
setStreamLatency :: Int -> Config -> Config
setStreamLatency n st = st { _streamLatency = latency }

    where

    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)
-- | Read the configured stream latency, if any.
getStreamLatency :: Config -> Maybe NanoSecond64
getStreamLatency cfg = _streamLatency cfg
-- | Set the inspection flag of the configuration.
inspect :: Bool -> Config -> Config
inspect enabled cfg = cfg { _inspect = enabled }
-- | Read the inspection flag.
getInspectMode :: Config -> Bool
getInspectMode cfg = _inspect cfg
-- | Set the eager-dispatch flag of the configuration.
eager :: Bool -> Config -> Config
eager enabled cfg = cfg { _eagerDispatch = enabled }
-- | Read the eager-dispatch flag.
getEagerDispatch :: Config -> Bool
getEagerDispatch cfg = _eagerDispatch cfg
-- | Set the stop condition of the configuration.
stopWhen :: StopWhen -> Config -> Config
stopWhen condition cfg = cfg { _stopWhen = condition }
-- | Read the configured stop condition.
getStopWhen :: Config -> StopWhen
getStopWhen cfg = _stopWhen cfg
-- | Set the ordered flag of the configuration.
ordered :: Bool -> Config -> Config
ordered enabled cfg = cfg { _ordered = enabled }
-- | Read the ordered flag.
getOrdered :: Config -> Bool
getOrdered cfg = _ordered cfg
-- | Set the interleaved flag of the configuration.
interleaved :: Bool -> Config -> Config
interleaved enabled cfg = cfg { _interleaved = enabled }
-- | Read the interleaved flag.
getInterleaved :: Config -> Bool
getInterleaved cfg = _interleaved cfg
-- | Set the bound-threads flag of the configuration.
boundThreads :: Bool -> Config -> Config
boundThreads enabled cfg = cfg { _bound = enabled }
-- | Read the bound-threads flag.
getBound :: Config -> Bool
getBound cfg = _bound cfg
-- | Build the rate-control state for a configuration.
--
-- Returns 'Nothing' when no stream rate is configured. Otherwise the goal,
-- high and low rates are converted to the target, minimum and maximum
-- latencies respectively, and fresh mutable cells are allocated for the
-- remaining fields.
newRateInfo :: Config -> IO (Maybe YieldRateInfo)
newRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            let target = rateToLatency goal
                range =
                    LatencyRange
                        { minLatency = rateToLatency high
                        , maxLatency = rateToLatency low
                        }
             in Just <$> mkYieldRateInfo target range buf

    where

    -- Nanoseconds per yield for a given rate. A non-positive rate maps to
    -- the largest representable latency.
    rateToLatency :: Double -> NanoSecond64
    rateToLatency r
        | r <= 0 = maxBound
        | otherwise = round (1.0e9 / r)

    -- Allocate the mutable cells, in a fixed order, and assemble the record.
    mkYieldRateInfo :: NanoSecond64 -> LatencyRange -> Int -> IO YieldRateInfo
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        pending <- newIORef (0, 0, 0)
        collected <- newIORef (0, 0, 0)
        now <- getTime Monotonic
        allTime <- newIORef (0, now)
        period <- newIORef 1
        gainLoss <- newIORef (Count 0)
        return
            YieldRateInfo
                { svarLatencyTarget = latency
                , svarLatencyRange = latRange
                , svarRateBuffer = buf
                , svarGainedLostYields = gainLoss
                , svarAllTimeLatency = allTime
                , workerBootstrapLatency = getStreamLatency st
                , workerPollingInterval = period
                , workerPendingLatency = pending
                , workerCollectedLatency = collected
                , workerMeasuredLatency = measured
                }
-- | Allocate a fresh set of statistics cells.
--
-- All counters and the maximum latency start at zero, the stop time starts
-- as 'Nothing', and the minimum latency starts at 'maxBound'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp <- newIORef 0
    maxWrk <- newIORef 0
    maxOq <- newIORef 0
    maxHs <- newIORef 0
    maxWq <- newIORef 0
    avgLat <- newIORef (0, NanoSecond64 0)
    maxLat <- newIORef (NanoSecond64 0)
    -- A running minimum has to be seeded with the largest representable
    -- value: seeded with 0, no non-negative sample could ever lower it and
    -- the statistic would stay at 0.
    -- NOTE(review): the update site is outside this module; this assumes
    -- the minimum is maintained by comparing new samples against the
    -- stored value - confirm.
    minLat <- newIORef (NanoSecond64 maxBound)
    stpTime <- newIORef Nothing
    return
        SVarStats
            { totalDispatches = disp
            , maxWorkers = maxWrk
            , maxOutQSize = maxOq
            , maxHeapSize = maxHs
            , maxWorkQSize = maxWq
            , avgWorkerLatency = avgLat
            , minWorkerLatency = minLat
            , maxWorkerLatency = maxLat
            , svarStopTime = stpTime
            }
-- | Configure a rate with goal @r@, low bound @r / 2@, high bound @2 * r@
-- and the largest possible buffer.
avgRate :: Double -> Config -> Config
avgRate r =
    rate
        ( Just
            Rate
                { rateLow = r / 2
                , rateGoal = r
                , rateHigh = 2 * r
                , rateBuffer = maxBound
                }
        )
-- | Configure a rate whose low bound and goal are both @r@, with high
-- bound @2 * r@ and the largest possible buffer.
minRate :: Double -> Config -> Config
minRate r =
    rate
        ( Just
            Rate
                { rateLow = r
                , rateGoal = r
                , rateHigh = 2 * r
                , rateBuffer = maxBound
                }
        )
-- | Configure a rate whose goal and high bound are both @r@, with low
-- bound @r / 2@ and the largest possible buffer.
maxRate :: Double -> Config -> Config
maxRate r =
    rate
        ( Just
            Rate
                { rateLow = r / 2
                , rateGoal = r
                , rateHigh = r
                , rateBuffer = maxBound
                }
        )
-- | Configure a rate whose low bound, goal and high bound are all @r@,
-- with a buffer of 0.
constRate :: Double -> Config -> Config
constRate r =
    rate
        ( Just
            Rate
                { rateLow = r
                , rateGoal = r
                , rateHigh = r
                , rateBuffer = 0
                }
        )
-- | Consume one unit of the yield budget.
--
-- With no budget cell ('Nothing') the result is always 'True'. Otherwise
-- the cell is atomically decremented by one and the result reports whether
-- the value /before/ the decrement was at least 1. The decrement is
-- unconditional, so the stored value can drop below zero.
{-# INLINE decrementYieldLimit #-}
decrementYieldLimit :: Maybe (IORef Count) -> IO Bool
decrementYieldLimit remaining = maybe (return True) takeOne remaining

    where

    takeOne ref = do
        before <- atomicModifyIORefCAS ref (\old -> (old - 1, old))
        return (before >= 1)
-- | Give one unit back to the yield budget: atomically add one to the cell
-- when there is one, and do nothing for 'Nothing'.
{-# INLINE incrementYieldLimit #-}
incrementYieldLimit :: Maybe (IORef Count) -> IO ()
incrementYieldLimit remaining = maybe (return ()) giveBack remaining

    where

    giveBack ref = atomicModifyIORefCAS_ ref (+ 1)
-- | Atomically replace the queue contents with an empty list and a zero
-- count, returning the previous contents.
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q drain

    where

    drain contents = (([], 0), contents)
-- | Drain the output queue with 'readOutputQBasic' and, when statistics
-- are supplied, raise 'maxOutQSize' to the drained length if that length
-- exceeds the stored value. Returns the drained events and their count.
{-# INLINE readOutputQRaw #-}
readOutputQRaw ::
    IORef ([ChildEvent a], Int) -> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw q stats = do
    drained@(_, len) <- readOutputQBasic q
    maybe (return ()) (recordPeak len . maxOutQSize) stats
    return drained

    where

    -- Plain read followed by a conditional write; not an atomic update.
    recordPeak len ref = do
        peak <- readIORef ref
        when (len > peak) $ writeIORef ref len
-- | Signal the door bell if the flag says a signal is wanted.
--
-- After a store-load barrier the flag is read; when it is set, the flag is
-- cleared and then a non-blocking put into the bell 'MVar' is attempted
-- (its result is ignored, so an already-full bell is left as it is).
{-# INLINE ringDoorBell #-}
ringDoorBell :: IORef Bool -> MVar () -> IO ()
ringDoorBell needBell bell = do
    storeLoadBarrier
    wanted <- readIORef needBell
    -- The flag is cleared strictly before the bell is rung; keep this
    -- order.
    -- NOTE(review): presumably ringing first would let the consumer
    -- process the bell and set the flag again, only for this late clear to
    -- wipe it - confirm against the consumer code.
    when wanted $
        atomicModifyIORefCAS_ needBell (const False)
            >> void (tryPutMVar bell ())
-- | Render a creator thread id as a diagnostic line.
dumpCreator :: Show a => a -> String
dumpCreator tid = "Creator tid = " ++ show tid
-- | Render two diagnostic lines for an output queue: the length computed
-- from the stored container and the length recorded alongside it.
dumpOutputQ :: (Foldable t, Show a1) => IORef (t a2, a1) -> IO String
dumpOutputQ q = do
    (items, recorded) <- readIORef q
    let computedLine = "outputQueue length computed = " <> show (length items)
        maintainedLine = "outputQueue length maintained = " <> show recorded
    return (unlines [computedLine, maintainedLine])
-- | Render the current content of the door bell 'MVar' without blocking
-- and without taking the value.
dumpDoorBell :: Show a => MVar a -> IO String
dumpDoorBell mvar = describe <$> tryReadMVar mvar

    where

    describe content = "outputDoorBell = " <> show content
-- | Render the current value of the need-door-bell cell.
dumpNeedDoorBell :: Show a => IORef a -> IO String
dumpNeedDoorBell ref = describe <$> readIORef ref

    where

    describe flag = "needDoorBell = " <> show flag
-- | Render the current value of the running-threads cell.
dumpRunningThreads :: Show a => IORef a -> IO String
dumpRunningThreads ref = describe <$> readIORef ref

    where

    describe threads = "running threads = " <> show threads
-- | Render the current value of the worker-count cell.
dumpWorkerCount :: Show a => IORef a -> IO String
dumpWorkerCount ref = describe <$> readIORef ref

    where

    describe count = "running thread count = " <> show count
-- | Handler for 'BlockedIndefinitelyOnMVar': run the dump action, print
-- the label, the exception name and the dump to stderr, then rethrow the
-- exception.
{-# NOINLINE mvarExcHandler #-}
mvarExcHandler :: IO String -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler dump label BlockedIndefinitelyOnMVar = do
    svInfo <- dump
    let report = concat [label, " ", "BlockedIndefinitelyOnMVar\n", svInfo]
    hPutStrLn stderr report
    throwIO BlockedIndefinitelyOnMVar
-- | Handler for 'BlockedIndefinitelyOnSTM': run the dump action, print the
-- label, the exception name and the dump to stderr, then rethrow the
-- exception.
{-# NOINLINE stmExcHandler #-}
stmExcHandler :: IO String -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler dump label BlockedIndefinitelyOnSTM = do
    svInfo <- dump
    let report = concat [label, " ", "BlockedIndefinitelyOnSTM\n", svInfo]
    hPutStrLn stderr report
    throwIO BlockedIndefinitelyOnSTM
-- | Run an action, optionally with deadlock diagnostics.
--
-- When the first argument is 'True', 'BlockedIndefinitelyOnMVar' and
-- 'BlockedIndefinitelyOnSTM' raised by the action are passed to
-- 'mvarExcHandler' and 'stmExcHandler', which print the label and the dump
-- before rethrowing. When it is 'False' the action runs unchanged.
withDiagMVar :: Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar inspecting dump label action
    | inspecting = action `catches` diagHandlers
    | otherwise = action

    where

    diagHandlers =
        [ Handler (mvarExcHandler dump label)
        , Handler (stmExcHandler dump label)
        ]
-- | Run the dump action and print its result to stderr, preceded by a
-- blank line and the given heading.
printSVar :: IO String -> String -> IO ()
printSVar dump how =
    dump >>= \svInfo -> hPutStrLn stderr (concat ["\n", how, "\n", svInfo])
-- | Throw 'ThreadAbort' to every thread currently recorded in the worker
-- set. The set itself is only read, not modified.
cleanupSVar :: IORef (Set ThreadId) -> IO ()
cleanupSVar workerSet =
    readIORef workerSet >>= Prelude.mapM_ abort . Set.toList

    where

    abort tid = throwTo tid ThreadAbort