#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif
#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif
module Streamly.Internal.Data.Stream.SVar
(
fromSVar
, fromSVarD
, toSVar
, toSVarParallel
, fromConsumer
, pushToFold
, teeToSVar
, newFoldSVar
, newFoldSVarF
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, takeMVar)
import Control.Exception (fromException)
import Control.Monad (when, void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)
import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Streamly.Internal.Data.SVar
#if __GLASGOW_HASKELL__ < 810
#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif
#endif
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromStreamVar :: SVar Stream m a -> Stream m a
fromStreamVar SVar Stream m a
sv = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
[ChildEvent a]
list <- SVar Stream m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar Stream m a
sv
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> Stream m a
processEvents ([ChildEvent a] -> Stream m a) -> [ChildEvent a] -> Stream m a
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
reverse [ChildEvent a]
list
where
allDone :: m b -> m b
allDone m b
stp = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar Stream m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Done"
m b
stp
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> Stream m a
processEvents [] = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
Bool
done <- SVar Stream m a -> m Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess SVar Stream m a
sv
if Bool
done
then m r -> m r
forall (m :: * -> *) b. MonadIO m => m b -> m b
allDone m r
stp
else State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromStreamVar SVar Stream m a
sv
processEvents (ChildEvent a
ev : [ChildEvent a]
es) = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.MkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
let rest :: Stream m a
rest = [ChildEvent a] -> Stream m a
processEvents [ChildEvent a]
es
case ChildEvent a
ev of
ChildYield a -> a -> Stream m a -> m r
yld a
a Stream m a
rest
ChildStop tid e -> do
SVar Stream m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar Stream m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> do
Bool
stop <- ThreadId -> m Bool
forall (m :: * -> *). MonadIO m => ThreadId -> m Bool
shouldStop ThreadId
tid
if Bool
stop
then IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar Stream m a
sv) m () -> m r -> m r
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r -> m r
forall (m :: * -> *) b. MonadIO m => m b -> m b
allDone m r
stp
else State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
rest
Just SomeException
ex ->
case SomeException -> Maybe ThreadAbort
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
Just ThreadAbort
ThreadAbort ->
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
rest
Maybe ThreadAbort
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar Stream m a
sv) m () -> m r -> m r
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m r
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
shouldStop :: ThreadId -> m Bool
shouldStop ThreadId
tid =
case SVar Stream m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar Stream m a
sv of
SVarStopStyle
StopNone -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
SVarStopStyle
StopAny -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
SVarStopStyle
StopBy -> do
ThreadId
sid <- IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> m ThreadId) -> IO ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ IORef ThreadId -> IO ThreadId
forall a. IORef a -> IO a
readIORef (SVar Stream m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar Stream m a
sv)
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ ThreadId
tid ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
== ThreadId
sid
#if __GLASGOW_HASKELL__ < 810
#ifdef INSPECTION
inspect $ hasNoTypeClassesExcept 'fromStreamVar
[ ''Monad
, ''Applicative
, ''MonadThrow
, ''Exception
, ''MonadIO
, ''MonadBaseControl
, ''Typeable
, ''Functor
]
#endif
#endif
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, K.IsStream t) => SVar K.Stream m a -> t m a
fromSVar :: SVar Stream m a -> t m a
fromSVar SVar Stream m a
sv =
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
IORef ()
ref <- IO (IORef ()) -> m (IORef ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ()) -> m (IORef ())) -> IO (IORef ()) -> m (IORef ())
forall a b. (a -> b) -> a -> b
$ () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
Weak (IORef ())
_ <- IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (IORef ())) -> m (Weak (IORef ())))
-> IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a b. (a -> b) -> a -> b
$ IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (t m a -> m r) -> t m a -> m r
forall a b. (a -> b) -> a -> b
$
Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
K.fromStream (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromStreamVar SVar Stream m a
sv{svarRef :: Maybe (IORef ())
svarRef = IORef () -> Maybe (IORef ())
forall a. a -> Maybe a
Just IORef ()
ref}
where
hook :: IO ()
hook = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe AbsTime
r <- IO (Maybe AbsTime) -> IO (Maybe AbsTime)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe AbsTime) -> IO (Maybe AbsTime))
-> IO (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar Stream m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv))
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe AbsTime -> Bool
forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
SVar Stream m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Garbage Collected"
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar Stream m a
sv
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) IO ()
performMajorGC
data FromSVarState t m a =
FromSVarInit
| FromSVarRead (SVar t m a)
| FromSVarLoop (SVar t m a) [ChildEvent a]
| FromSVarDone (SVar t m a)
{-# INLINE_NORMAL fromSVarD #-}
fromSVarD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromSVarD :: SVar t m a -> Stream m a
fromSVarD SVar t m a
svar = (State Stream m a
-> FromSVarState t m a -> m (Step (FromSVarState t m a) a))
-> FromSVarState t m a -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
forall p.
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
FromSVarState t m a
FromSVarInit
where
{-# INLINE_LATE step #-}
step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ FromSVarState t m a
FromSVarInit = do
IORef ()
ref <- IO (IORef ()) -> m (IORef ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IORef ()) -> m (IORef ())) -> IO (IORef ()) -> m (IORef ())
forall a b. (a -> b) -> a -> b
$ () -> IO (IORef ())
forall a. a -> IO (IORef a)
newIORef ()
Weak (IORef ())
_ <- IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Weak (IORef ())) -> m (Weak (IORef ())))
-> IO (Weak (IORef ())) -> m (Weak (IORef ()))
forall a b. (a -> b) -> a -> b
$ IORef () -> IO () -> IO (Weak (IORef ()))
forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
let sv :: SVar t m a
sv = SVar t m a
svar{svarRef :: Maybe (IORef ())
svarRef = IORef () -> Maybe (IORef ())
forall a. a -> Maybe a
Just IORef ()
ref}
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv)
where
{-# NOINLINE hook #-}
hook :: IO ()
hook = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Maybe AbsTime
r <- IO (Maybe AbsTime) -> IO (Maybe AbsTime)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe AbsTime) -> IO (Maybe AbsTime))
-> IO (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> IO (Maybe AbsTime)
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
svar))
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe AbsTime -> Bool
forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
svar String
"SVar Garbage Collected"
SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
svar
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) IO ()
performMajorGC
step p
_ (FromSVarRead SVar t m a
sv) = do
[ChildEvent a]
list <- SVar t m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)
step p
_ (FromSVarLoop SVar t m a
sv []) = do
Bool
done <- SVar t m a -> m Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess SVar t m a
sv
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ if Bool
done
then SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv
else SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv
step p
_ (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
case ChildEvent a
ev of
ChildYield a
a -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. a -> s -> Step s a
D.Yield a
a (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
ChildStop ThreadId
tid Maybe SomeException
e -> do
SVar t m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> do
Bool
stop <- ThreadId -> m Bool
forall (m :: * -> *). MonadIO m => ThreadId -> m Bool
shouldStop ThreadId
tid
if Bool
stop
then do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv)
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
else Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
Just SomeException
ex ->
case SomeException -> Maybe ThreadAbort
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
Just ThreadAbort
ThreadAbort ->
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
Maybe ThreadAbort
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv) m ()
-> m (Step (FromSVarState t m a) a)
-> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
where
shouldStop :: ThreadId -> m Bool
shouldStop ThreadId
tid =
case SVar t m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar t m a
sv of
SVarStopStyle
StopNone -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
SVarStopStyle
StopAny -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
SVarStopStyle
StopBy -> do
ThreadId
sid <- IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ThreadId -> m ThreadId) -> IO ThreadId -> m ThreadId
forall a b. (a -> b) -> a -> b
$ IORef ThreadId -> IO ThreadId
forall a. IORef a -> IO a
readIORef (SVar t m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar t m a
sv)
Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ ThreadId
tid ThreadId -> ThreadId -> Bool
forall a. Eq a => a -> a -> Bool
== ThreadId
sid
step p
_ (FromSVarDone SVar t m a
sv) = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState t m a) a
forall s a. Step s a
D.Stop
toSVar :: (K.IsStream t, MonadAsync m) => SVar K.Stream m a -> t m a -> m ()
toSVar :: SVar Stream m a -> t m a -> m ()
toSVar SVar Stream m a
sv t m a
m = SVar Stream m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> t m a -> m ()
toStreamVar SVar Stream m a
sv (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
K.toStream t m a
m
{-# INLINE write #-}
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write :: SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
svar Maybe WorkerInfo
winfo = (() -> a -> m (Step () ()))
-> m (Step () ()) -> (() -> m ()) -> Fold m a ()
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold () -> a -> m (Step () ())
forall (m :: * -> *) b. MonadIO m => () -> a -> m (Step () b)
step m (Step () ())
forall b. m (Step () b)
initial () -> m ()
forall (m :: * -> *). MonadIO m => () -> m ()
extract
where
initial :: m (Step () b)
initial = Step () b -> m (Step () b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step () b -> m (Step () b)) -> Step () b -> m (Step () b)
forall a b. (a -> b) -> a -> b
$ () -> Step () b
forall s b. s -> Step s b
FL.Partial ()
step :: () -> a -> m (Step () b)
step () a
x =
IO (Step () b) -> m (Step () b)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Step () b) -> m (Step () b))
-> IO (Step () b) -> m (Step () b)
forall a b. (a -> b) -> a -> b
$ do
SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
x)
Step () b -> IO (Step () b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step () b -> IO (Step () b)) -> Step () b -> IO (Step () b)
forall a b. (a -> b) -> a -> b
$ () -> Step () b
forall s b. s -> Step s b
FL.Partial ()
extract :: () -> m ()
extract () = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
{-# INLINE writeLimited #-}
writeLimited :: MonadIO m
=> SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited :: SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
svar Maybe WorkerInfo
winfo = (Bool -> a -> m (Step Bool ()))
-> m (Step Bool ()) -> (Bool -> m ()) -> Fold m a ()
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold Bool -> a -> m (Step Bool ())
forall (m :: * -> *). MonadIO m => Bool -> a -> m (Step Bool ())
step m (Step Bool ())
forall b. m (Step Bool b)
initial Bool -> m ()
forall (m :: * -> *). MonadIO m => Bool -> m ()
extract
where
initial :: m (Step Bool b)
initial = Step Bool b -> m (Step Bool b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool b -> m (Step Bool b)) -> Step Bool b -> m (Step Bool b)
forall a b. (a -> b) -> a -> b
$ Bool -> Step Bool b
forall s b. s -> Step s b
FL.Partial Bool
True
step :: Bool -> a -> m (Step Bool ())
step Bool
True a
x =
IO (Step Bool ()) -> m (Step Bool ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Step Bool ()) -> m (Step Bool ()))
-> IO (Step Bool ()) -> m (Step Bool ())
forall a b. (a -> b) -> a -> b
$ do
Bool
yieldLimitOk <- SVar t m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar t m a
svar
if Bool
yieldLimitOk
then do
SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
x)
Step Bool () -> IO (Step Bool ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool () -> IO (Step Bool ()))
-> Step Bool () -> IO (Step Bool ())
forall a b. (a -> b) -> a -> b
$ Bool -> Step Bool ()
forall s b. s -> Step s b
FL.Partial Bool
True
else do
SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar t m a
svar
SVar t m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
Step Bool () -> IO (Step Bool ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool () -> IO (Step Bool ()))
-> Step Bool () -> IO (Step Bool ())
forall a b. (a -> b) -> a -> b
$ () -> Step Bool ()
forall s b. b -> Step s b
FL.Done ()
step Bool
False a
_ = Step Bool () -> m (Step Bool ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool () -> m (Step Bool ()))
-> Step Bool () -> m (Step Bool ())
forall a b. (a -> b) -> a -> b
$ () -> Step Bool ()
forall s b. b -> Step s b
FL.Done ()
extract :: Bool -> m ()
extract Bool
True = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
extract Bool
False = () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
{-# INLINE toSVarParallel #-}
toSVarParallel :: MonadAsync m
=> State t m a -> SVar t m a -> D.Stream m a -> m ()
toSVarParallel :: State t m a -> SVar t m a -> Stream m a -> m ()
toSVarParallel State t m a
st SVar t m a
sv Stream m a
xs =
if SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv
then m ()
forkWithDiag
else do
ThreadId
tid <-
case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
Maybe Count
Nothing -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
forall a. Maybe a
Nothing)
(SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
Just Count
_ -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
forall a. Maybe a
Nothing)
(SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid
where
{-# NOINLINE work #-}
work :: Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
info = Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (SVar t m a -> Maybe WorkerInfo -> Fold m a ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
sv Maybe WorkerInfo
info) Stream m a
xs
{-# NOINLINE workLim #-}
workLim :: Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
info = Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (SVar t m a -> Maybe WorkerInfo -> Fold m a ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
sv Maybe WorkerInfo
info) Stream m a
xs
{-# NOINLINE forkWithDiag #-}
forkWithDiag :: m ()
forkWithDiag = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef Int -> (Int -> Int) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (SVar t m a -> IORef Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv) ((Int -> Int) -> IO ()) -> (Int -> Int) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
recordMaxWorkers SVar t m a
sv
Maybe WorkerInfo
winfo <-
case SVar t m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> Maybe WorkerInfo -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe WorkerInfo
forall a. Maybe a
Nothing
Just YieldRateInfo
_ -> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo))
-> IO (Maybe WorkerInfo) -> m (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ do
IORef Count
cntRef <- Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
0
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Count, AbsTime)
lat <- (Count, AbsTime) -> IO (IORef (Count, AbsTime))
forall a. a -> IO (IORef a)
newIORef (Count
0, AbsTime
t)
Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe WorkerInfo -> IO (Maybe WorkerInfo))
-> Maybe WorkerInfo -> IO (Maybe WorkerInfo)
forall a b. (a -> b) -> a -> b
$ WorkerInfo -> Maybe WorkerInfo
forall a. a -> Maybe a
Just WorkerInfo :: Count -> IORef Count -> IORef (Count, AbsTime) -> WorkerInfo
WorkerInfo
{ workerYieldMax :: Count
workerYieldMax = Count
0
, workerYieldCount :: IORef Count
workerYieldCount = IORef Count
cntRef
, workerLatencyStart :: IORef (Count, AbsTime)
workerLatencyStart = IORef (Count, AbsTime)
lat
}
ThreadId
tid <-
case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
Maybe Count
Nothing -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
winfo)
(SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
Just Count
_ -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
winfo)
(SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid
{-# NOINLINE fromProducer #-}
fromProducer :: forall m a . MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromProducer :: SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
[ChildEvent a]
list <- SVar Stream m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar Stream m a
sv
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> Stream m a
processEvents ([ChildEvent a] -> Stream m a) -> [ChildEvent a] -> Stream m a
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
reverse [ChildEvent a]
list
where
allDone :: m r -> m r
allDone :: m r -> m r
allDone m r
stp = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar Stream m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Done"
SVar Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar Stream m a
sv
m r
stp
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> K.Stream m a
processEvents :: [ChildEvent a] -> Stream m a
processEvents [] = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Stream m a -> m r) -> Stream m a -> m r
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv
processEvents (ChildEvent a
ev : [ChildEvent a]
es) = (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a)
-> (forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
_ a -> Stream m a -> m r
yld a -> m r
_ m r
stp -> do
let rest :: Stream m a
rest = [ChildEvent a] -> Stream m a
processEvents [ChildEvent a]
es
case ChildEvent a
ev of
ChildYield a
a -> a -> Stream m a -> m r
yld a
a Stream m a
rest
ChildStop ThreadId
tid Maybe SomeException
e -> do
SVar Stream m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar Stream m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> m r -> m r
forall r. m r -> m r
allDone m r
stp
Just SomeException
_ -> String -> m r
forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"
{-# INLINE newFoldSVar #-}
newFoldSVar :: (K.IsStream t, MonadAsync m)
=> State K.Stream m a -> (t m a -> m b) -> m (SVar K.Stream m a)
newFoldSVar :: State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)
newFoldSVar State Stream m a
stt t m a -> m b
f = do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopAny (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
stt)
IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar Stream m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Stream m a
sv
m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> m ThreadId -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m b -> m ()) -> m b -> m ()
forall a b. (a -> b) -> a -> b
$ t m a -> m b
f (t m a -> m b) -> t m a -> m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
K.fromStream (Stream m a -> t m a) -> Stream m a -> t m a
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv)
(SVar Stream m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv)
(SVar Stream m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar Stream m a
sv)
SVar Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv
{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromProducerD :: SVar t m a -> Stream m a
fromProducerD SVar t m a
svar = (State Stream m a
-> FromSVarState t m a -> m (Step (FromSVarState t m a) a))
-> FromSVarState t m a -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) p (t :: (* -> *) -> * -> *) a.
MonadIO m =>
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
svar)
where
{-# INLINE_LATE step #-}
step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ (FromSVarRead SVar t m a
sv) = do
[ChildEvent a]
list <- SVar t m a -> m [ChildEvent a]
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)
step p
_ (FromSVarLoop SVar t m a
sv []) = Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (FromSVarState t m a -> Step (FromSVarState t m a) a)
-> FromSVarState t m a -> Step (FromSVarState t m a) a
forall a b. (a -> b) -> a -> b
$ SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv
step p
_ (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
case ChildEvent a
ev of
ChildYield a
a -> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. a -> s -> Step s a
D.Yield a
a (SVar t m a -> [ChildEvent a] -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
ChildStop ThreadId
tid Maybe SomeException
e -> do
SVar t m a -> ThreadId -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> do
SVar t m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar t m a
sv
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a))
-> Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState t m a -> Step (FromSVarState t m a) a
forall s a. s -> Step s a
D.Skip (SVar t m a -> FromSVarState t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
Just SomeException
_ -> String -> m (Step (FromSVarState t m a) a)
forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"
step p
_ (FromSVarDone SVar t m a
sv) = do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- IO AbsTime -> m AbsTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO AbsTime -> m AbsTime) -> IO AbsTime -> m AbsTime
forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Maybe AbsTime) -> Maybe AbsTime -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (SVar t m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (AbsTime -> Maybe AbsTime
forall a. a -> Maybe a
Just AbsTime
t)
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> String -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
Step (FromSVarState t m a) a -> m (Step (FromSVarState t m a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState t m a) a
forall s a. Step s a
D.Stop
step p
_ FromSVarState t m a
FromSVarInit = m (Step (FromSVarState t m a) a)
forall a. HasCallStack => a
undefined
{-# INLINE newFoldSVarF #-}
newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF :: State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF State t m a
stt Fold m a b
f = do
SVar t m a
sv <- SVarStopStyle -> State t m a -> m (SVar t m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopAny (State t m a -> State t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State t m a
stt)
IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv
m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> m ThreadId -> m ()
forall a b. (a -> b) -> a -> b
$ m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadBaseControl IO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (SVar t m a -> m ()
forall (t :: (* -> *) -> * -> *). SVar t m a -> m ()
work SVar t m a
sv) (SVar t m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv) (SVar t m a -> SomeException -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar t m a
sv)
SVar t m a -> m (SVar t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv
where
{-# NOINLINE work #-}
work :: SVar t m a -> m ()
work SVar t m a
sv = m b -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m b -> m ()) -> m b -> m ()
forall a b. (a -> b) -> a -> b
$ Fold m a b -> Stream m a -> m b
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a b
f (Stream m a -> m b) -> Stream m a -> m b
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
fromProducerD SVar t m a
sv
{-# NOINLINE fromConsumer #-}
fromConsumer :: MonadAsync m => SVar K.Stream m a -> m Bool
fromConsumer :: SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
sv = do
([ChildEvent a]
list, Int
_) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic (SVar Stream m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar Stream m a
sv)
[ChildEvent a] -> m Bool
forall (m :: * -> *) a. MonadThrow m => [ChildEvent a] -> m Bool
processEvents ([ChildEvent a] -> m Bool) -> [ChildEvent a] -> m Bool
forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
reverse [ChildEvent a]
list
where
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> m Bool
processEvents [] = Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
processEvents (ChildEvent a
ev : [ChildEvent a]
_) = do
case ChildEvent a
ev of
ChildStop ThreadId
_ Maybe SomeException
e -> do
case Maybe SomeException
e of
Maybe SomeException
Nothing -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just SomeException
ex -> SomeException -> m Bool
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
ChildYield a
_ -> String -> m Bool
forall a. HasCallStack => String -> a
error String
"Bug: fromConsumer: invalid ChildYield event"
{-# INLINE pushToFold #-}
pushToFold :: MonadAsync m => SVar K.Stream m a -> a -> m Bool
pushToFold :: SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
sv a
a = do
let qref :: IORef ([ChildEvent a], Int)
qref = SVar Stream m a -> IORef ([ChildEvent a], Int)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar Stream m a
sv
Bool
done <- do
([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef IORef ([ChildEvent a], Int)
qref
if (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0)
then SVar Stream m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
sv
else Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
if Bool
done
then Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
{-# INLINE teeToSVar #-}
teeToSVar :: (K.IsStream t, MonadAsync m) =>
SVar K.Stream m a -> t m a -> t m a
teeToSVar :: SVar Stream m a -> t m a -> t m a
teeToSVar SVar Stream m a
svr t m a
m = (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (Bool -> t m a -> t m a
forall (t :: (* -> *) -> * -> *).
IsStream t =>
Bool -> t m a -> t m a
go Bool
False t m a
m)
where
go :: Bool -> t m a -> t m a
go Bool
False t m a
m0 = (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
K.mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
_ m r
stp -> do
let drain :: m ()
drain = do
Bool
done <- SVar Stream m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
svr
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar Stream m a
svr String
"teeToSVar: waiting to drain"
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar Stream m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar Stream m a
svr)
m ()
drain
stopFold :: m ()
stopFold = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
svr Maybe WorkerInfo
forall a. Maybe a
Nothing
m ()
drain
stop :: m r
stop = m ()
stopFold m () -> m r -> m r
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
single :: a -> m r
single a
a = do
Bool
done <- SVar Stream m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
svr a
a
a -> t m a -> m r
yld a
a (Bool -> t m a -> t m a
go Bool
done (m () -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
m b -> t m a
K.nilM m ()
stopFold))
yieldk :: a -> t m a -> m r
yieldk a
a t m a
r = SVar Stream m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
svr a
a m Bool -> (Bool -> m r) -> m r
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
done -> a -> t m a -> m r
yld a
a (Bool -> t m a -> t m a
go Bool
done t m a
r)
in State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
K.foldStreamShared State Stream m a
st a -> t m a -> m r
yieldk a -> m r
single m r
stop t m a
m0
go Bool
True t m a
m0 = t m a
m0