{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.NoBlocking.Unboxed.Internal
#ifdef NOT_x86
    {-# WARNING "This library is unlikely to perform well on architectures without a fetch-and-add instruction" #-}
#endif
    (sEGMENT_LENGTH
    , InChan(..), OutChan(..), ChanEnd(..), Cell, Stream(..)
    , NextSegment(..), StreamHead(..)
    , newChanStarting, writeChan, tryReadChan, readChan, UT.Element(..)
    , dupChan
    , streamChan
    , isActive
    )
    where

-- Forked from src/Control/Concurrent/Chan/Unagi/NoBlocking/Internal.hs at
-- 9e2306330e with some code copied and modified from Unagi.Unboxed.
--
-- The main motivation for this variant is that it lets us take full advantage
-- of the atomicUnicorn trick, so in both read and write we need only use
-- sigArr when the value to be written == atomicUnicorn.
--
-- Some detailed NOTEs present in Control.Concurrent.Chan.Unagi.Unboxed have
-- been removed here although they still pertain. If you intend to work on this
-- module, please be sure you're familiar with those concerns.

import Data.IORef
import Control.Exception
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Typeable(Typeable)
import Data.Maybe

import Control.Concurrent.Chan.Unagi.Constants

import Prelude

-- We can re-use much of the Unagi.Unboxed implementation here, and some of
-- Unagi.NoBlocking (at least our types, which is important):
import Control.Concurrent.Chan.Unagi.Unboxed.Internal(
          ChanEnd(..), StreamHead(..), Cell, Stream(..)
        , NextSegment(..), moveToNextCell, waitingAdvanceStream, segSource
        , cellEmpty, readElementArray, writeElementArray
        , SignalIntArray, ElementArray, UnagiPrim(..))
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT


-- | The write end of a channel created with 'newChan'.
--
-- The first field is the @IORef Bool@ flag that 'newChanStarting' also places
-- in the corresponding 'OutChan'; it is carried here so that 'dupChan' can
-- hand the same flag to the 'OutChan' it creates. The second field is the
-- writer's 'ChanEnd'.
data InChan a = InChan !(IORef Bool) -- Used for creating an OutChan in dupChan
                       !(ChanEnd a)
    deriving (Typeable)

-- | The read end of a channel created with 'newChan'.
--
-- The @IORef Bool@ is the flag read by 'isActive': 'newChanStarting'
-- initialises it to @True@ and installs a finalizer that sets it to @False@.
-- The second field is this reader's 'ChanEnd'.
data OutChan a = OutChan !(IORef Bool) -- Is corresponding InChan still alive?
                         !(ChanEnd a)
    deriving (Typeable)

-- | 'InChan's are equal exactly when their 'ChanEnd's hold the same
-- stream-head reference; the other fields are ignored.
instance Eq (InChan a) where
    chanA == chanB = headRef chanA == headRef chanB
      where headRef (InChan _ (ChanEnd _ ref)) = ref

-- | 'OutChan's are equal exactly when their 'ChanEnd's hold the same
-- stream-head reference; the other fields are ignored.
instance Eq (OutChan a) where
    chanA == chanB = headRef chanA == headRef chanB
      where headRef (OutChan _ (ChanEnd _ ref)) = ref


-- | Create a new channel whose write and read counters both start at the
-- given cell offset, returning its write and read ends.
--
-- Both ends initially refer to the same first 'Stream' segment, but each gets
-- its own counter and its own stream-head 'IORef'. They share one
-- @IORef Bool@, initially @True@; a finalizer attached to the write end's
-- stream-head 'IORef' sets it to @False@, and 'isActive' reads it.
newChanStarting :: (UnagiPrim a)=> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    -- Placeholder for the IndexedMVar field of the initial Stream; it raises
    -- an error if it is ever forced:
    let undefinedNewIndexedMVar = return $ -- NOTE [1]
          error "Unagi.NoBlocking.Unboxed tried to use initial fake IndexedMVar"
    stream <- uncurry Stream <$> segSource
                             <*> undefinedNewIndexedMVar
                             <*> newIORef NoSegment
    -- `end` is an IO action, not a value: every time it is run it allocates a
    -- fresh counter and a fresh head IORef, both starting at
    -- `startingCellOffset` and referring to `stream`.
    let end = ChanEnd
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    -- First run of `end`: the writer's ChanEnd.
    inEnd@(ChanEnd _ inHeadRef) <- end
    finalizee <- newIORef True
    -- When the writer's head IORef is finalized, flag the channel as having
    -- no more writers:
    void $ mkWeakIORef inHeadRef $ do
        writeBarrier
        writeIORef finalizee False
    -- Second run of `end`: the reader's own, independent ChanEnd.
    (,) (InChan finalizee inEnd) <$> (OutChan finalizee <$> end)
  -- [1] We reuse most of Unagi.Unboxed's internals here, but unfortunately
  -- that implementation uses a Stream type with an IndexedMVar to coordinate
  -- blocking reads. Rather than do a lot of refactoring of Unagi.Unboxed, for
  -- now we just fake it here. Unagi.Unboxed.waitingAdvanceStream will actually
  -- create new IndexedMVars for each segment, but we hope at worst that they
  -- will be GC'd immediately even when many segments-worth of elements are in
  -- the queue; the main concern is not to accumulate lots of mutable boxed
  -- objects. TODO better later, maybe.


-- | Report whether the channel may still have writers: the result becomes
-- @False@ sometime after the chan no longer has any writers.
--
-- Once @False@ has been returned, a 'UT.tryRead' that yields @Nothing@ can be
-- considered dead, and the same goes for 'UT.tryReadNext'. The blocking
-- implementations raise @BlockedIndefinitelyOnMVar@ in this situation, so they
-- have no need for this function.
isActive :: OutChan a -> IO Bool
isActive (OutChan aliveFlag _) =
    -- The barrier is sequenced right after the flag read so that a tryRead
    -- that follows is not moved ahead of it:
    readIORef aliveFlag <* loadLoadBarrier


-- | Duplicate a chan. The @OutChan@ returned starts out empty; everything
-- written to the given @InChan@ afterwards can be read from both the original
-- @OutChan@ and the new one, giving a kind of broadcast channel.
--
-- See also 'streamChan' for a faster alternative that might be appropriate.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan aliveFlag (ChanEnd writeCounter writeHead)) = do
    -- Snapshot the writer's position: head first, then (after the barrier)
    -- the counter.
    headSnapshot <- readIORef writeHead
    loadLoadBarrier
    countSnapshot <- readCounter writeCounter
    -- The new read end gets its own counter and head reference, seeded from
    -- the snapshot, and shares the writer's liveness flag.
    OutChan aliveFlag <$>
        (ChanEnd <$> newCounter countSnapshot <*> newIORef headSnapshot)


-- READING AND WRITING
--
--  We re-use the internals of Unagi.Unboxed, but use them a bit differently;
--  in particular where Unagi.Unboxed uses its SignalIntArray to indicate the
--  status of the corresponding ElementArray cell, we use it only to
--  disambiguate an unwritten cell from a written cell of a "magic" value,
--  which we'll describe below.
--  
--  When we're reading and writing values that can be written atomically (see
--  atomicUnicorn), and when that particular value is not equal to that magic
--  value we get a fast write path: simply write to the eArr. Likewise when a
--  reader reads from eArr and sees something /= atomicUnicorn, it can simply
--  return with it. In all other cases readers and writers must check in at the
--  sigArr, as in Unagi.Unboxed.

-- | Value stored by 'writeChan' in a cell of the 'SignalIntArray' to tell
-- readers that the matching 'ElementArray' cell has been written, for the
-- cases where a reader cannot determine that from the element itself. The
-- only other state readers expect to see there is the imported 'cellEmpty'.
nonMagicCellWritten :: Int
nonMagicCellWritten = 1
-- and also: `cellEmpty` (imported)



-- | Write a value to the channel.
--
-- The whole write runs under 'mask_'. It claims the next cell with
-- 'moveToNextCell', stores the element in the segment's element array, and
-- additionally marks the cell in the signal array with 'nonMagicCellWritten'
-- when either the element type has no 'atomicUnicorn' or the value being
-- written equals it. Finally it runs the stream-head update action returned
-- by 'moveToNextCell' and, when the claimed cell is the first of its segment,
-- asks 'waitingAdvanceStream' for the following segment.
writeChan :: UnagiPrim a=> InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan _ ce) = \a-> mask_ $ do
    (segIx, (Stream sigArr eArr _ next), maybeUpdateStreamHead) <- moveToNextCell ce
    -- NOTE!: must write element both before updating stream head (see
    -- NoBlocking), and before signaling with CAS (if applicable):
    writeElementArray eArr segIx a

    let magic = atomicUnicorn
    -- Slow path: signal through sigArr whenever the element alone is not
    -- enough for a reader to recognise the cell as written.
    when (isNothing magic || Just a == magic) $ do
      -- in which case a reader can't tell we've written just from a (possibly
      -- non-atomic) read from eArr:
      writeBarrier -- NOTE [1]
      P.writeByteArray sigArr segIx nonMagicCellWritten

    maybeUpdateStreamHead -- NOTE [2]
    -- try to pre-allocate next segment:
    when (segIx == 0) $ void $
      waitingAdvanceStream next 0
  -- [1] we need a write barrier here to make sure GHC maintains our ordering
  -- such that the element is written before we signal its availability with
  -- the write to sigArr that follows. See [2] in readChanOnExceptionUnmasked
  -- (not defined in this module; presumably the one in Unagi.Unboxed -- TODO
  -- confirm).
  --
  -- [2] Our final use of the head reference. We must make sure this IORef is
  -- not GC'd (and its finalizer run) until after our writes to the arrays
  -- above. See definition of maybeUpdateStreamHead.


-- | Claim the next element of the channel without waiting for it: returns
-- immediately with an @'UT.Element' a@ future that yields one unique element,
-- via 'UT.tryRead', once that element has been written.
--
-- /Note/: This is a destructive operation. See 'UT.Element' for more details.
--
-- /Note re. exceptions/: if an async exception is raised during a
-- @tryReadChan@, the message the read would have returned is likely to be
-- lost, just as it would be if the exception were raised directly after this
-- function returns.
tryReadChan :: UnagiPrim a=> OutChan a -> IO (UT.Element a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan _ readEnd) = do  -- see NoBlocking re. not masking
    (cellIx, Stream sigs elems _ _, advanceHead) <- moveToNextCell readEnd
    -- Run the head update, then hand back a future that only captures the
    -- claimed cell's index and arrays; the read itself happens when the
    -- future is run.
    UT.Element (tryReadChanInternals cellIx sigs elems) <$ advanceHead

-- | Non-blocking read of the cell at index @segIx@ of a segment, given that
-- segment's signal array and element array.
--
-- Returns @Just el@ when the cell is found to have been written and @Nothing@
-- otherwise. When the element type has an 'atomicUnicorn' and the value read
-- from the element array differs from it, that value is returned directly;
-- in every other case the signal array decides: 'nonMagicCellWritten' means
-- written, and anything else is asserted to be 'cellEmpty'.
tryReadChanInternals :: UnagiPrim a=> Int -> SignalIntArray -> ElementArray a -> IO (Maybe a)
{-# INLINE tryReadChanInternals #-}
tryReadChanInternals segIx sigArr eArr = do
      let readElem = readElementArray eArr segIx
          -- Slow path: consult sigArr to learn whether the cell was written,
          -- and only then (after a barrier) read the element.
          slowRead = do
             sig <- P.readByteArray sigArr segIx
             if sig == nonMagicCellWritten
               then do
                 loadLoadBarrier -- see [1] in writeChan
                 Just <$> readElem
               else assert (sig == cellEmpty) $
                 return Nothing
      -- If we know writes of this type are atomic, we can determine if the
      -- element has been written, and possibly return it without checking
      -- sigArr.
      case atomicUnicorn of
           Just magic -> do
              el <- readElem
              if (el /= magic)
                -- Then we know `el` was atomically written:
                then return $ Just el
                -- Seeing the magic value is ambiguous (unwritten cell, or the
                -- magic value itself was written), so ask sigArr:
                else slowRead
           -- No magic value for this type: always go through sigArr.
           Nothing -> slowRead


-- | @readChan io c@ returns the next element from @c@. It calls 'tryReadChan'
-- once and then polls the resulting 'UT.Element', running @io@ between polls
-- for as long as the element is not yet available. It throws
-- 'BlockedIndefinitelyOnMVar' when 'isActive' determines that a value will
-- never be returned.
--
-- When used like @readChan 'yield'@ or @readChan ('threadDelay' 10)@ this is
-- the semantic equivalent to the blocking @readChan@ in the other
-- implementations.
readChan :: UnagiPrim a=> IO () -> OutChan a -> IO a
{-# INLINE readChan #-}
readChan io oc = do
    el <- tryReadChan oc
    let poll = UT.tryRead el
        -- Poll once; fall back to the liveness check if nothing is there yet.
        spin = poll >>= maybe retryIfActive return
        retryIfActive = do
            alive <- isActive oc
            if alive
              then io >> spin
              -- Writers are gone: do a necessary final check of the element
              -- before giving up.
              else poll >>= maybe (throwIO BlockedIndefinitelyOnMVar) return
    spin


-- | Produce the specified number of interleaved \"streams\" from a chan.
-- Consuming a 'UT.Stream' is much faster than calling 'tryReadChan', and
-- might be useful when an MPSC queue is needed, or when multiple consumers
-- should be load-balanced in a round-robin fashion. 
--
-- The argument must be at least 1; otherwise an error is raised.
--
-- Usage example:
--
-- @
--   do mapM_ ('writeChan' i) [1..9]
--      [str1, str2, str3] <- 'streamChan' 3 o
--      forkIO $ printStream str1   -- prints: 1,4,7
--      forkIO $ printStream str2   -- prints: 2,5,8
--      forkIO $ printStream str3   -- prints: 3,6,9
--    where 
--      printStream str = do
--        h <- 'UT.tryReadNext' str
--        case h of
--          'UT.Next' a str' -> print a >> printStream str'
--          -- We know that all values were already written, so a Pending tells 
--          -- us we can exit; in other cases we might call 'yield' and then 
--          -- retry that same @'UT.tryReadNext' str@:
--          'UT.Pending' -> return ()
-- @
--
-- Be aware: if one stream consumer falls behind another (e.g. because it is
-- slower) the number of elements in the queue which can't be GC'd will grow.
-- You may want to do some coordination of 'UT.Stream' consumers to prevent
-- this.
streamChan :: UnagiPrim a=> Int -> OutChan a -> IO [UT.Stream a]
{-# INLINE streamChan #-}
streamChan period (OutChan _ (ChanEnd counter streamHead)) = do
    when (period < 1) $ error "Argument to streamChan must be > 0"

    (StreamHead offsetInitial strInitial) <- readIORef streamHead
    -- Make sure the read above occurs before our readCounter:
    loadLoadBarrier
    -- Linearizable as the first unread element
    !ix0 <- readCounter counter

    -- Adapted from moveToNextCell, given a stream segment location `str0` and
    -- its offset, `offset0`, this navigates to the UT.Stream segment holding `ix`
    -- and begins recursing in our UT.Stream wrappers
    let stream !offset0 str0 !ix = UT.Stream $ do
            -- Find our stream segment and relative index:
            let (segsAway, segIx) = assert ((ix - offset0) >= 0) $
                         divMod_sEGMENT_LENGTH $! (ix - offset0)
                      -- (ix - offset0) `quotRem` sEGMENT_LENGTH
                -- Walk `n` segments forward from the given segment:
                {-# INLINE go #-}
                go 0 str = return str
                go !n (Stream _ _ _ next) =
                    waitingAdvanceStream next (nEW_SEGMENT_WAIT*segIx)
                      >>= go (n-1)
            -- the stream segment holding `ix`, and its calculated offset:
            str@(Stream sigArr eArr _ _) <- go segsAway str0
            let !strOffset = offset0+(segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH)
            --                       (segsAway  *                 sEGMENT_LENGTH)
            mbEl <- tryReadChanInternals segIx sigArr eArr
            return $ case mbEl of
                 -- Not written yet; the caller may retry this same stream:
                 Nothing -> UT.Pending
                 -- Return the value together with the continuation stream,
                 -- which starts from the segment just found and reads the
                 -- cell `period` positions further on:
                 Just el -> UT.Next el $ stream strOffset str (ix+period)

    -- One stream per starting index ix0, ix0+1, ... (`period` of them):
    return $ map (stream offsetInitial strInitial) $
     -- [ix0..(ix0+period-1)] -- WRONG (hint: overflow)!
     -- (presumably: if the upper bound wraps around, the range would be empty)
        take period $ iterate (+1) ix0