{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif
#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif
module Streamly.Internal.Data.Stream.SVar
( fromSVar
, fromStreamVar
, fromProducer
, fromConsumer
, toSVar
, pushToFold
)
where
import Control.Exception (fromException)
import Control.Monad (when, void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.StreamK hiding (reverse)
#if __GLASGOW_HASKELL__ < 810
#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif
#endif
{-# NOINLINE fromStreamVar #-}
fromStreamVar :: MonadAsync m => SVar Stream m a -> Stream m a
fromStreamVar sv = MkStream $ \st yld sng stp -> do
list <- readOutputQ sv
foldStream st yld sng stp $ processEvents $ reverse list
where
allDone stp = do
when (svarInspectMode sv) $ do
t <- liftIO $ getTime Monotonic
liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
liftIO $ printSVar sv "SVar Done"
stp
{-# INLINE processEvents #-}
processEvents [] = MkStream $ \st yld sng stp -> do
done <- postProcess sv
if done
then allDone stp
else foldStream st yld sng stp $ fromStreamVar sv
processEvents (ev : es) = MkStream $ \st yld sng stp -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> do
stop <- shouldStop tid
if stop
then liftIO (cleanupSVar sv) >> allDone stp
else foldStream st yld sng stp rest
Just ex ->
case fromException ex of
Just ThreadAbort ->
foldStream st yld sng stp rest
Nothing -> liftIO (cleanupSVar sv) >> throwM ex
shouldStop tid =
case svarStopStyle sv of
StopNone -> return False
StopAny -> return True
StopBy -> do
sid <- liftIO $ readIORef (svarStopBy sv)
return $ if tid == sid then True else False
#if __GLASGOW_HASKELL__ < 810
#ifdef INSPECTION
inspect $ hasNoTypeClassesExcept 'fromStreamVar
[ ''Monad
, ''Applicative
, ''MonadThrow
, ''Exception
, ''MonadIO
, ''MonadBaseControl
, ''Typeable
, ''Functor
]
#endif
#endif
{-# INLINE fromSVar #-}
fromSVar :: (MonadAsync m, IsStream t) => SVar Stream m a -> t m a
fromSVar sv =
mkStream $ \st yld sng stp -> do
ref <- liftIO $ newIORef ()
_ <- liftIO $ mkWeakIORef ref hook
foldStreamShared st yld sng stp $
fromStream $ fromStreamVar sv{svarRef = Just ref}
where
hook = do
when (svarInspectMode sv) $ do
r <- liftIO $ readIORef (svarStopTime (svarStats sv))
when (isNothing r) $
printSVar sv "SVar Garbage Collected"
cleanupSVar sv
when (svarInspectMode sv) performMajorGC
toSVar :: (IsStream t, MonadAsync m) => SVar Stream m a -> t m a -> m ()
toSVar sv m = toStreamVar sv (toStream m)
{-# NOINLINE fromProducer #-}
fromProducer :: MonadAsync m => SVar Stream m a -> Stream m a
fromProducer sv = mkStream $ \st yld sng stp -> do
list <- readOutputQ sv
foldStream st yld sng stp $ processEvents $ reverse list
where
allDone stp = do
when (svarInspectMode sv) $ do
t <- liftIO $ getTime Monotonic
liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
liftIO $ printSVar sv "SVar Done"
sendStopToProducer sv
stp
{-# INLINE processEvents #-}
processEvents [] = mkStream $ \st yld sng stp -> do
foldStream st yld sng stp $ fromProducer sv
processEvents (ev : es) = mkStream $ \_ yld _ stp -> do
let rest = processEvents es
case ev of
ChildYield a -> yld a rest
ChildStop tid e -> do
accountThread sv tid
case e of
Nothing -> allDone stp
Just _ -> error "Bug: fromProducer: received exception"
{-# NOINLINE fromConsumer #-}
fromConsumer :: MonadAsync m => SVar Stream m a -> m Bool
fromConsumer sv = do
(list, _) <- liftIO $ readOutputQBasic (outputQueueFromConsumer sv)
processEvents $ reverse list
where
{-# INLINE processEvents #-}
processEvents [] = return False
processEvents (ev : _) = do
case ev of
ChildStop _ e -> do
case e of
Nothing -> return True
Just ex -> throwM ex
ChildYield _ -> error "Bug: fromConsumer: invalid ChildYield event"
{-# INLINE pushToFold #-}
pushToFold :: MonadAsync m => SVar Stream m a -> a -> m Bool
pushToFold sv a = do
let qref = outputQueueFromConsumer sv
done <- do
(_, n) <- liftIO $ readIORef qref
if (n > 0)
then fromConsumer sv
else return False
if done
then return True
else liftIO $ do
decrementBufferLimit sv
void $ send sv (ChildYield a)
return False