module Streamly.Internal.Data.SVar.Pull
(
readOutputQBasic
, readOutputQRaw
, readOutputQPaced
, readOutputQBounded
, postProcessPaced
, postProcessBounded
, cleanupSVar
, cleanupSVarFromWorker
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, throwTo)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (readIORef, writeIORef)
import Data.IORef (IORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)
import qualified Data.Set as S
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Dispatch
-- | Swap out the contents of an output queue reference in a single
-- 'atomicModifyIORefCAS' step.
--
-- The reference is reset to @([], 0)@ and the value it held before the swap
-- (the list of child events paired with an 'Int', presumably the number of
-- queued events) is returned to the caller.
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q $ \x -> (([], 0), x)
-- | Drain the 'outputQueue' of the given 'SVar' with 'readOutputQBasic' and
-- return the drained events together with the length that was stored
-- alongside them.
--
-- When 'svarInspectMode' is enabled for the 'SVar', the 'maxOutQSize'
-- statistic is updated as a side effect: it is overwritten with the drained
-- length whenever that length is greater than the value currently recorded.
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    (list, len) <- readOutputQBasic (outputQueue sv)
    when (svarInspectMode sv) $ do
        let ref = maxOutQSize $ svarStats sv
        oqLen <- readIORef ref
        -- The statistic only ever grows; smaller reads leave it untouched.
        when (len > oqLen) $ writeIORef ref len
    return (list, len)
-- | Read the pending output of the 'SVar' via 'readOutputQRaw'.
--
-- * If the reported length is zero or negative, fall back to @blockingRead@:
--   run 'sendWorkerWait' with 'sendWorkerDelay' and @'dispatchWorker' 0@, then
--   read the queue once more and return whatever events it holds.
--
-- * Otherwise run @sendOneWorker@ before handing the events back to the
--   caller.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (list, len) <- liftIO $ readOutputQRaw sv
    if len <= 0
    then blockingRead
    else do
        -- Output is available: possibly start a worker before the caller
        -- begins consuming the events we are about to return.
        sendOneWorker
        return list

    where

    -- The helpers below deliberately carry no type signatures: a local
    -- signature mentioning @m@ or @a@ would introduce fresh type variables
    -- unless the outer ones were brought into scope with an explicit forall.

    -- Push a single worker, but only when 'workerCount' reports none running
    -- and 'isWorkDone' says there is still work left.
    sendOneWorker = do
        cnt <- liftIO $ readIORef $ workerCount sv
        when (cnt <= 0) $ do
            done <- liftIO $ isWorkDone sv
            when (not done) (pushWorker 0 sv)

    -- Run 'sendWorkerWait' to completion (presumably this blocks until output
    -- is available - confirm against its definition), then re-read the queue
    -- and drop the length component.
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO (fst `fmap` readOutputQRaw sv)
-- | Paced counterpart of the bounded read: read the pending output of the
-- 'SVar' via 'readOutputQRaw'.
--
-- * If the reported length is zero or negative, fall back to @blockingRead@:
--   run 'sendWorkerWait' with 'sendWorkerDelayPaced' and
--   'dispatchWorkerPaced', then read the queue once more and return whatever
--   events it holds.
--
-- * Otherwise call 'dispatchWorkerPaced' (ignoring its 'Bool' result) before
--   handing the events back to the caller.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (list, len) <- liftIO $ readOutputQRaw sv
    if len <= 0
    then blockingRead
    else do
        -- Output is available: give the paced dispatcher a chance to run
        -- before the caller starts consuming; its result is not needed here.
        void $ dispatchWorkerPaced sv
        return list

    where

    -- No local type signature on purpose: one mentioning @m@ or @a@ would
    -- introduce fresh type variables unless the outer ones were scoped.
    --
    -- Run 'sendWorkerWait' to completion (presumably this blocks until output
    -- is available - confirm against its definition), then re-read the queue
    -- and drop the length component.
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO (fst `fmap` readOutputQRaw sv)
-- | Post-processing step for the paced mode.
--
-- Returns 'False' immediately while 'allThreadsDone' reports 'False'.
-- Otherwise returns the result of 'isWorkDone'; when that result is 'False'
-- the function first tries 'dispatchWorkerPaced' and, if 'allThreadsDone'
-- still holds afterwards, calls @'pushWorker' 0@ so that a worker is started.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    workersDone <- allThreadsDone sv
    if workersDone
    then do
        r <- liftIO $ isWorkDone sv
        when (not r) $ do
            void $ dispatchWorkerPaced sv
            -- The 'Bool' from 'dispatchWorkerPaced' is discarded, so re-check
            -- whether any thread is running and push a worker directly if
            -- none is.
            noWorker <- allThreadsDone sv
            when noWorker $ pushWorker 0 sv
        return r
    else return False
-- | Post-processing step for the bounded mode.
--
-- Returns 'False' immediately while 'allThreadsDone' reports 'False'.
-- Otherwise returns the result of 'isWorkDone'; when that result is 'False'
-- a worker is started with @'pushWorker' 0@ before returning.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    workersDone <- allThreadsDone sv
    if workersDone
    then do
        -- 'isWorkDone' is consulted only once no threads remain.
        r <- liftIO $ isWorkDone sv
        -- Work is left but nobody is running: push a worker unconditionally.
        when (not r) (pushWorker 0 sv)
        return r
    else return False
-- | Throw 'ThreadAbort' to every thread currently recorded in the
-- 'workerThreads' set of the 'SVar'.
--
-- The set is read once; threads registered after that read are not
-- signalled by this call.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv = do
    workers <- readIORef (workerThreads sv)
    Prelude.mapM_ (`throwTo` ThreadAbort) workers
-- | Throw 'ThreadAbort' to every thread recorded in the 'workerThreads' set
-- of the 'SVar' except the calling thread itself.
--
-- The set is read once; threads registered after that read are not
-- signalled by this call.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    workers <- readIORef (workerThreads sv)
    self <- myThreadId
    -- Remove the caller from the set rather than filtering a list copy; the
    -- remaining threads are still visited in the set's ascending order.
    Prelude.mapM_ (`throwTo` ThreadAbort) (S.delete self workers)