-- |
-- Module      : Streamly.Internal.Data.Fold.Async
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Fold.Async
    (
    -- * Trimming
      takeInterval

    -- * Splitting
    , intervalsOf
    )
where

import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, swapMVar, readMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Streamly.Internal.Control.Concurrent (MonadAsync, withRunInIO)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))

import Streamly.Internal.Data.Fold.Type

-- $setup
-- >>> :m
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Async as Fold

-- XXX We can use asyncClock here. A parser can be used to return an input that
-- arrives after the timeout.
-- XXX If n is 0 return immediately in initial.
-- XXX we should probably discard the input received after the timeout like
-- takeEndBy_.
--
-- | @takeInterval n fold@ uses @fold@ to fold the input items arriving within
-- the first @n@ seconds.
--
-- >>> input = Stream.delay 0.1 $ Stream.fromList [1..]
-- >>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) input
-- [1,2,3,4,5,6,7,8,9,10,11]
--
-- Stops when @fold@ stops or when the timeout occurs. Note that the fold needs
-- an input after the timeout to stop. For example, if no input is pushed to
-- the fold until one hour after the timeout had occurred, then the fold will
-- be done only after consuming that input.
--
-- /Pre-release/
--
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
takeInterval n (Fold step initial done) = Fold step' initial' done'

    where

    -- Initialize the wrapped fold. If it is ready to accept input, create the
    -- shared "timeout expired" flag (initially False) and fork the timer
    -- thread that sets it. The flag and the timer's thread id are carried in
    -- the state of the resulting fold.
    initial' = do
        res <- initial
        case res of
            Partial s -> do
                mv <- liftIO $ newMVar False
                t <-
                    withRunInIO $ \run ->
                        -- The thread is forked under 'mask' so that the
                        -- exception handler is in place before the timer body
                        -- runs; 'restore' puts the caller's masking state
                        -- back for the timer body only.
                        mask $ \restore -> do
                            tid <-
                                forkIO
                                  $ catch
                                        (restore $ void $ run (timerThread mv))
                                        (handleChildException mv)
                            run (return tid)
                return $ Partial $ Tuple3' s mv t
            Done b -> return $ Done b

    -- Feed one input to the wrapped fold. The flag is sampled before the step
    -- runs, so an input that arrives after the timeout is still consumed and
    -- the fold is finished right after it.
    step' (Tuple3' s mv t) a = do
        expired <- liftIO $ readMVar mv
        res <- step s a
        case res of
            Partial s1 ->
                if expired
                then Done <$> done s1
                else return $ Partial $ Tuple3' s1 mv t
            -- When the wrapped fold finishes before the timeout, the timer
            -- thread is no longer needed and is killed.
            Done b ->
                if expired
                then return $ Done b
                else liftIO (killThread t) >> return (Done b)

    -- Extract the result of the wrapped fold. The flag and the timer thread
    -- are not touched here.
    done' (Tuple3' s _ _) = done s

    -- Sleep for n seconds (converted to the microseconds expected by
    -- 'threadDelay') and then set the flag.
    timerThread mv = do
        liftIO $ threadDelay (round $ n * 1000000)
        -- Use IORef + CAS? instead of MVar since it's a Bool?
        liftIO $ void $ swapMVar mv True

    -- Any exception caught in the timer thread, including the one delivered
    -- by 'killThread', sets the flag as well.
    handleChildException :: MVar Bool -> SomeException -> IO ()
    handleChildException mv _ = void $ swapMVar mv True

-- For example, we can copy and distribute a stream to multiple folds where
-- each fold can group the input differently e.g. by one second, one minute and
-- one hour windows respectively and fold each resulting stream of folds.

-- | Group the input stream into windows of @n@ seconds each using the first
-- fold and then fold the resulting groups using the second fold.
--
-- >>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
-- >>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
-- [[1,2,3,4],[5,6,7],[8,9,10]]
--
-- > intervalsOf n split = many (takeInterval n split)
--
-- /Pre-release/
--
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf n split = many (takeInterval n split)