{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ConstraintKinds           #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE GeneralizedNewtypeDeriving#-}
{-# LANGUAGE InstanceSigs              #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE UndecidableInstances      #-} -- XXX

-- |
-- Module      : Streamly.Streams.Ahead
-- Copyright   : (c) 2017 Harendra Kumar
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Streams.Ahead
    (
      AheadT
    , Ahead
    , aheadly
    , ahead
    )
where

import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (ap, void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
-- import Control.Monad.Error.Class   (MonadError(..))
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (inline)

import qualified Data.Heap as H

import Streamly.Streams.SVar (fromSVar)
import Streamly.Streams.Serial (map)
import Streamly.Internal.Data.SVar
import Streamly.Streams.StreamK
       (IsStream(..), Stream, mkStream, foldStream, foldStreamShared,
        foldStreamSVar)
import qualified Streamly.Streams.StreamK as K

import Prelude hiding (map)

#include "Instances.hs"

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism, imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the
-- token is determined by the current sequence number, which is maintained in
-- the outputHeap. A sequence number is assigned when a task is queued. When a
-- thread dequeues a task it picks up its sequence number as well, and when
-- the output is ready it uses that sequence number to decide where to queue
-- the output.
--
-- The thread with the current sequence number sends its output directly to
-- the outputQueue. Other threads push their output to the outputHeap. When
-- the task being queued on the heap is a stream of many elements we evaluate
-- only the first element and keep the rest of the unevaluated computation in
-- the heap. When such a task gets the "token" for the outputQueue it
-- evaluates and yields all the remaining elements directly to the
-- outputQueue without having to check for the "token" again.
--
-- Note that no two outputs in the heap can have the same sequence number,
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) from the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that
-- for a single task just the outputQueue is enough, and for the case of many
-- threads just a heap is good enough. However, we balance between these two
-- cases so that both are efficient.
--
-- For bigger streams it may make sense to have a separate buffer for each
-- stream, but for singleton streams this becomes inefficient. On the other
-- hand, if we do not have separate buffers, the streams that come later in
-- the sequence may hog the buffer, hindering the streams that are ahead of
-- them. For this reason we limit the streams being executed in advance to a
-- single element buffer.
--
-- This scheme works quite efficiently, with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing
-- of the outputs. It is especially devised so that we are most efficient
-- when we have short tasks and need just a single thread. Also, when a
-- thread yields many items it can hold lock-free access to the outputQueue
-- and do so efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower CPU and IO priority so
-- that they do not hog the resources and hinder the progress of the threads
-- in front of them.
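--
-- A minimal sketch of the token based routing described above (illustrative
-- only; @route@, @sendToOutputQueue@ and @insertIntoOutputHeap@ are
-- hypothetical names, not functions in this module):
--
-- @
-- route token (Entry seqNo out)
--     | seqNo == token = sendToOutputQueue out
--     | otherwise      = insertIntoOutputHeap (Entry seqNo out)
-- @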

-- Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for the right
-- associated expression: we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it.
--
-- XXX Also note that limiting concurrency for cases like "take 10" would not
-- work well with left associative expressions, because we have no visibility
-- about how much the left side of the expression would yield.
--
-- XXX It may be a good idea to increment sequence numbers for each yield;
-- currently a stream on the left side of the expression may yield many
-- elements with the same sequence number. We can then use the seq number to
-- enforce yieldMax and yieldLimit as well.
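--
-- For example (illustrative), association determines where a new SVar is
-- forked:
--
-- @
-- s1 `ahead` (s2 `ahead` s3)  -- right associated: s2 and s3 are queued on
--                             -- the same SVar as s1
-- (s1 `ahead` s2) `ahead` s3  -- left associated: a new SVar is forked for
--                             -- the left expression
-- @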

-- Invariants:
--
-- * A worker should always ensure that it pushes all the consecutive items
-- in the heap to the outputQueue, especially the items on behalf of the
-- workers that have already left while we were holding the token. This
-- avoids deadlock conditions when a later worker's completion depends on the
-- consumption of earlier results. For more details see comments in the
-- consumer pull side code.

{-# INLINE underMaxHeap #-}
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap sv hp = do
    (_, len) <- readIORef (outputQueue sv)

    -- XXX simplify this
    -- The heap budget is the buffer limit minus whatever is already in the
    -- output queue. For example, with a buffer limit of 10 and 4 items in
    -- the output queue, the heap size plus active workers must not exceed 6.
    let maxHeap = case maxBufferLimit sv of
            Limited lim -> Limited $
                max 0 (lim - fromIntegral len)
            Unlimited -> Unlimited

    case maxHeap of
        Limited lim -> do
            active <- readIORef (workerCount sv)
            return $ H.size hp + active <= fromIntegral lim
        Unlimited -> return True

-- Return value:
-- True => stop
-- False => continue
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
    -> IO Bool
preStopCheck sv heap =
    -- check the stop condition under a lock before actually
    -- stopping so that the whole herd does not stop at once.
    withIORef heap $ \(hp, _) -> do
        heapOk <- underMaxHeap sv hp
        takeMVar (workerStopMVar sv)
        let stop = do
                putMVar (workerStopMVar sv) ()
                return True
            continue = do
                putMVar (workerStopMVar sv) ()
                return False
        if heapOk
        then
            case yieldRateInfo sv of
                Nothing -> continue
                Just yinfo -> do
                    rateOk <- isBeyondMaxRate sv yinfo
                    if rateOk then continue else stop
        else stop

abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution q sv winfo m = do
    reEnqueueAhead sv q m
    incrementYieldLimit sv
    sendStop sv winfo

-- XXX In the absence of a "noyield" primitive (i.e. do not pre-empt inside a
-- critical section) from the GHC RTS, we have a difficult problem. Assume we
-- have 100,000 threads producing output and queuing it to the heap for
-- sequencing. The heap can be drained only by one thread at a time; any
-- thread that finds that the heap can be drained takes a lock and starts
-- draining it. However, the thread may get preempted in the middle of it
-- while holding the lock. Since that thread is holding the lock, the other
-- threads cannot pick up the draining task, so they proceed to pick up the
-- next task to execute. If the draining thread could yield voluntarily at a
-- point where it has released the lock, then the next threads could pick up
-- the draining instead of executing more tasks. When there are 100,000
-- threads the drainer gets a CPU share to run only 1:100,000 of the time.
-- This makes the heap accumulate a lot of output when the buffer size is
-- large.
--
-- The solutions to this problem are:
-- 1) make the other threads wait in a queue until the draining finishes
-- 2) make the other threads queue and go away if draining is in progress
--
-- In both cases we give the drainer a chance to run more often.
--
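-- A sketch of option (2) above (illustrative only; the helper names are
-- hypothetical, not functions in this module):
--
-- @
-- workerStep = do
--     gotLock <- tryTakeDrainLock          -- non-blocking
--     if gotLock
--     then drainHeapToOutputQueue >> releaseDrainLock
--     else parkResultOnHeap >> exitWorker  -- queue the result and go away
-- @
--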
processHeap :: MonadIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool -- we are draining the heap before we stop
    -> m ()
processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry

    where

    stopIfNeeded ent seqNo r = do
        stopIt <- liftIO $ preStopCheck sv heap
        if stopIt
        then liftIO $ do
            -- put the entry back in the heap and stop
            requeueOnHeapTop heap (Entry seqNo ent) seqNo
            sendStop sv winfo
        else runStreamWithYieldLimit True seqNo r

    loopHeap seqNo ent =
        case ent of
            AheadEntryNull -> nextHeap seqNo
            AheadEntryPure a -> do
                -- Use 'send' directly so that we do not account this in worker
                -- latency as this will not be the real latency.
                -- Don't stop the worker in this case as we are just
                -- transferring available results from heap to outputQueue.
                void $ liftIO $ send sv (ChildYield a)
                nextHeap seqNo
            AheadEntryStream r ->
                if stopping
                then stopIfNeeded ent seqNo r
                else runStreamWithYieldLimit True seqNo r

    nextHeap prevSeqNo = do
        res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
        case res of
            Ready (Entry seqNo hent) -> loopHeap seqNo hent
            Clearing -> liftIO $ sendStop sv winfo
            Waiting _ ->
                if stopping
                then do
                    r <- liftIO $ preStopCheck sv heap
                    if r
                    then liftIO $ sendStop sv winfo
                    else processWorkQueue prevSeqNo
                else inline processWorkQueue prevSeqNo

    processWorkQueue prevSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> liftIO $ sendStop sv winfo
            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == prevSeqNo + 1
                    then processWithToken q heap st sv winfo m seqNo
                    else processWithoutToken q heap st sv winfo m seqNo
                else liftIO $ abortExecution q sv winfo m

    -- We do not stop the worker on buffer full here as we want to proceed to
    -- nextHeap anyway so that we can clear any subsequent entries. We stop
    -- only in yield continuation where we may have a remaining stream to be
    -- pushed on the heap.
    singleStreamFromHeap seqNo a = do
        void $ liftIO $ sendYield sv winfo (ChildYield a)
        nextHeap seqNo

    -- XXX when we have an unfinished stream on the heap we cannot account
    -- all the yields of that stream until it finishes. So if we have picked
    -- up and executed more actions beyond it in the parent stream and put
    -- them on the heap, they would eat up some yield limit which is not
    -- correct; we would think that our yield limit is over even though we
    -- still have to yield items from the unfinished stream before them. For
    -- this reason, if there are pending items in the heap we drain them
    -- unconditionally, without considering the yield limit.
    runStreamWithYieldLimit continue seqNo r = do
        _ <- liftIO $ decrementYieldLimit sv
        if continue -- see comment above -- && yieldLimitOk
        then do
            let stop = do
                  liftIO (incrementYieldLimit sv)
                  nextHeap seqNo
            foldStreamSVar sv st
                          (yieldStreamFromHeap seqNo)
                          (singleStreamFromHeap seqNo)
                          stop
                          r
        else liftIO $ do
            let ent = Entry seqNo (AheadEntryStream r)
            requeueOnHeapTop heap ent seqNo
            incrementYieldLimit sv
            sendStop sv winfo

    yieldStreamFromHeap seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        runStreamWithYieldLimit continue seqNo r

{-# NOINLINE drainHeap #-}
drainHeap :: MonadIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap q heap st sv winfo = do
    r <- liftIO $ dequeueFromHeap heap
    case r of
        Ready (Entry seqNo hent) ->
            processHeap q heap st sv winfo hent seqNo True
        _ -> liftIO $ sendStop sv winfo

data HeapStatus = HContinue | HStop

processWithoutToken :: MonadIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithoutToken q heap st sv winfo m seqNo = do
    -- we have already decremented the yield limit for m
    let stop = do
            liftIO (incrementYieldLimit sv)
            -- If the stream stops without yielding anything, and we do not
            -- put anything on the heap, then a consumer waiting for this
            -- sequence number would keep waiting forever, because we are
            -- never going to put it on the heap. So we have to put a null
            -- entry on the heap even when we stop.
            toHeap AheadEntryNull

    foldStreamSVar sv st
        (\a r -> toHeap $ AheadEntryStream $ K.cons a r)
        (toHeap . AheadEntryPure)
        stop
        m

    where

    -- XXX to reduce contention each CPU can have its own heap
    toHeap ent = do
        -- Heap insertion is an expensive affair, so we use a non-CAS based
        -- modification; otherwise contention and retries can make a thread
        -- context switch and throw it behind other threads which come later
        -- in sequence.
        newHp <- liftIO $ atomicModifyIORef heap $ \(hp, snum) ->
            let hp' = H.insert (Entry seqNo ent) hp
            in assert (heapIsSane snum seqNo) ((hp', snum), hp')

        when (svarInspectMode sv) $
            liftIO $ do
                maxHp <- readIORef (maxHeapSize $ svarStats sv)
                when (H.size newHp > maxHp) $
                    writeIORef (maxHeapSize $ svarStats sv) (H.size newHp)

        heapOk <- liftIO $ underMaxHeap sv newHp
        let drainAndStop = drainHeap q heap st sv winfo
            mainLoop = workLoopAhead q heap st sv winfo
        status <-
            case yieldRateInfo sv of
                Nothing -> return HContinue
                Just yinfo ->
                    case winfo of
                        Just info -> do
                            rateOk <- liftIO $ workerRateControl sv yinfo info
                            if rateOk
                            then return HContinue
                            else return HStop
                        Nothing -> return HContinue

        if heapOk
        then
            case status of
                HContinue -> mainLoop
                HStop -> drainAndStop
        else drainAndStop

processWithToken :: MonadIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithToken q heap st sv winfo action sno = do
    -- Note, we enter this function with yield limit already decremented
    -- XXX deduplicate stop in all invocations
    let stop = do
            liftIO (incrementYieldLimit sv)
            loopWithToken (sno + 1)

    foldStreamSVar sv st (yieldOutput sno) (singleOutput sno) stop action

    where

    singleOutput seqNo a = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        if continue
        then loopWithToken (seqNo + 1)
        else do
            liftIO $ updateHeapSeq heap (seqNo + 1)
            drainHeap q heap st sv winfo

    -- XXX use a wrapper function around stop so that we never miss
    -- incrementing the yield limit in a stop continuation. Essentially all
    -- "unstream" calls in this function must increment the yield limit on
    -- stop.
    yieldOutput seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if continue && yieldLimitOk
        then do
            let stop = do
                    liftIO (incrementYieldLimit sv)
                    loopWithToken (seqNo + 1)
            foldStreamSVar sv st
                          (yieldOutput seqNo)
                          (singleOutput seqNo)
                          stop
                          r
        else do
            let ent = Entry seqNo (AheadEntryStream r)
            liftIO $ requeueOnHeapTop heap ent seqNo
            liftIO $ incrementYieldLimit sv
            drainHeap q heap st sv winfo

    loopWithToken nextSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> do
                liftIO $ updateHeapSeq heap nextSeqNo
                workLoopAhead q heap st sv winfo

            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                let undo = liftIO $ do
                        updateHeapSeq heap nextSeqNo
                        reEnqueueAhead sv q m
                        incrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == nextSeqNo
                    then do
                        let stop = do
                                liftIO (incrementYieldLimit sv)
                                loopWithToken (seqNo + 1)
                        foldStreamSVar sv st
                                      (yieldOutput seqNo)
                                      (singleOutput seqNo)
                                      stop
                                      m
                    else
                        -- There is a race here: another thread may have put
                        -- something on the heap and gone away, in which case
                        -- the consumer will not get a doorBell. If we do not
                        -- clear the heap before executing the next action,
                        -- and the consumer depends on the output that is
                        -- stuck in the heap, this will result in a deadlock.
                        -- So we always clear the heap before executing the
                        -- next action.
                        undo >> workLoopAhead q heap st sv winfo
                else undo >> drainHeap q heap st sv winfo

-- XXX the yield limit changes increased the performance overhead by 30-40%.
-- Just like AsyncT we can use an implementation without the yield limit and
-- even without pacing code to keep the performance higher in the unlimited
-- and unpaced case.
--
-- XXX The yieldLimit stuff is pretty invasive. We can instead do it by using
-- three hooks, a pre-execute hook, a yield hook and a stop hook. In fact these
-- hooks can be used for a more general implementation to even check predicates
-- and not just yield limit.

-- XXX we can remove the sv parameter as it can be derived from st

workLoopAhead :: MonadIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead q heap st sv winfo = do
        r <- liftIO $ dequeueFromHeap heap
        case r of
            Ready (Entry seqNo hent) ->
                processHeap q heap st sv winfo hent seqNo False
            Clearing -> liftIO $ sendStop sv winfo
            Waiting _ -> do
                -- Before we execute the next item from the work queue we check
                -- if we are beyond the yield limit. It is better to check the
                -- yield limit before we pick up the next item. Otherwise we
                -- may have already started more tasks even though we may have
                -- reached the yield limit.  We can avoid this by taking active
                -- workers into account, but that is not as reliable, because
                -- workers may go away without picking up work and yielding a
                -- value.
                --
                -- Rate control can be done either based on actual yields in
                -- the output queue or based on any yield either to the heap or
                -- to the output queue. In both cases we may have one issue or
                -- the other. We chose to do this based on actual yields to the
                -- output queue because it makes the code common to both async
                -- and ahead streams.
                --
                work <- dequeueAhead q
                case work of
                    Nothing -> liftIO $ sendStop sv winfo
                    Just (m, seqNo) -> do
                        yieldLimitOk <- liftIO $ decrementYieldLimit sv
                        if yieldLimitOk
                        then
                            if seqNo == 0
                            then processWithToken q heap st sv winfo m seqNo
                            else processWithoutToken q heap st sv winfo m seqNo
                        -- If some worker decremented the yield limit but then
                        -- did not yield anything and therefore incremented it
                        -- later, then if we did not requeue m here we may find
                        -- the work queue empty and therefore miss executing
                        -- the remaining action.
                        else liftIO $ abortExecution q sv winfo m

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-- The only difference between forkSVarAsync and this is that we run the left
-- computation without a shared SVar.
forkSVarAhead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
forkSVarAhead m1 m2 = mkStream $ \st stp sng yld -> do
        sv <- newAheadVar st (concurrently (toStream m1) (toStream m2))
                          workLoopAhead
        foldStream st stp sng yld (fromSVar sv)
    where
    concurrently ma mb = mkStream $ \st stp sng yld -> do
        liftIO $ enqueue (fromJust $ streamVar st) mb
        foldStream st stp sng yld ma

-- | Polymorphic version of the 'Semigroup' operation '<>' of 'AheadT'.
-- Merges two streams sequentially but with concurrent lookahead.
--
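-- A minimal usage sketch (assuming the "Streamly.Prelude" API; the output
-- preserves stream order even though both sides may be evaluated
-- concurrently):
--
-- @
-- import qualified Streamly.Prelude as S
--
-- main = S.drain . 'aheadly' $ 'ahead' (S.fromList [1,2]) (S.fromList [3,4])
-- @
--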
-- @since 0.3.0
{-# INLINE ahead #-}
ahead :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
ahead m1 m2 = mkStream $ \st stp sng yld ->
    case streamVar st of
        Just sv | svarStyle sv == AheadVar -> do
            liftIO $ enqueue sv (toStream m2)
            -- Always run the left side on a new SVar to avoid complexity in
            -- sequencing results. This means the left side cannot further
            -- split into more ahead computations on the same SVar.
            foldStream st stp sng yld m1
        _ -> foldStreamShared st stp sng yld (forkSVarAhead m1 m2)

-- | XXX we can implement it more efficiently by a direct implementation
-- instead of combining streams using ahead.
{-# INLINE consMAhead #-}
{-# SPECIALIZE consMAhead :: IO a -> AheadT IO a -> AheadT IO a #-}
consMAhead :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consMAhead m r = fromStream $ K.yieldM m `ahead` (toStream r)

------------------------------------------------------------------------------
-- AheadT
------------------------------------------------------------------------------

-- | The 'Semigroup' operation for 'AheadT' appends two streams. The combined
-- stream behaves like a single stream with the actions from the second stream
-- appended to the first stream. The combined stream is evaluated in the
-- speculative style.  This operation can be used to fold an infinite lazy
-- container of streams.
--
-- @
-- import "Streamly"
-- import qualified "Streamly.Prelude" as S
-- import Control.Concurrent
--
-- main = do
--  xs \<- S.'toList' . 'aheadly' $ (p 1 |: p 2 |: nil) <> (p 3 |: p 4 |: nil)
--  print xs
--  where p n = threadDelay 1000000 >> return n
-- @
-- @
-- [1,2,3,4]
-- @
--
-- Any exceptions generated by a constituent stream are propagated to the
-- output stream.
--
-- The monad instance of 'AheadT' may run each monadic continuation (bind)
-- concurrently in a speculative manner, performing side effects in a partially
-- ordered manner but producing the outputs in an ordered manner like
-- 'SerialT'.
--
-- @
-- main = S.drain . 'aheadly' $ do
--     n <- return 3 \<\> return 2 \<\> return 1
--     S.yieldM $ do
--          threadDelay (n * 1000000)
--          myThreadId >>= \\tid -> putStrLn (show tid ++ ": Delay " ++ show n)
-- @
-- @
-- ThreadId 40: Delay 1
-- ThreadId 39: Delay 2
-- ThreadId 38: Delay 3
-- @
--
-- @since 0.3.0
newtype AheadT m a = AheadT {getAheadT :: Stream m a}
    deriving (MonadTrans)

-- | A serial IO stream of elements of type @a@ with concurrent lookahead.  See
-- 'AheadT' documentation for more details.
--
-- @since 0.3.0
type Ahead = AheadT IO

-- | Fix the type of a polymorphic stream as 'AheadT'.
--
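-- For example (a minimal sketch, assuming the "Streamly.Prelude" API):
--
-- @
-- import qualified Streamly.Prelude as S
--
-- main = S.toList ('aheadly' $ S.fromList [1,2,3]) >>= print -- [1,2,3]
-- @
--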
-- @since 0.3.0
aheadly :: IsStream t => AheadT m a -> t m a
aheadly = K.adapt

instance IsStream AheadT where
    toStream = getAheadT
    fromStream = AheadT
    consM = consMAhead
    (|:) = consMAhead

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

{-# INLINE mappendAhead #-}
{-# SPECIALIZE mappendAhead :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
mappendAhead :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
mappendAhead m1 m2 = fromStream $ ahead (toStream m1) (toStream m2)

instance MonadAsync m => Semigroup (AheadT m a) where
    (<>) = mappendAhead

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance MonadAsync m => Monoid (AheadT m a) where
    mempty = K.nil
    mappend = (<>)

------------------------------------------------------------------------------
-- Monad
------------------------------------------------------------------------------

{-# INLINE concatMapAhead #-}
{-# SPECIALIZE concatMapAhead :: (a -> AheadT IO b) -> AheadT IO a -> AheadT IO b #-}
concatMapAhead :: MonadAsync m => (a -> AheadT m b) -> AheadT m a -> AheadT m b
concatMapAhead f m = fromStream $
    K.concatMapBy ahead (\a -> K.adapt $ f a) (K.adapt m)

instance MonadAsync m => Monad (AheadT m) where
    return = pure
    {-# INLINE (>>=) #-}
    (>>=) = flip concatMapAhead

instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    pure = AheadT . K.yield
    {-# INLINE (<*>) #-}
    (<*>) = ap

------------------------------------------------------------------------------
-- Other instances
------------------------------------------------------------------------------

MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)