#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif
#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif
module Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
(
toChannel
, toChannelK
, fromChannel
, fromChannelK
)
where
#include "inline.hs"
import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent
(MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)
import qualified Streamly.Internal.Data.Stream.StreamD as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types hiding (inspect)
import Prelude hiding (map, concat, concatMap)
#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif
{-# INLINE toChannelK #-}
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
toChannelK :: forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
sv StreamK m a
m = do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
Channel m a -> Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue Channel m a
sv Bool
False (RunInIO m
runIn, StreamK m a
m)
{-# INLINE toChannel #-}
toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m ()
toChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> Stream m a -> m ()
toChannel Channel m a
chan = forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> StreamK m a -> m ()
toChannelK Channel m a
chan forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
Stream.toStreamK
{-# NOINLINE fromChannelRaw #-}
fromChannelRaw :: (MonadIO m, MonadThrow m) => Channel m a -> K.StreamK m a
fromChannelRaw :: forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> StreamK m a
fromChannelRaw Channel m a
sv = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.MkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
[ChildEvent a]
list <- forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ Channel m a
sv
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> StreamK m a
processEvents forall a b. (a -> b) -> a -> b
$ forall a. [a] -> [a]
reverse [ChildEvent a]
list
where
cleanup :: m ()
cleanup = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
IO String -> String -> IO ()
printSVar (forall (m :: * -> *) a. Channel m a -> IO String
dumpSVar Channel m a
sv) String
"SVar Done"
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> StreamK m a
processEvents [] = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.MkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
Bool
done <- forall (m :: * -> *) a. Channel m a -> m Bool
postProcess Channel m a
sv
if Bool
done
then m ()
cleanup forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
else forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> StreamK m a
fromChannelRaw Channel m a
sv
processEvents (ChildEvent a
ev : [ChildEvent a]
es) = forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.MkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
let rest :: StreamK m a
rest = [ChildEvent a] -> StreamK m a
processEvents [ChildEvent a]
es
case ChildEvent a
ev of
ChildYield a
a -> a -> StreamK m a -> m r
yld a
a StreamK m a
rest
ChildEvent a
ChildStopChannel -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IORef (Set ThreadId) -> IO ()
cleanupSVar (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv))
m ()
cleanup forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
ChildStop ThreadId
tid Maybe SomeException
e -> do
forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread Channel m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStream State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp StreamK m a
rest
Just SomeException
ex ->
case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
Just ThreadAbort
ThreadAbort ->
forall a. HasCallStack => String -> a
error String
"processEvents: got ThreadAbort"
Maybe ThreadAbort
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IORef (Set ThreadId) -> IO ()
cleanupSVar (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv))
m ()
cleanup forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
#ifdef INSPECTION
inspect $ hasNoTypeClassesExcept 'fromChannelRaw
[ ''Monad
, ''Applicative
, ''MonadThrow
, ''Exception
, ''MonadIO
, ''MonadBaseControl
, ''Typeable
, ''Functor
]
#endif
{-# INLINE fromChannelK #-}
fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a
fromChannelK :: forall (m :: * -> *) a. MonadAsync m => Channel m a -> StreamK m a
fromChannelK Channel m a
sv =
forall (m :: * -> *) a.
(forall r.
State StreamK m a
-> (a -> StreamK m a -> m r) -> (a -> m r) -> m r -> m r)
-> StreamK m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp -> do
IORef ()
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef ()
Weak (IORef ())
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m ()
startChannel Channel m a
sv
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st a -> StreamK m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> StreamK m a
fromChannelRaw Channel m a
sv{svarRef :: Maybe (IORef ())
svarRef = forall a. a -> Maybe a
Just IORef ()
ref}
where
hook :: IO ()
hook = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) forall a b. (a -> b) -> a -> b
$ do
Maybe AbsTime
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv))
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) forall a b. (a -> b) -> a -> b
$
IO String -> String -> IO ()
printSVar (forall (m :: * -> *) a. Channel m a -> IO String
dumpSVar Channel m a
sv) String
"SVar Garbage Collected"
IORef (Set ThreadId) -> IO ()
cleanupSVar (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) IO ()
performMajorGC
{-# INLINE fromChannel #-}
fromChannel :: MonadAsync m => Channel m a -> Stream m a
fromChannel :: forall (m :: * -> *) a. MonadAsync m => Channel m a -> Stream m a
fromChannel = forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
Stream.fromStreamK forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. MonadAsync m => Channel m a -> StreamK m a
fromChannelK
data FromSVarState t m a =
FromSVarInit
| FromSVarRead (Channel m a)
| FromSVarLoop (Channel m a) [ChildEvent a]
| FromSVarDone (Channel m a)
{-# INLINE_NORMAL _fromChannelD #-}
_fromChannelD :: (MonadIO m, MonadThrow m) => Channel m a -> D.Stream m a
_fromChannelD :: forall (m :: * -> *) a.
(MonadIO m, MonadThrow m) =>
Channel m a -> Stream m a
_fromChannelD Channel m a
svar = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {p} {t} {t}.
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step forall t (m :: * -> *) a. FromSVarState t m a
FromSVarInit
where
{-# INLINE_LATE step #-}
step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ FromSVarState t m a
FromSVarInit = do
IORef ()
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef ()
Weak (IORef ())
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
let sv :: Channel m a
sv = Channel m a
svar{svarRef :: Maybe (IORef ())
svarRef = forall a. a -> Maybe a
Just IORef ()
ref}
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarRead Channel m a
sv)
where
{-# NOINLINE hook #-}
hook :: IO ()
hook = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
svar) forall a b. (a -> b) -> a -> b
$ do
Maybe AbsTime
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
svar))
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) forall a b. (a -> b) -> a -> b
$
IO String -> String -> IO ()
printSVar (forall (m :: * -> *) a. Channel m a -> IO String
dumpSVar Channel m a
svar) String
"SVar Garbage Collected"
IORef (Set ThreadId) -> IO ()
cleanupSVar (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
svar)
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
svar) IO ()
performMajorGC
step p
_ (FromSVarRead Channel m a
sv) = do
[ChildEvent a]
list <- forall (m :: * -> *) a. Channel m a -> m [ChildEvent a]
readOutputQ Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv (forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)
step p
_ (FromSVarLoop Channel m a
sv []) = do
Bool
done <- forall (m :: * -> *) a. Channel m a -> m Bool
postProcess Channel m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ if Bool
done
then forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarDone Channel m a
sv
else forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarRead Channel m a
sv
step p
_ (FromSVarLoop Channel m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
case ChildEvent a
ev of
ChildYield a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
D.Yield a
a (forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv [ChildEvent a]
es)
ChildEvent a
ChildStopChannel -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IORef (Set ThreadId) -> IO ()
cleanupSVar (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall t (m :: * -> *) a. Channel m a -> FromSVarState t m a
FromSVarDone Channel m a
sv)
ChildStop ThreadId
tid Maybe SomeException
e -> do
forall (m :: * -> *) a. Channel m a -> ThreadId -> m ()
accountThread Channel m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv [ChildEvent a]
es)
Just SomeException
ex ->
case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
Just ThreadAbort
ThreadAbort ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall t (m :: * -> *) a.
Channel m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop Channel m a
sv [ChildEvent a]
es)
Maybe ThreadAbort
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IORef (Set ThreadId) -> IO ()
cleanupSVar (forall (m :: * -> *) a. Channel m a -> IORef (Set ThreadId)
workerThreads Channel m a
sv))
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
step p
_ (FromSVarDone Channel m a
sv) = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ IO String -> String -> IO ()
printSVar (forall (m :: * -> *) a. Channel m a -> IO String
dumpSVar Channel m a
sv) String
"SVar Done"
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop