{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.Internal
#ifdef NOT_x86
    {-# WARNING "This library is unlikely to perform well on architectures without a fetch-and-add instruction" #-}
#endif
    (sEGMENT_LENGTH
    , InChan(..), OutChan(..), ChanEnd(..), StreamSegment, Cell(..), Stream(..)
    , NextSegment(..), StreamHead(..)
    , newChanStarting, writeChan, readChan, readChanOnException
    , dupChan, tryReadChan
    -- For Unagi.NoBlocking:
    , moveToNextCell, waitingAdvanceStream, newSegmentSource
    )
    where

-- Internals exposed for testing.

import Control.Concurrent.MVar
import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Typeable(Typeable)
import GHC.Exts(inline)

import Control.Concurrent.Chan.Unagi.Constants
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT
import Utilities(touchIORef)

import Prelude

-- | The write end of a channel created with 'newChan'.
--
-- Carries a saved 'Ticket' (collected from a fresh, still-'Empty' cell when
-- the channel is created) that 'writeChan' passes as the expected old value
-- of its compare-and-swap, together with the writer's own 'ChanEnd'.
data InChan a = InChan !(Ticket (Cell a)) !(ChanEnd (Cell a))
    deriving Typeable

-- Two write ends are equal when their 'ChanEnd's are equal (i.e. they share
-- the same stream-head reference); the saved ticket plays no part.
instance Eq (InChan a) where
    InChan _ endA == InChan _ endB = endA == endB

-- | The read end of a channel created with 'newChan'.
--
-- A wrapper around the reader's own 'ChanEnd'; the derived 'Eq' therefore
-- defers to the 'ChanEnd' instance.
newtype OutChan a = OutChan (ChanEnd (Cell a))
    deriving (Eq,Typeable)


-- TODO POTENTIAL CPP FLAGS (or functions)
--   - Strict element (or lazy? maybe also expose a writeChan' when relevant?)
--   - sEGMENT_LENGTH
--   - reads that clear the element immediately (or export as a special function?)

-- One end (read or write) of a channel. InChan & OutChan are mostly
-- identical, sharing a stream, but with independent counters.
--
--   NOTE: we parameterize this, and its child types, by `cell_a` (instantiated
--   to `Cell a` in this module) instead of `a` so that we can use
--   `moveToNextCell`, `waitingAdvanceStream`, and `newSegmentSource` and all
--   the types below in Unagi.NoBlocking, which uses its own, different,
--   `Cell a` type; Sorry!
data ChanEnd cell_a = 
           -- an efficient producer of segments of length sEGMENT_LENGTH:
    ChanEnd !(SegSource cell_a)
            -- the cell counter for this end, bumped atomically by
            -- `moveToNextCell`. Both Chan ends must start with the same
            -- counter value.
            !AtomicCounter 
            -- the stream head; this must never point to a segment whose offset
            -- is greater than the counter value
            !(IORef (StreamHead cell_a))
    deriving Typeable

-- Ends are identified by their stream-head reference alone; the segment
-- source and counter are ignored.
instance Eq (ChanEnd a) where
     ChanEnd _ _ refL == ChanEnd _ _ refR = refL == refR


-- A position in the stream: the counter value ("offset") corresponding to the
-- first cell of a segment, paired with the stream starting at that segment.
-- `moveToNextCell` subtracts this offset from a counter value to locate a cell.
data StreamHead cell_a = StreamHead !Int !(Stream cell_a)

-- A segment is a boxed mutable array of cells.
--
--TODO later see if we get a benefit from the small array primops in 7.10,
--     which omit card-marking overhead and might have faster clone.
type StreamSegment cell_a = P.MutableArray RealWorld cell_a

-- The state of one slot in a segment:
--   Empty       the initial state of every cell in a fresh segment
--   Written a   a writer got here first and stored its element
--   Blocking v  a reader got here first; the writer hands the element over by
--               filling `v`
--
-- TRANSITIONS and POSSIBLE VALUES:
--   During Read:
--     Empty   -> Blocking
--     Written
--     Blocking (only when dupChan used)
--   During Write:
--     Empty   -> Written 
--     Blocking
data Cell a = Empty | Written a | Blocking !(MVar a)


-- NOTE In general we'll have two segments allocated at any given time in
-- addition to the segment template, so in the worst case, when the program
-- exits, we will have allocated roughly 3 segments' worth of memory more than
-- was actually required.

-- A linked list of segments: the current segment plus a mutable reference to
-- whatever follows it.
data Stream cell_a = 
    Stream !(StreamSegment cell_a)
           -- The next segment in the stream; new segments are allocated and
           -- put here as we go, with threads cooperating to allocate new
           -- segments:
           !(IORef (NextSegment cell_a))

-- The tail of a `Stream`: either not yet allocated (`NoSegment`) or the
-- following stream (`Next`). `waitingAdvanceStream` performs the
-- NoSegment -> Next transition with a CAS.
data NextSegment cell_a = NoSegment | Next !(Stream cell_a)

-- Create a new channel, returning its write and read ends. The first cell
-- handed out to a writer (and to a reader) has counter value
-- `startingCellOffset`, which is cell 0 of the first segment.
--
-- we expose `startingCellOffset` for debugging correct behavior with overflow:
newChanStarting :: Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    segSource <- newSegmentSource Empty
    firstSeg <- segSource
    -- collect a ticket to save for writer CAS
    savedEmptyTkt <- readArrayElem firstSeg 0
    stream <- Stream firstSeg <$> newIORef NoSegment
    -- `end` is an IO action that is run twice below, so each end gets its own
    -- counter and its own stream-head reference, while both share `segSource`
    -- and the initial `stream`.
    --
    -- `moveToNextCell` uses the value returned by `incrCounter 1` as the index
    -- of the cell it hands out. That value is the counter AFTER the increment
    -- (as documented for atomic-primops' `incrCounter`; NOTE(review): confirm
    -- Data.Atomics.Counter.Fat matches), so the counter must start one below
    -- the head offset. Otherwise the first index would be
    -- `startingCellOffset + 1`: cell 0 of the first segment would never be
    -- used and the `segIx == 0` pre-allocation in `writeChan` would never fire
    -- for the first segment. The subtraction may wrap around, which is fine as
    -- the increment wraps back.
    let end = ChanEnd segSource 
                  <$> newCounter (startingCellOffset - 1)
                  <*> newIORef (StreamHead startingCellOffset stream)
    liftA2 (,) (InChan savedEmptyTkt <$> end) (OutChan <$> end)

-- | Duplicate a chan: the returned @OutChan@ begins empty, but data written to
-- the argument @InChan@ from then on will be available from both the original
-- @OutChan@ and the one returned here, creating a kind of broadcast channel.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan _ (ChanEnd segSource counter streamHead)) = do
    -- Snapshot the writer's position: stream head first, then counter. The
    -- order of these two reads matters, see [1].
    hLoc <- readIORef streamHead
    loadLoadBarrier  -- NOTE [1]
    wCount <- readCounter counter
    
    -- The new read end gets a fresh counter and a fresh head reference seeded
    -- from the snapshot; it shares only the segment source (and the stream
    -- reachable from hLoc) with the writer.
    counter' <- newCounter wCount 
    streamHead' <- newIORef hLoc
    return $ OutChan (ChanEnd segSource counter' streamHead')
  -- [1] We must read the streamHead before inspecting the counter; otherwise,
  -- as writers write, the stream head pointer may advance past the cell
  -- indicated by wCount. For the corresponding store-store barrier see [*] in
  -- moveToNextCell

-- | Write a value to the channel.
--
-- The whole write runs under 'mask_'. If a reader has already claimed the
-- cell and is blocking on it, the value is handed over through that reader's
-- 'MVar' instead of being stored in the cell.
writeChan :: InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan savedEmptyTkt ce@(ChanEnd segSource _ _)) = \a-> mask_ $ do 
    -- claim our cell: its index within the segment, and the segment itself
    (segIx, (Stream seg next), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    -- try to swing the cell from Empty to Written, using the ticket saved at
    -- channel creation as the expected old value
    (success,nonEmptyTkt) <- casArrayElem seg segIx savedEmptyTkt (Written a)
    -- try to pre-allocate next segment; NOTE [1]
    when (segIx == 0) $ void $
      waitingAdvanceStream next segSource 0
    -- CAS failed: the cell was not Empty, so inspect what is there
    when (not success) $
        case peekTicket nonEmptyTkt of
             -- a reader got here first; wake it with the element
             Blocking v -> putMVar v a
             Empty      -> error "Stored Empty Ticket went stale!" -- NOTE [2]
             Written _  -> error "Nearly Impossible! Expected Blocking"
  -- [1] the writer which arrives first to the first cell of a new segment is
  -- tasked (somewhat arbitrarily) with trying to pre-allocate the *next*
  -- segment hopefully ahead of any readers or writers who might need it. This
  -- will race with any reader *or* writer that tries to read the next segment
  -- and finds it's empty (see `waitingAdvanceStream`); when this wins
  -- (hopefully the vast majority of the time) we avoid a throughput hit.
  --
  -- [2] this assumes that the compiler is statically-allocating Empty, sharing
  -- the constructor among all uses, and that it never moves it between
  -- checking the pointer stored in the array and checking the pointer in the
  -- cached Any Empty value. If this is incorrect then the Ticket approach to
  -- CAS is equally incorrect (though maybe less likely to fail).


-- Read the element from the cell described by the triple returned from
-- `moveToNextCell`, blocking on an MVar if no writer has filled it yet. The
-- first argument `h` wraps the blocking `readMVar` (and only that); callers
-- pass `id`, or a wrapper installing an exception handler.
--
-- We would like our queue to behave like Chan in that an async exception
-- raised in a reader known to be blocked or about to block on an empty queue
-- never results in a lost message; this matches our simple intuition about the
-- mechanics of a queue: if you're last in line for a cupcake and have to leave
-- you wouldn't expect the cake that would have gone to you to disappear.
--
-- But there are other systems that a busy cake shop might use that you could
-- easily imagine resulting in a lost cake, and that's a different question
-- from whether cakes are given to well-behaved customers in the order they
-- came out of the oven, or whether a customer leaving at the wrong moment
-- might cause the cake shop to burn down...
readSegIxUnmasked :: (IO a -> IO a) -> (Int, Stream (Cell a), IO ()) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h =
  \(segIx, (Stream seg _), maybeUpdateStreamHead)-> do
    maybeUpdateStreamHead
    -- the ticket doubles as the expected old value for the CAS below
    cellTkt <- readArrayElem seg segIx
    case peekTicket cellTkt of
         -- the writer beat us; nothing to wait for
         Written a -> return a
         Empty -> do
            -- we beat the writer: try to install an MVar for it to fill
            v <- newEmptyMVar
            (success,elseWrittenCell) <- casArrayElem seg segIx cellTkt (Blocking v)
            if success 
              then readBlocking v
              else case peekTicket elseWrittenCell of
                        -- In the meantime a writer has written. Good!
                        Written a -> return a
                        -- ...or a dupChan reader initiated blocking:
                        Blocking v2 -> readBlocking v2
                        _ -> error "Impossible! Expecting Written or Blocking"
         -- another reader of this cell is already blocking; share its MVar
         Blocking v -> readBlocking v
  -- N.B. must use `readMVar` here to support `dupChan`:
  where readBlocking v = inline h $ readMVar v 


-- | Claims the next element of the channel without waiting for it, and
-- returns immediately with a pair of:
--
--  - an @'UT.Element' a@ future, which returns one unique element when it
--  becomes available via 'UT.tryRead'.
--
--  - a blocking @IO@ action that returns the element when it becomes available.
--
-- If you're using this function exclusively you might find the implementation
-- in "Control.Concurrent.Chan.Unagi.NoBlocking" is faster.
--
-- /Note re. exceptions/: When an async exception is raised during a @tryReadChan@ 
-- the message that the read would have returned is likely to be lost, just as
-- it would be when raised directly after this function returns.
tryReadChan :: OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan ce) = do -- no masking needed
    cellInfo@(ix, Stream segment _, updateHead) <- moveToNextCell ce
    updateHead
    let -- non-blocking peek at our cell; never modifies it
        peekCell = do
            contents <- P.readArray segment ix
            case contents of
                 Empty       -> return Nothing
                 Written x   -> return (Just x)
                 Blocking mv -> tryReadMVar mv
        -- the ordinary blocking read of the very same cell
        blockingRead = readSegIxUnmasked id cellInfo
    return (UT.Element peekCell, blockingRead)


-- | Take the next element from the chan, blocking while the chan is empty.
--
-- /Note re. exceptions/: When an async exception is raised during a @readChan@ 
-- the message that the read would have returned is likely to be lost, even when
-- the read is known to be blocked on an empty queue. If you need to handle
-- this scenario, you can use 'readChanOnException'.
readChan :: OutChan a -> IO a
{-# INLINE readChan #-}
readChan = \(OutChan ce)-> readSegIxUnmasked id =<< moveToNextCell ce

-- | A variant of 'readChan' that lets the caller recover the queue element
-- which would have been read, should an async exception be raised during the
-- read. To be precise exceptions are raised, and the handler run, only when
-- @readChanOnException@ is blocking.
--
-- The second argument is a handler that takes a blocking IO action returning
-- the element, and performs some recovery action.  When the handler is called,
-- the passed @IO a@ is the only way to access the element.
readChanOnException :: OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException (OutChan ce) h = mask_ $ do
    cellInfo <- moveToNextCell ce
    readSegIxUnmasked withRecovery cellInfo
  where
    -- wraps the blocking read: on exception, hand that same blocking action
    -- to the caller's handler
    withRecovery io = io `onException` h io


------------ NOTE: ALL CODE BELOW IS RE-USED IN Unagi.NoBlocking --------------


-- increments counter, finds stream segment of corresponding cell (updating the
-- stream head pointer as needed), and returns the stream segment and relative
-- index of our cell.
--
-- The result is a triple of: the index of our cell within its segment, the
-- stream beginning at that segment, and an action which (when we had to move
-- forward one or more segments) writes the new head back to the shared
-- reference. The action is returned rather than run so the caller decides
-- when it happens.
moveToNextCell :: ChanEnd cell_a -> IO (Int, Stream cell_a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell (ChanEnd segSource counter streamHead) = do
    (StreamHead offset0 str0) <- readIORef streamHead
    -- NOTE [3/4]
    ix <- incrCounter 1 counter  -- [*]
    -- split the distance from the head into whole segments to skip and an
    -- index within the target segment
    let (segsAway, segIx) = assert ((ix - offset0) >= 0) $ 
                 divMod_sEGMENT_LENGTH $! (ix - offset0)
              -- (ix - offset0) `quotRem` sEGMENT_LENGTH
        {-# INLINE go #-}
        -- walk forward `n` segments, creating any that don't exist yet
        go 0 str = return str
        go !n (Stream _ next) =
            waitingAdvanceStream next segSource (nEW_SEGMENT_WAIT*segIx) -- NOTE [1]
              >>= go (n-1)
    str <- go segsAway str0
    -- In Unagi.NoBlocking we need to control when this is run (see also [5]):
    let !maybeUpdateStreamHead = do
          when (segsAway > 0) $ do
            let !offsetN = 
                  offset0 + (segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH) --(segsAway*sEGMENT_LENGTH)
            writeIORef streamHead $ StreamHead offsetN str
          touchIORef streamHead -- NOTE [5]
    return (segIx,str, maybeUpdateStreamHead)
  -- [1] All readers or writers needing to work with a not-yet-created segment
  -- race to create it, but those past index 0 have progressively long waits
  -- (nEW_SEGMENT_WAIT iterations per index). The original note says 20 is
  -- chosen, as 20 readIORefs should be more than enough time for writer/reader
  -- 0 to add the new segment (if it's not descheduled) -- presumably that is
  -- the value of nEW_SEGMENT_WAIT; confirm in Unagi.Constants.
  --
  -- [2] (not referenced from the code above; apparently refers to the head
  -- pointer update in maybeUpdateStreamHead) advancing the stream head pointer
  -- on segIx == sEGMENT_LENGTH - 1 would be more correct, but this is simpler
  -- here. This may move the head pointer BACKWARDS if the thread was
  -- descheduled, but that's not a correctness issue.
  --
  -- [3] There is a theoretical race condition here: thread reads head and is
  -- descheduled, meanwhile other readers/writers increment counter one full
  -- lap; when we increment we think we've found our cell in what is actually a
  -- very old segment. However in this scenario all addressable memory will
  -- have been consumed just by the array pointers which haven't been able to
  -- be GC'd. So I don't think this is something to worry about.
  --
  -- [4] We must ensure the read of streamHead isn't reordered past the
  -- incrCounter that follows it. But fetchAddByteArrayInt is meant to be a
  -- full barrier (for compiler and processor) across architectures, so no
  -- explicit barrier is needed here.
  --
  -- [5] FOR Unagi.NoBlocking: This helps ensure that our (possibly last) use
  -- of streamHead occurs after our (possibly last) write, for correctness of
  -- 'isActive'.  See NOTE 1 of 'Unagi.NoBlocking.writeChan'


-- thread-safely try to fill `nextSegRef` at the next offset with a new
-- segment, waiting some number of iterations (for other threads to handle it).
-- Returns the `Stream` found in (or installed into) nextSegRef.
--
-- The Int argument is the number of times to re-read nextSegRef, hoping
-- another thread fills it, before allocating a segment ourselves; it must be
-- non-negative (checked by the assert).
waitingAdvanceStream :: IORef (NextSegment cell_a) -> SegSource cell_a 
                     -> Int -> IO (Stream cell_a)
{-# NOINLINE waitingAdvanceStream #-}
waitingAdvanceStream nextSegRef segSource = go where
  go !wait = assert (wait >= 0) $ do
    tk <- readForCAS nextSegRef
    case peekTicket tk of
         NoSegment 
             -- still have patience left: spin and look again
           | wait > 0 -> go (wait - 1)
             -- Create a potential next segment and try to insert it:
           | otherwise -> do 
               potentialStrNext <- Stream <$> segSource 
                                          <*> newIORef NoSegment
               -- the success flag is ignored: win or lose, the returned
               -- ticket shows what nextSegRef now holds
               (_,tkDone) <- casIORef nextSegRef tk (Next potentialStrNext)
               -- If that failed another thread succeeded (no false negatives)
               case peekTicket tkDone of
                 Next strNext -> return strNext
                 _ -> error "Impossible! This should only have been Next segment"
         -- someone already installed the next segment
         Next strNext -> return strNext


-- An action producing a fresh segment each time it is run.
--
-- copying a template array with cloneMutableArray is much faster than creating
-- a new one; in fact it seems we need this in order to scale, since as cores
-- increase we don't have enough "runway" and can't allocate fast enough:
type SegSource cell_a = IO (StreamSegment cell_a)

-- Build a segment source: allocate one template segment of sEGMENT_LENGTH
-- cells, all holding the given empty-cell value, and return an action that
-- clones the template on every run.
newSegmentSource :: cell_a -> IO (SegSource cell_a)
newSegmentSource cell_empty = do
    -- NOTE: forcing the empty cell first seems to be required here in order to
    -- not raise "Stored Empty Ticket went stale!"  exception when in GHCi.
    emptyCell <- evaluate cell_empty
    template <- P.newArray sEGMENT_LENGTH emptyCell
    return (P.cloneMutableArray template 0 sEGMENT_LENGTH)

-- ----------
-- CELLS AND GC:
--
--   Each cell in a segment is assigned at most one reader and one writer
--
--   When all readers disappear and writers continue we'll have at most one
--   segment-worth of garbage that can't be collected at a time; when writers
--   advance the head segment pointer, the previous may be GC'd.
--
--   Readers blocked indefinitely should eventually raise a
--   BlockedIndefinitelyOnMVar.
-- ----------