module Streamly.Internal.Data.Fold.Async
(
takeInterval
, intervalsOf
)
where
import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, swapMVar, readMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Streamly.Internal.Control.Concurrent (MonadAsync, withRunInIO)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))
import Streamly.Internal.Data.Fold.Type
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
takeInterval :: forall (m :: * -> *) a b.
MonadAsync m =>
Double -> Fold m a b -> Fold m a b
takeInterval Double
n (Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
done) = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold Tuple3' s (MVar Bool) ThreadId
-> a -> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
step' m (Step (Tuple3' s (MVar Bool) ThreadId) b)
initial' forall {b} {c}. Tuple3' s b c -> m b
done'
where
initial' :: m (Step (Tuple3' s (MVar Bool) ThreadId) b)
initial' = do
Step s b
res <- m (Step s b)
initial
case Step s b
res of
Partial s
s -> do
MVar Bool
mv <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (MVar a)
newMVar Bool
False
ThreadId
t <-
forall (m :: * -> *) b.
MonadRunInIO m =>
((forall a. m a -> IO (StM m a)) -> IO (StM m b)) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO (StM m a)
run ->
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
ThreadId
tid <-
IO () -> IO ThreadId
forkIO
forall a b. (a -> b) -> a -> b
$ forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch
(forall a. IO a -> IO a
restore forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. m a -> IO (StM m a)
run (forall {m :: * -> *}. MonadIO m => MVar Bool -> m ()
timerThread MVar Bool
mv))
(MVar Bool -> SomeException -> IO ()
handleChildException MVar Bool
mv)
forall a. m a -> IO (StM m a)
run (forall (m :: * -> *) a. Monad m => a -> m a
return ThreadId
tid)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
Partial forall a b. (a -> b) -> a -> b
$ forall a b c. a -> b -> c -> Tuple3' a b c
Tuple3' s
s MVar Bool
mv ThreadId
t
Done b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
Done b
b
step' :: Tuple3' s (MVar Bool) ThreadId
-> a -> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
step' (Tuple3' s
s MVar Bool
mv ThreadId
t) a
a = do
Bool
val <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
readMVar MVar Bool
mv
if Bool
val
then do
Step s b
res <- s -> a -> m (Step s b)
step s
s a
a
case Step s b
res of
Partial s
sres -> forall s b. b -> Step s b
Done forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> s -> m b
done s
sres
Done b
bres -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
Done b
bres
else do
Step s b
res <- s -> a -> m (Step s b)
step s
s a
a
case Step s b
res of
Partial s
fs -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
Partial forall a b. (a -> b) -> a -> b
$ forall a b c. a -> b -> c -> Tuple3' a b c
Tuple3' s
fs MVar Bool
mv ThreadId
t
Done b
b -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ThreadId -> IO ()
killThread ThreadId
t) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return (forall s b. b -> Step s b
Done b
b)
done' :: Tuple3' s b c -> m b
done' (Tuple3' s
s b
_ c
_) = s -> m b
done s
s
timerThread :: MVar Bool -> m ()
timerThread MVar Bool
mv = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO a
swapMVar MVar Bool
mv Bool
True
handleChildException :: MVar Bool -> SomeException -> IO ()
handleChildException :: MVar Bool -> SomeException -> IO ()
handleChildException MVar Bool
mv SomeException
_ = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> a -> IO a
swapMVar MVar Bool
mv Bool
True
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf :: forall (m :: * -> *) a b c.
MonadAsync m =>
Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf Double
n Fold m a b
split = forall (m :: * -> *) a b c.
Monad m =>
Fold m a b -> Fold m b c -> Fold m a c
many (forall (m :: * -> *) a b.
MonadAsync m =>
Double -> Fold m a b -> Fold m a b
takeInterval Double
n Fold m a b
split)