module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
( Channel (..)
, newChannel
, Config
, sendToWorker
, checkFoldStatus
, dumpSVar
)
where
#include "inline.hs"
import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
(MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker (sendWithDoorBell)
import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as D
import Streamly.Internal.Data.Channel.Types
data Channel m a b = Channel
{
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue :: IORef ([ChildEvent a], Int)
, forall (m :: * -> *) a b. Channel m a b -> Limit
maxBufferLimit :: Limit
, forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell :: MVar ()
, forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readOutputQ :: m [ChildEvent a]
, forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer :: IORef ([ChildEvent b], Int)
, forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBellFromConsumer :: MVar ()
, forall (m :: * -> *) a b. Channel m a b -> MVar ()
bufferSpaceDoorBell :: MVar ()
, forall (m :: * -> *) a b. Channel m a b -> Maybe (IORef ())
svarRef :: Maybe (IORef ())
, forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats :: SVarStats
, forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode :: Bool
, forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator :: ThreadId
}
{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a b -> IO String
dumpSVar :: forall (m :: * -> *) a b. Channel m a b -> IO String
dumpSVar Channel m a b
sv = do
[String]
xs <- [IO String] -> IO [String]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO String] -> IO [String]) -> [IO String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ IO String -> [IO String] -> [IO String]
forall a. a -> [a] -> [a]
intersperse (String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
[ String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId -> String
forall a. Show a => a -> String
dumpCreator (Channel m a b -> ThreadId
forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator Channel m a b
sv))
, String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
, IORef ([ChildEvent a], Int) -> IO String
forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
sv)
, MVar () -> IO String
forall a. Show a => MVar a -> IO String
dumpDoorBell (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
sv)
, String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
, Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
sv) Maybe YieldRateInfo
forall a. Maybe a
Nothing (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
sv)
]
String -> IO String
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs
sendToDriver :: Channel m a b -> ChildEvent b -> IO Int
sendToDriver :: forall (m :: * -> *) a b. Channel m a b -> ChildEvent b -> IO Int
sendToDriver Channel m a b
sv ChildEvent b
msg = do
IORef ([ChildEvent b], Int) -> MVar () -> ChildEvent b -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell (Channel m a b -> IORef ([ChildEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer Channel m a b
sv)
(Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBellFromConsumer Channel m a b
sv) ChildEvent b
msg
sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
sv b
res = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> ChildEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> ChildEvent b -> IO Int
sendToDriver Channel m a b
sv (b -> ChildEvent b
forall a. a -> ChildEvent a
ChildYield b
res)
{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver :: forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv SomeException
e = do
ThreadId
tid <- IO ThreadId
myThreadId
IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> ChildEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> ChildEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> Maybe SomeException -> ChildEvent b
forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e))
data FromSVarState m a b =
FromSVarRead (Channel m a b)
| FromSVarLoop (Channel m a b) [ChildEvent a]
{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: MonadIO m => Channel m a b -> D.Stream m a
fromProducerD :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromProducerD Channel m a b
svar = (State StreamK m a
-> FromSVarState m a b -> m (Step (FromSVarState m a b) a))
-> FromSVarState m a b -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
forall {m :: * -> *} {p} {a} {b}.
Monad m =>
p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step (Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
svar)
where
{-# INLINE_LATE step #-}
step :: p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step p
_ (FromSVarRead Channel m a b
sv) = do
[ChildEvent a]
list <- Channel m a b -> m [ChildEvent a]
forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readOutputQ Channel m a b
sv
Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)
step p
_ (FromSVarLoop Channel m a b
sv []) = Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
sv
step p
_ (FromSVarLoop Channel m a b
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
case ChildEvent a
ev of
ChildYield a
a -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. a -> s -> Step s a
D.Yield a
a (Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv [ChildEvent a]
es)
ChildEvent a
ChildStopChannel -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState m a b) a
forall s a. Step s a
D.Stop
ChildEvent a
_ -> m (Step (FromSVarState m a b) a)
forall a. HasCallStack => a
undefined
{-# INLINE readOutputQChan #-}
readOutputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a b
chan = do
let ss :: Maybe SVarStats
ss = if Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan then SVarStats -> Maybe SVarStats
forall a. a -> Maybe a
Just (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan) else Maybe SVarStats
forall a. Maybe a
Nothing
r :: ([ChildEvent a], Int)
r@([ChildEvent a]
_, Int
n) <- IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan) Maybe SVarStats
ss
if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
then do
IO () -> IO ()
forall a. IO a -> IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar
(Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
(Channel m a b -> IO String
forall (m :: * -> *) a b. Channel m a b -> IO String
dumpSVar Channel m a b
chan)
String
"readOutputQChan: nothing to do"
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan) Maybe SVarStats
ss
else ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r
{-# INLINE readOutputQDB #-}
readOutputQDB :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB Channel m a b
chan = do
([ChildEvent a], Int)
r <- Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a b
chan
Bool
_ <- MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
bufferSpaceDoorBell Channel m a b
chan) ()
([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r
mkNewChannel :: forall m a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel :: forall (m :: * -> *) a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel Config
cfg = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef ([ChildEvent b], Int)
outQRev <- ([ChildEvent b], Int) -> IO (IORef ([ChildEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMvRev <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
MVar ()
bufferMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let getSVar :: Channel m a b -> Channel m a b
getSVar :: Channel m a b -> Channel m a b
getSVar Channel m a b
sv = Channel
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputQueueFromConsumer :: IORef ([ChildEvent b], Int)
outputQueueFromConsumer = IORef ([ChildEvent b], Int)
outQRev
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
outQMvRev
, bufferSpaceDoorBell :: MVar ()
bufferSpaceDoorBell = MVar ()
bufferMv
, maxBufferLimit :: Limit
maxBufferLimit = Config -> Limit
getMaxBuffer Config
cfg
, readOutputQ :: m [ChildEvent a]
readOutputQ = IO [ChildEvent a] -> m [ChildEvent a]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ChildEvent a] -> m [ChildEvent a])
-> IO [ChildEvent a] -> m [ChildEvent a]
forall a b. (a -> b) -> a -> b
$ (([ChildEvent a], Int) -> [ChildEvent a])
-> IO ([ChildEvent a], Int) -> IO [ChildEvent a]
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([ChildEvent a], Int) -> [ChildEvent a]
forall a b. (a, b) -> a
fst (Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB Channel m a b
sv)
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = Config -> Bool
getInspectMode Config
cfg
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: Channel m a b
sv = Channel m a b -> Channel m a b
getSVar Channel m a b
sv in Channel m a b -> IO (Channel m a b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a b
sv
{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
(Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel :: forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f = do
let config :: Config
config = Config -> Config
modifier Config
defaultConfig
Channel m a b
sv <- IO (Channel m a b) -> m (Channel m a b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a b) -> m (Channel m a b))
-> IO (Channel m a b) -> m (Channel m a b)
forall a b. (a -> b) -> a -> b
$ Config -> IO (Channel m a b)
forall (m :: * -> *) a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel Config
config
RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> m ThreadId -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doForkWith (Config -> Bool
getBound Config
config) (Channel m a b -> m ()
work Channel m a b
sv) RunInIO m
mrun (Channel m a b -> SomeException -> IO ()
forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv)
Channel m a b -> m (Channel m a b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a b
sv
where
{-# NOINLINE work #-}
work :: Channel m a b -> m ()
work Channel m a b
chan =
let f1 :: Fold m a ()
f1 = (b -> m ()) -> Fold m a b -> Fold m a ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> Fold m a b -> Fold m a c
Fold.rmapM (m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> (b -> m ()) -> b -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan) Fold m a b
f
in Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a ()
f1 (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> Stream m a
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromProducerD Channel m a b
chan
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
sv = do
([ChildEvent b]
list, Int
_) <- IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int))
-> IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent b], Int) -> IO ([ChildEvent b], Int)
forall a. IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic (Channel m a b -> IORef ([ChildEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer Channel m a b
sv)
[ChildEvent b] -> m (Maybe b)
forall {m :: * -> *} {a}.
MonadThrow m =>
[ChildEvent a] -> m (Maybe a)
processEvents ([ChildEvent b] -> m (Maybe b)) -> [ChildEvent b] -> m (Maybe b)
forall a b. (a -> b) -> a -> b
$ [ChildEvent b] -> [ChildEvent b]
forall a. [a] -> [a]
reverse [ChildEvent b]
list
where
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> m (Maybe a)
processEvents [] = Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
processEvents (ChildEvent a
ev : [ChildEvent a]
_) = do
case ChildEvent a
ev of
ChildStop ThreadId
_ Maybe SomeException
e -> m (Maybe a)
-> (SomeException -> m (Maybe a))
-> Maybe SomeException
-> m (Maybe a)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe m (Maybe a)
forall a. HasCallStack => a
undefined SomeException -> m (Maybe a)
forall e a. (HasCallStack, Exception e) => e -> m a
forall (m :: * -> *) e a.
(MonadThrow m, HasCallStack, Exception e) =>
e -> m a
throwM Maybe SomeException
e
ChildEvent a
ChildStopChannel -> m (Maybe a)
forall a. HasCallStack => a
undefined
ChildYield a
b -> Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
b)
{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
sv = do
let limit :: Limit
limit = Channel m a b -> Limit
forall (m :: * -> *) a b. Channel m a b -> Limit
maxBufferLimit Channel m a b
sv
case Limit
limit of
Limit
Unlimited -> Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Limited Word
lim -> do
([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
sv)
Bool -> m Bool
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a = m (Maybe b)
go
where
go :: m (Maybe b)
go = do
let qref :: IORef ([ChildEvent b], Int)
qref = Channel m a b -> IORef ([ChildEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer Channel m a b
chan
Maybe b
status <- do
([ChildEvent b]
_, Int
n) <- IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int))
-> IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent b], Int) -> IO ([ChildEvent b], Int)
forall a. IORef a -> IO a
readIORef IORef ([ChildEvent b], Int)
qref
if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
then Channel m a b -> m (Maybe b)
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
else Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
case Maybe b
status of
Just b
_ -> Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
status
Maybe b
Nothing -> do
Bool
r <- Channel m a b -> m Bool
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
chan
if Bool
r
then do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
(IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan)
(Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
(a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
Maybe b -> m (Maybe b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
else do
() <- IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
bufferSpaceDoorBell Channel m a b
chan)
m (Maybe b)
go