{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.Bounded.Internal
    ( InChan(..), OutChan(..), ChanEnd(..), StreamSegment, Cell(..), Stream(..)
    , writerCheckin, unblockWriters, tryWriterCheckin, WriterCheckpoint(..)
    , NextSegment(..), StreamHead(..)
    , newChanStarting, writeChan, readChan, readChanOnException
    , tryWriteChan, tryReadChan
    , dupChan
    , estimatedLength
    )
    where

-- NOTE: forked from src/Control/Concurrent/Chan/Unagi/Internal.hs 43706b2
--       some commentary not specific to this Bounded variant has been removed.
--       See the Unagi source for that.

import Control.Concurrent.MVar
import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Maybe(fromMaybe,isJust)
import Data.Typeable(Typeable)
import GHC.Exts(inline)

import Utilities(nextHighestPowerOfTwo)
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT

import Prelude

-- | The write end of a channel created with 'newChan'.
data InChan a = InChan (IO Int) -- readCounterReader, for tryWriteChan
                       !(Ticket (Cell a)) 
                       !(ChanEnd a)
    deriving Typeable

-- | The read end of a channel created with 'newChan'.
newtype OutChan a = OutChan (ChanEnd a)
    deriving Typeable

instance Eq (InChan a) where
    (InChan IO Int
_ Ticket (Cell a)
_ (ChanEnd Int
_ Int
_ SegSource a
_ AtomicCounter
_ IORef (StreamHead a)
headA)) == :: InChan a -> InChan a -> Bool
== (InChan IO Int
_ Ticket (Cell a)
_ (ChanEnd Int
_ Int
_ SegSource a
_ AtomicCounter
_ IORef (StreamHead a)
headB))
        = IORef (StreamHead a)
headA IORef (StreamHead a) -> IORef (StreamHead a) -> Bool
forall a. Eq a => a -> a -> Bool
== IORef (StreamHead a)
headB
instance Eq (OutChan a) where
    (OutChan (ChanEnd Int
_ Int
_ SegSource a
_ AtomicCounter
_ IORef (StreamHead a)
headA)) == :: OutChan a -> OutChan a -> Bool
== (OutChan (ChanEnd Int
_ Int
_ SegSource a
_ AtomicCounter
_ IORef (StreamHead a)
headB))
        = IORef (StreamHead a)
headA IORef (StreamHead a) -> IORef (StreamHead a) -> Bool
forall a. Eq a => a -> a -> Bool
== IORef (StreamHead a)
headB

data ChanEnd a = 
            -- For efficient div and mod:
    ChanEnd !Int  -- logBase 2 BOUNDS
            !Int  -- BOUNDS - 1
            -- an efficient producer of segments of length BOUNDS:
            !(SegSource a)
            -- Both Chan ends must start with the same counter value.
            !AtomicCounter 
            -- the stream head; this must never point to a segment whose offset
            -- is greater than the counter value
            !(IORef (StreamHead a)) -- NOTE [1]
    deriving Typeable
 -- [1] For the writers' ChanEnd: the segment that appears in the StreamHead is
 -- implicitly unlocked for writers (the segment size being equal to the chan
 -- bounds). See 'writeChan' for notes on how we make sure to keep this
 -- invariant.

data StreamHead a = StreamHead !Int !(Stream a)

-- This is always of length BOUNDS
type StreamSegment a = P.MutableArray RealWorld (Cell a)

data Cell a = Empty | Written a | Blocking !(MVar a)

data Stream a = 
    Stream !(StreamSegment a)
           -- The next segment in the stream; new segments are allocated and
           -- put here by the reader of index-0 of the previous segment. That
           -- reader (and the next one or two) also do a tryPutMVar to the MVar
           -- below, to indicate that any blocked writers may proceed.
           !(IORef (Maybe (NextSegment a)))
           -- writers that find Nothing above, must check in with a possibly
           -- blocking readMVar here before proceeding (the slow path):


-- Next segment, installed by either a reader or a writer:
data NextSegment a = NextByWriter (Stream a)       -- the next stream segment
                                  !WriterCheckpoint -- blocking for this segment
                   -- a reader-installed one is implicitly unlocked for writers
                   -- so needs no checkpoint:
                   | NextByReader (Stream a)

-- helper accessors TODO consider making records
getNextRef :: NextSegment a -> IORef (Maybe (NextSegment a))
getNextRef :: NextSegment a -> IORef (Maybe (NextSegment a))
getNextRef NextSegment a
x = (\(Stream StreamSegment a
_ IORef (Maybe (NextSegment a))
nextSegRef)-> IORef (Maybe (NextSegment a))
nextSegRef) (Stream a -> IORef (Maybe (NextSegment a)))
-> Stream a -> IORef (Maybe (NextSegment a))
forall a b. (a -> b) -> a -> b
$ NextSegment a -> Stream a
forall a. NextSegment a -> Stream a
getStr NextSegment a
x

getStr :: NextSegment a -> Stream a
getStr :: NextSegment a -> Stream a
getStr (NextByReader Stream a
str) = Stream a
str
getStr (NextByWriter Stream a
str WriterCheckpoint
_) = Stream a
str

asReader, asWriter :: Bool
asReader :: Bool
asReader = Bool
True
asWriter :: Bool
asWriter = Bool
False


-- WRITER BLOCKING SCHEME OVERVIEW
-- -------------------------------
-- We use segments of size equal to the requested bounds. When a reader reads
-- index 0 of a segment it tries to pre-allocate the next segment, marking it
-- installed by reader (NextByReader), which indicates to writers who read it
-- that they may write and return without blocking (and this makes the queue
-- loosely bounded between size n and n*2).
--
-- Whenever a reader encounters a segment (in waitingAdvanceStream) installed
-- by a writer it unblocks writers and rewrites the NextBy* constructor to
-- NextByReader, replacing the installed stream segment.
--
-- Writers first make their write available (by writing to the segment) before
-- doing any blocking. This is more efficient, lets us handle async exceptions
-- in a principled way without changing the semantics. This also means that in
-- some cases a writer will install the next segment, marked installed by
-- writer, insicating that writers must checkin and block; readers will mark
-- these segments as installed by reader to avoid unnecessary overhead when the
-- segment becomes unlocked as described in the paragraph above.
--
-- The writer StreamHead is only ever updated by a writer that sees that a
-- segment is unlocked for writing (either because the writer has returned from
-- blocking on that segment, or because it sees that it was installed by a
-- reader); in this way a writer knows if its segment is at the StreamHead that
-- it is free to write and return without blocking (writerCheckin).


newChanStarting :: Int -> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting :: Int -> Int -> IO (InChan a, OutChan a)
newChanStarting !Int
startingCellOffset !Int
sizeDirty = do
    let !size :: Int
size = Int -> Int
nextHighestPowerOfTwo Int
sizeDirty
        !logBounds :: Int
logBounds = Float -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Float -> Int) -> Float -> Int
forall a b. (a -> b) -> a -> b
$ Float -> Float -> Float
forall a. Floating a => a -> a -> a
logBase (Float
2::Float) (Float -> Float) -> Float -> Float
forall a b. (a -> b) -> a -> b
$ Int -> Float
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
size
        !boundsMn1 :: Int
boundsMn1 = Int
size Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1

    SegSource a
segSource <- Int -> IO (SegSource a)
forall a. Int -> IO (SegSource a)
newSegmentSource Int
size
    StreamSegment a
firstSeg <- SegSource a
segSource
    -- collect a ticket to save for writer CAS
    Ticket (Cell a)
savedEmptyTkt <- StreamSegment a -> Int -> IO (Ticket (Cell a))
forall a. MutableArray RealWorld a -> Int -> IO (Ticket a)
readArrayElem StreamSegment a
firstSeg Int
0
    Stream a
stream <- StreamSegment a -> IORef (Maybe (NextSegment a)) -> Stream a
forall a.
StreamSegment a -> IORef (Maybe (NextSegment a)) -> Stream a
Stream StreamSegment a
firstSeg (IORef (Maybe (NextSegment a)) -> Stream a)
-> IO (IORef (Maybe (NextSegment a))) -> IO (Stream a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe (NextSegment a) -> IO (IORef (Maybe (NextSegment a)))
forall a. a -> IO (IORef a)
newIORef Maybe (NextSegment a)
forall a. Maybe a
Nothing
    let end :: IO (ChanEnd a)
end = Int
-> Int
-> SegSource a
-> AtomicCounter
-> IORef (StreamHead a)
-> ChanEnd a
forall a.
Int
-> Int
-> SegSource a
-> AtomicCounter
-> IORef (StreamHead a)
-> ChanEnd a
ChanEnd Int
logBounds Int
boundsMn1 SegSource a
segSource 
                  (AtomicCounter -> IORef (StreamHead a) -> ChanEnd a)
-> IO AtomicCounter -> IO (IORef (StreamHead a) -> ChanEnd a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> IO AtomicCounter
newCounter Int
startingCellOffset
                  IO (IORef (StreamHead a) -> ChanEnd a)
-> IO (IORef (StreamHead a)) -> IO (ChanEnd a)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> StreamHead a -> IO (IORef (StreamHead a))
forall a. a -> IO (IORef a)
newIORef (Int -> Stream a -> StreamHead a
forall a. Int -> Stream a -> StreamHead a
StreamHead Int
startingCellOffset Stream a
stream)
    endR :: ChanEnd a
endR@(ChanEnd Int
_ Int
_ SegSource a
_ AtomicCounter
counterR IORef (StreamHead a)
_) <- IO (ChanEnd a)
end
    ChanEnd a
endW <- IO (ChanEnd a)
end
    Bool -> IO (InChan a, OutChan a) -> IO (InChan a, OutChan a)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
size Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0 Bool -> Bool -> Bool
&& (Int
boundsMn1 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
2 Int -> Int -> Int
forall a b. (Num a, Integral b) => a -> b -> a
^ Int
logBounds) (IO (InChan a, OutChan a) -> IO (InChan a, OutChan a))
-> IO (InChan a, OutChan a) -> IO (InChan a, OutChan a)
forall a b. (a -> b) -> a -> b
$
       (InChan a, OutChan a) -> IO (InChan a, OutChan a)
forall (m :: * -> *) a. Monad m => a -> m a
return ( IO Int -> Ticket (Cell a) -> ChanEnd a -> InChan a
forall a. IO Int -> Ticket (Cell a) -> ChanEnd a -> InChan a
InChan (AtomicCounter -> IO Int
readCounter AtomicCounter
counterR) Ticket (Cell a)
savedEmptyTkt ChanEnd a
endW
              , ChanEnd a -> OutChan a
forall a. ChanEnd a -> OutChan a
OutChan ChanEnd a
endR )

-- | Duplicate a chan: the returned @OutChan@ begins empty, but data written to
-- the argument @InChan@ from then on will be available from both the original
-- @OutChan@ and the one returned here, creating a kind of broadcast channel.
--
-- Writers will be blocked only when the fastest reader falls behind the
-- bounds; slower readers of duplicated 'OutChan' may fall arbitrarily behind.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan :: InChan a -> IO (OutChan a)
dupChan (InChan IO Int
_ Ticket (Cell a)
_ (ChanEnd Int
logBounds Int
boundsMn1 SegSource a
segSource AtomicCounter
counter IORef (StreamHead a)
streamHead)) = do
    StreamHead a
hLoc <- IORef (StreamHead a) -> IO (StreamHead a)
forall a. IORef a -> IO a
readIORef IORef (StreamHead a)
streamHead
    IO ()
loadLoadBarrier
    Int
wCount <- AtomicCounter -> IO Int
readCounter AtomicCounter
counter
    
    AtomicCounter
counter' <- Int -> IO AtomicCounter
newCounter Int
wCount 
    IORef (StreamHead a)
streamHead' <- StreamHead a -> IO (IORef (StreamHead a))
forall a. a -> IO (IORef a)
newIORef StreamHead a
hLoc
    OutChan a -> IO (OutChan a)
forall (m :: * -> *) a. Monad m => a -> m a
return (OutChan a -> IO (OutChan a)) -> OutChan a -> IO (OutChan a)
forall a b. (a -> b) -> a -> b
$ ChanEnd a -> OutChan a
forall a. ChanEnd a -> OutChan a
OutChan (ChanEnd a -> OutChan a) -> ChanEnd a -> OutChan a
forall a b. (a -> b) -> a -> b
$ Int
-> Int
-> SegSource a
-> AtomicCounter
-> IORef (StreamHead a)
-> ChanEnd a
forall a.
Int
-> Int
-> SegSource a
-> AtomicCounter
-> IORef (StreamHead a)
-> ChanEnd a
ChanEnd Int
logBounds Int
boundsMn1 SegSource a
segSource AtomicCounter
counter' IORef (StreamHead a)
streamHead'


-- | Write a value to the channel. If the chan is full this will block.
--
-- To be precise this /may/ block when the number of elements in the queue 
-- @>= size@, and will certainly block when @>= size*2@, where @size@ is the
-- argument passed to 'newChan', rounded up to the next highest power of two.
--
-- /Note re. exceptions/: In the case that an async exception is raised 
-- while blocking here, the write will nonetheless succeed. When not blocking,
-- exceptions are masked. Thus writes always succeed once 'writeChan' is
-- entered.
writeChan :: InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan :: InChan a -> a -> IO ()
writeChan InChan a
c = \a
a-> Bool -> InChan a -> a -> IO ()
forall a. Bool -> InChan a -> a -> IO ()
writeChanWithBlocking Bool
True InChan a
c a
a

writeChanWithBlocking :: Bool -> InChan a -> a -> IO ()
{-# INLINE writeChanWithBlocking #-}
writeChanWithBlocking :: Bool -> InChan a -> a -> IO ()
writeChanWithBlocking Bool
canBlock (InChan IO Int
_ Ticket (Cell a)
savedEmptyTkt ChanEnd a
ce) a
a = IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do 
    (Int
segIx, NextSegment a
nextSeg, IO ()
updateStreamHeadIfNecessary) <- Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
forall a. Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
moveToNextCell Bool
asWriter ChanEnd a
ce
    let (StreamSegment a
seg, Maybe WriterCheckpoint
maybeCheckpt) = case NextSegment a
nextSeg of
          NextByWriter (Stream StreamSegment a
s IORef (Maybe (NextSegment a))
_) WriterCheckpoint
checkpt -> (StreamSegment a
s, WriterCheckpoint -> Maybe WriterCheckpoint
forall a. a -> Maybe a
Just WriterCheckpoint
checkpt)
          -- if installed by reader, no need to check in:
          NextByReader (Stream StreamSegment a
s IORef (Maybe (NextSegment a))
_)         -> (StreamSegment a
s, Maybe WriterCheckpoint
forall a. Maybe a
Nothing)

    (Bool
success,Ticket (Cell a)
nonEmptyTkt) <- StreamSegment a
-> Int -> Ticket (Cell a) -> Cell a -> IO (Bool, Ticket (Cell a))
forall a.
MutableArray RealWorld a
-> Int -> Ticket a -> a -> IO (Bool, Ticket a)
casArrayElem StreamSegment a
seg Int
segIx Ticket (Cell a)
savedEmptyTkt (a -> Cell a
forall a. a -> Cell a
Written a
a)
    if Bool
success
      -- NOTE: We must only block AFTER writing to be async exception-safe.
      then IO ()
-> (WriterCheckpoint -> IO ()) -> Maybe WriterCheckpoint -> IO ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe IO ()
updateStreamHeadIfNecessary -- NOTE [2]
                 (\WriterCheckpoint
checkpt-> do
                     Bool
segUnlocked <- if Bool
canBlock 
                                     then Bool
True Bool -> IO () -> IO Bool
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ WriterCheckpoint -> IO ()
writerCheckin WriterCheckpoint
checkpt
                                     else WriterCheckpoint -> IO Bool
tryWriterCheckin WriterCheckpoint
checkpt
                     Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
segUnlocked (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
                       IO ()
updateStreamHeadIfNecessary ) -- NOTE [1/2]
                 Maybe WriterCheckpoint
maybeCheckpt
                        
      -- If CAS failed then a reader beat us, so we know we're not out of
      -- bounds and don't need to writerCheckin
      else case Ticket (Cell a) -> Cell a
forall a. Ticket a -> a
peekTicket Ticket (Cell a)
nonEmptyTkt of
                Blocking MVar a
v -> do MVar a -> a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar a
v a
a
                                 IO ()
updateStreamHeadIfNecessary  -- NOTE [1] 
                Cell a
Empty      -> [Char] -> IO ()
forall a. (?callStack::CallStack) => [Char] -> a
error [Char]
"Stored Empty Ticket went stale!"
                Written a
_  -> [Char] -> IO ()
forall a. (?callStack::CallStack) => [Char] -> a
error [Char]
"Nearly Impossible! Expected Blocking"
 -- [1] At this point we know that 'seg' is unlocked for writers because a
 -- reader unblocked us, so it's safe to update the StreamHead with this
 -- segment (if we moved to a new segment). This way we maintain the invariant
 -- that the StreamHead segment is always known "unlocked" to writers.
 --
 -- [2] Similarly when in tryWriteChan we only update the stream head when
 -- we see that it was installed by reader, or we see that it was unlocked,
 -- but for the latter we check without blocking.



-- | Try to write a value to the channel, aborting if the write is likely to
-- exceed the bounds, returning a @Bool@ indicating whether the write was
-- successful.
--
-- This function never blocks, but may occasionally write successfully to a
-- queue that is already "full". Unlike 'writeChan' this function treats the
-- requested bounds (raised to nearest power of two) strictly, rather than
-- using the @n .. n*2@ range. The more concurrent writes and reads that are
-- happening, the more inaccurate the estimate of the chan's size is likely to
-- be.
tryWriteChan :: InChan a -> a -> IO Bool
{-# INLINE tryWriteChan #-}
tryWriteChan :: InChan a -> a -> IO Bool
tryWriteChan c :: InChan a
c@(InChan IO Int
_ Ticket (Cell a)
_ (ChanEnd Int
_ Int
boundsMn1 SegSource a
_ AtomicCounter
_ IORef (StreamHead a)
_)) = \a
a-> do
    -- Similar caveats w/r/t counter overflow correctness as elsewhere apply
    -- here: where this would lap and give incorrect results we have already
    -- died with OOM:
    Int
len <- InChan a -> IO Int
forall a. InChan a -> IO Int
estimatedLength InChan a
c
    if Int
len Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
boundsMn1 
        then Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        else Bool -> InChan a -> a -> IO ()
forall a. Bool -> InChan a -> a -> IO ()
writeChanWithBlocking Bool
False InChan a
c a
a IO () -> IO Bool -> IO Bool
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True

-- | Return the estimated length of a bounded queue
--
-- The more concurrent writes and reads that are happening, the more inaccurate
-- the estimate of the chan's size is likely to be.
estimatedLength :: InChan a -> IO Int
{-# INLINE estimatedLength #-}
estimatedLength :: InChan a -> IO Int
estimatedLength (InChan IO Int
readCounterReader Ticket (Cell a)
_ (ChanEnd Int
_ Int
_ SegSource a
_ AtomicCounter
counter IORef (StreamHead a)
_)) = do
    Int
ixR <- IO Int
readCounterReader
    Int
ixW <- AtomicCounter -> IO Int
readCounter AtomicCounter
counter
    Int -> IO Int
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> IO Int) -> Int -> IO Int
forall a b. (a -> b) -> a -> b
$ Int
ixW Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
ixR

-- The core of our 'read' operations, with exception handler:
readSegIxUnmasked :: (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked :: (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
readSegIxUnmasked IO a -> IO a
h = \(StreamSegment a
seg,Int
segIx)-> do
    Ticket (Cell a)
cellTkt <- StreamSegment a -> Int -> IO (Ticket (Cell a))
forall a. MutableArray RealWorld a -> Int -> IO (Ticket a)
readArrayElem StreamSegment a
seg Int
segIx
    case Ticket (Cell a) -> Cell a
forall a. Ticket a -> a
peekTicket Ticket (Cell a)
cellTkt of
         Written a
a -> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         Cell a
Empty -> do
            MVar a
v <- IO (MVar a)
forall a. IO (MVar a)
newEmptyMVar
            (Bool
success,Ticket (Cell a)
elseWrittenCell) <- StreamSegment a
-> Int -> Ticket (Cell a) -> Cell a -> IO (Bool, Ticket (Cell a))
forall a.
MutableArray RealWorld a
-> Int -> Ticket a -> a -> IO (Bool, Ticket a)
casArrayElem StreamSegment a
seg Int
segIx Ticket (Cell a)
cellTkt (MVar a -> Cell a
forall a. MVar a -> Cell a
Blocking MVar a
v)
            if Bool
success 
              then MVar a -> IO a
readBlocking MVar a
v
              else case Ticket (Cell a) -> Cell a
forall a. Ticket a -> a
peekTicket Ticket (Cell a)
elseWrittenCell of
                        -- In the meantime a writer has written. Good!
                        Written a
a -> a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                        -- ...or a dupChan reader initiated blocking:
                        Blocking MVar a
v2 -> MVar a -> IO a
readBlocking MVar a
v2
                        Cell a
_ -> [Char] -> IO a
forall a. (?callStack::CallStack) => [Char] -> a
error [Char]
"Impossible! Expecting Written or Blocking"
         Blocking MVar a
v -> MVar a -> IO a
readBlocking MVar a
v
  -- N.B. must use `readMVar` here to support `dupChan`:
  where readBlocking :: MVar a -> IO a
readBlocking MVar a
v = (IO a -> IO a) -> IO a -> IO a
forall a. a -> a
inline IO a -> IO a
h (IO a -> IO a) -> IO a -> IO a
forall a b. (a -> b) -> a -> b
$ MVar a -> IO a
forall a. MVar a -> IO a
readMVar MVar a
v 

-- factored out for `tryReadChan` below:
startReadChan :: OutChan a -> IO (StreamSegment a, Int)
{-# INLINE startReadChan #-}
startReadChan :: OutChan a -> IO (StreamSegment a, Int)
startReadChan (OutChan ce :: ChanEnd a
ce@(ChanEnd Int
_ Int
_ SegSource a
segSource AtomicCounter
_ IORef (StreamHead a)
_)) = do
    (Int
segIx, NextSegment a
nextSeg, IO ()
updateStreamHeadIfNecessary) <- Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
forall a. Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
moveToNextCell Bool
asReader ChanEnd a
ce
    let (StreamSegment a
seg,IORef (Maybe (NextSegment a))
next) = case NextSegment a
nextSeg of
            NextByReader (Stream StreamSegment a
s IORef (Maybe (NextSegment a))
n) -> (StreamSegment a
s,IORef (Maybe (NextSegment a))
n)
            NextSegment a
_ -> [Char] -> (StreamSegment a, IORef (Maybe (NextSegment a)))
forall a. (?callStack::CallStack) => [Char] -> a
error [Char]
"moveToNextCell returned a non-reader-installed next segment to readSegIxUnmasked"
    -- try to pre-allocate next segment:
    Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
segIx Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (NextSegment a) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (NextSegment a) -> IO ()) -> IO (NextSegment a) -> IO ()
forall a b. (a -> b) -> a -> b
$
      Bool
-> IORef (Maybe (NextSegment a))
-> SegSource a
-> Int
-> IO (NextSegment a)
forall a.
Bool
-> IORef (Maybe (NextSegment a))
-> SegSource a
-> Int
-> IO (NextSegment a)
waitingAdvanceStream Bool
asReader IORef (Maybe (NextSegment a))
next SegSource a
segSource Int
0

    IO ()
updateStreamHeadIfNecessary
    (StreamSegment a, Int) -> IO (StreamSegment a, Int)
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamSegment a
seg,Int
segIx)



-- | Returns immediately with:
--
--  - an @'UT.Element' a@ future, which returns one unique element when it
--  becomes available via 'UT.tryRead'.
--
--  - a blocking @IO@ action that returns the element when it becomes available.
--
-- /Note/: This is a destructive operation. See 'UT.Element' for more details.
--
-- /Note re. exceptions/: When an async exception is raised during a @tryReadChan@ 
-- the message that the read would have returned is likely to be lost, just as
-- it would be when raised directly after this function returns.
tryReadChan :: OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan :: OutChan a -> IO (Element a, IO a)
tryReadChan OutChan a
oc = do -- no mask necessary
    (StreamSegment a
seg,Int
segIx) <- OutChan a -> IO (StreamSegment a, Int)
forall a. OutChan a -> IO (StreamSegment a, Int)
startReadChan OutChan a
oc

    (Element a, IO a) -> IO (Element a, IO a)
forall (m :: * -> *) a. Monad m => a -> m a
return ( 
       IO (Maybe a) -> Element a
forall a. IO (Maybe a) -> Element a
UT.Element (IO (Maybe a) -> Element a) -> IO (Maybe a) -> Element a
forall a b. (a -> b) -> a -> b
$ do
        Cell a
cell <- MutableArray (PrimState IO) (Cell a) -> Int -> IO (Cell a)
forall (m :: * -> *) a.
PrimMonad m =>
MutableArray (PrimState m) a -> Int -> m a
P.readArray StreamSegment a
MutableArray (PrimState IO) (Cell a)
seg Int
segIx
        case Cell a
cell of
             Written a
a -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> IO (Maybe a)) -> Maybe a -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
a
             Cell a
Empty -> Maybe a -> IO (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
             Blocking MVar a
v -> MVar a -> IO (Maybe a)
forall a. MVar a -> IO (Maybe a)
tryReadMVar MVar a
v

     , (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
forall a. (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
readSegIxUnmasked IO a -> IO a
forall a. a -> a
id (StreamSegment a
seg,Int
segIx)
     )



-- | Read an element from the chan, blocking if the chan is empty.
--
-- /Note re. exceptions/: When an async exception is raised during a @readChan@ 
-- the message that the read would have returned is likely to be lost, even when
-- the read is known to be blocked on an empty queue. If you need to handle
-- this scenario, you can use 'readChanOnException'.
readChan :: OutChan a -> IO a
{-# INLINE readChan #-}
readChan :: OutChan a -> IO a
readChan = \OutChan a
oc-> OutChan a -> IO (StreamSegment a, Int)
forall a. OutChan a -> IO (StreamSegment a, Int)
startReadChan OutChan a
oc IO (StreamSegment a, Int)
-> ((StreamSegment a, Int) -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
forall a. (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
readSegIxUnmasked IO a -> IO a
forall a. a -> a
id

-- | Like 'readChan' but allows recovery of the queue element which would have
-- been read, in the case that an async exception is raised during the read. To
-- be precise exceptions are raised, and the handler run, only when
-- @readChanOnException@ is blocking.
--
-- The second argument is a handler that takes a blocking IO action returning
-- the element, and performs some recovery action.  When the handler is called,
-- the passed @IO a@ is the only way to access the element.
readChanOnException :: OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException :: OutChan a -> (IO a -> IO ()) -> IO a
readChanOnException OutChan a
oc IO a -> IO ()
h = IO a -> IO a
forall a. IO a -> IO a
mask_ (
    OutChan a -> IO (StreamSegment a, Int)
forall a. OutChan a -> IO (StreamSegment a, Int)
startReadChan OutChan a
oc IO (StreamSegment a, Int)
-> ((StreamSegment a, Int) -> IO a) -> IO a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= 
      (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
forall a. (IO a -> IO a) -> (StreamSegment a, Int) -> IO a
readSegIxUnmasked (\IO a
io-> IO a
io IO a -> IO () -> IO a
forall a b. IO a -> IO b -> IO a
`onException` (IO a -> IO ()
h IO a
io))
    )


-- increments counter, finds stream segment of corresponding cell (updating the
-- stream head pointer as needed), and returns the stream segment and relative
-- index of our cell.
moveToNextCell :: Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell :: Bool -> ChanEnd a -> IO (Int, NextSegment a, IO ())
moveToNextCell Bool
isReader (ChanEnd Int
logBounds Int
boundsMn1 SegSource a
segSource AtomicCounter
counter IORef (StreamHead a)
streamHead) = do
    (StreamHead Int
offset0 Stream a
str0) <- IORef (StreamHead a) -> IO (StreamHead a)
forall a. IORef a -> IO a
readIORef IORef (StreamHead a)
streamHead
    Int
ix <- Int -> AtomicCounter -> IO Int
incrCounter Int
1 AtomicCounter
counter
    let !relIx :: Int
relIx = Int
ix Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
offset0
        !segsAway :: Int
segsAway = Int
relIx Int -> Int -> Int
forall a. Bits a => a -> Int -> a
`unsafeShiftR` Int
logBounds -- `div` bounds
        !segIx :: Int
segIx    = Int
relIx Int -> Int -> Int
forall a. Bits a => a -> a -> a
.&. Int
boundsMn1            -- `mod` bounds
        ~Int
nEW_SEGMENT_WAIT = (Int
boundsMn1 Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
12) Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
25 
    
        go :: t -> NextSegment a -> IO (NextSegment a)
go  t
0 NextSegment a
nextSeg = NextSegment a -> IO (NextSegment a)
forall (m :: * -> *) a. Monad m => a -> m a
return NextSegment a
nextSeg
        go !t
n NextSegment a
nextSeg =
            Bool
-> IORef (Maybe (NextSegment a))
-> SegSource a
-> Int
-> IO (NextSegment a)
forall a.
Bool
-> IORef (Maybe (NextSegment a))
-> SegSource a
-> Int
-> IO (NextSegment a)
waitingAdvanceStream Bool
isReader (NextSegment a -> IORef (Maybe (NextSegment a))
forall a. NextSegment a -> IORef (Maybe (NextSegment a))
getNextRef NextSegment a
nextSeg) SegSource a
segSource (Int
nEW_SEGMENT_WAITInt -> Int -> Int
forall a. Num a => a -> a -> a
*Int
segIx) -- NOTE [1]
              IO (NextSegment a)
-> (NextSegment a -> IO (NextSegment a)) -> IO (NextSegment a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= t -> NextSegment a -> IO (NextSegment a)
go (t
nt -> t -> t
forall a. Num a => a -> a -> a
-t
1)
 
    NextSegment a
nextSeg <- Bool -> IO (NextSegment a) -> IO (NextSegment a)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
relIx Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
0) (IO (NextSegment a) -> IO (NextSegment a))
-> IO (NextSegment a) -> IO (NextSegment a)
forall a b. (a -> b) -> a -> b
$
              -- go segsAway $ NextByReader str0  -- NOTE [2]
                 -- NOTE: this is redundant, since `go` doesn't want to get
                 -- inlined/unrolled
                 if Int
segsAway Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0 
                     then NextSegment a -> IO (NextSegment a)
forall (m :: * -> *) a. Monad m => a -> m a
return      (NextSegment a -> IO (NextSegment a))
-> NextSegment a -> IO (NextSegment a)
forall a b. (a -> b) -> a -> b
$ Stream a -> NextSegment a
forall a. Stream a -> NextSegment a
NextByReader Stream a
str0 
                     else Int -> NextSegment a -> IO (NextSegment a)
forall t. (Eq t, Num t) => t -> NextSegment a -> IO (NextSegment a)
go Int
segsAway (NextSegment a -> IO (NextSegment a))
-> NextSegment a -> IO (NextSegment a)
forall a b. (a -> b) -> a -> b
$ Stream a -> NextSegment a
forall a. Stream a -> NextSegment a
NextByReader Stream a
str0  -- NOTE [2]

    -- writers and readers must perform this continuation at different points:
    let updateStreamHeadIfNecessary :: IO ()
updateStreamHeadIfNecessary = 
          Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
segsAway Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            let !offsetN :: Int
offsetN = --(segsAway * bounds)
                   Int
offset0 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ (Int
segsAway Int -> Int -> Int
forall a. Bits a => a -> Int -> a
`unsafeShiftL` Int
logBounds) 
            IORef (StreamHead a) -> StreamHead a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (StreamHead a)
streamHead (StreamHead a -> IO ()) -> StreamHead a -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Stream a -> StreamHead a
forall a. Int -> Stream a -> StreamHead a
StreamHead Int
offsetN (Stream a -> StreamHead a) -> Stream a -> StreamHead a
forall a b. (a -> b) -> a -> b
$ NextSegment a -> Stream a
forall a. NextSegment a -> Stream a
getStr NextSegment a
nextSeg

    (Int, NextSegment a, IO ()) -> IO (Int, NextSegment a, IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
segIx, NextSegment a
nextSeg, IO ()
updateStreamHeadIfNecessary)
  -- [1] All readers or writers needing to work with a not-yet-created segment
  -- race to create it, but those past index 0 have progressively long waits.
  -- The constant here is an approximation of the way we calculate it in
  -- Control.Concurrent.Chan.Unagi.Constants.nEW_SEGMENT_WAIT
  --
  -- [2] We start the loop with 'NextByReader' effectively meaning that the head
  -- segment was installed by a reader, or really just indicating that the
  -- writer has no need to check-in for blocking. This is always the case for
  -- the head stream; see `writeChan` NOTE 1.



-- TODO play with inlining and look at core; we'd like the conditionals to disappear
-- INVARIANTS: 
--   - if isReader, after returning, the nextSegRef will be marked NextByReader
--   - the 'nextSegRef' is only ever modified from Nothing -> Just (NextBy*)
waitingAdvanceStream :: Bool -> IORef (Maybe (NextSegment a)) -> SegSource a 
                     -> Int -> IO (NextSegment a)
waitingAdvanceStream :: Bool
-> IORef (Maybe (NextSegment a))
-> SegSource a
-> Int
-> IO (NextSegment a)
waitingAdvanceStream Bool
isReader IORef (Maybe (NextSegment a))
nextSegRef SegSource a
segSource = Int -> IO (NextSegment a)
forall t. (Ord t, Num t) => t -> IO (NextSegment a)
go where
  cas :: Ticket (Maybe (NextSegment a))
-> NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a)))
cas Ticket (Maybe (NextSegment a))
tk = IORef (Maybe (NextSegment a))
-> Ticket (Maybe (NextSegment a))
-> Maybe (NextSegment a)
-> IO (Bool, Ticket (Maybe (NextSegment a)))
forall a. IORef a -> Ticket a -> a -> IO (Bool, Ticket a)
casIORef IORef (Maybe (NextSegment a))
nextSegRef Ticket (Maybe (NextSegment a))
tk (Maybe (NextSegment a)
 -> IO (Bool, Ticket (Maybe (NextSegment a))))
-> (NextSegment a -> Maybe (NextSegment a))
-> NextSegment a
-> IO (Bool, Ticket (Maybe (NextSegment a)))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. NextSegment a -> Maybe (NextSegment a)
forall a. a -> Maybe a
Just

  -- extract the installed Just NextSegment from the result of the cas
  peekInstalled :: (a, Ticket (Maybe a)) -> a
peekInstalled (a
_, Ticket (Maybe a)
nextSegTk) =
     a -> Maybe a -> a
forall a. a -> Maybe a -> a
fromMaybe ([Char] -> a
forall a. (?callStack::CallStack) => [Char] -> a
error [Char]
"Impossible! This should only have been a Just NextBy* segment") (Maybe a -> a) -> Maybe a -> a
forall a b. (a -> b) -> a -> b
$
       Ticket (Maybe a) -> Maybe a
forall a. Ticket a -> a
peekTicket Ticket (Maybe a)
nextSegTk

  readerUnblockAndReturn :: NextSegment a -> IO (NextSegment a)
readerUnblockAndReturn NextSegment a
nextSeg = Bool -> IO (NextSegment a) -> IO (NextSegment a)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert Bool
isReader (IO (NextSegment a) -> IO (NextSegment a))
-> IO (NextSegment a) -> IO (NextSegment a)
forall a b. (a -> b) -> a -> b
$ case NextSegment a
nextSeg of
      -- if a writer won, try to set as NextByReader so that every writer
      -- to this seg doesn't have to check in, and unblockWriters
      NextByWriter Stream a
strAlreadyInstalled WriterCheckpoint
checkpt -> do
          WriterCheckpoint -> IO ()
unblockWriters WriterCheckpoint
checkpt  -- idempotent
          let nextSeg' :: NextSegment a
nextSeg' = Stream a -> NextSegment a
forall a. Stream a -> NextSegment a
NextByReader Stream a
strAlreadyInstalled
          IORef (Maybe (NextSegment a)) -> Maybe (NextSegment a) -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe (NextSegment a))
nextSegRef (Maybe (NextSegment a) -> IO ()) -> Maybe (NextSegment a) -> IO ()
forall a b. (a -> b) -> a -> b
$ NextSegment a -> Maybe (NextSegment a)
forall a. a -> Maybe a
Just NextSegment a
nextSeg'
          NextSegment a -> IO (NextSegment a)
forall (m :: * -> *) a. Monad m => a -> m a
return NextSegment a
nextSeg'

      NextSegment a
nextByReader -> NextSegment a -> IO (NextSegment a)
forall (m :: * -> *) a. Monad m => a -> m a
return NextSegment a
nextByReader

  go :: t -> IO (NextSegment a)
go t
wait = Bool -> IO (NextSegment a) -> IO (NextSegment a)
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (t
wait t -> t -> Bool
forall a. Ord a => a -> a -> Bool
>= t
0) (IO (NextSegment a) -> IO (NextSegment a))
-> IO (NextSegment a) -> IO (NextSegment a)
forall a b. (a -> b) -> a -> b
$ do
    Ticket (Maybe (NextSegment a))
tk <- IORef (Maybe (NextSegment a))
-> IO (Ticket (Maybe (NextSegment a)))
forall a. IORef a -> IO (Ticket a)
readForCAS IORef (Maybe (NextSegment a))
nextSegRef
    case Ticket (Maybe (NextSegment a)) -> Maybe (NextSegment a)
forall a. Ticket a -> a
peekTicket Ticket (Maybe (NextSegment a))
tk of
         -- Rare, slow path: 
         --   In readers: we outran reader 0 of the previous segment (or it was
         --  descheduled) who was tasked with setting this up.
         --   In writers: there are number writer threads > bounds, or reader 0
         --  of previous segment was slow or descheduled.
         Maybe (NextSegment a)
Nothing 
           | t
wait t -> t -> Bool
forall a. Ord a => a -> a -> Bool
> t
0 -> t -> IO (NextSegment a)
go (t
wait t -> t -> t
forall a. Num a => a -> a -> a
- t
1)
             -- Create a potential next segment and try to insert it:
           | Bool
otherwise -> do 
               Stream a
potentialStrNext <- StreamSegment a -> IORef (Maybe (NextSegment a)) -> Stream a
forall a.
StreamSegment a -> IORef (Maybe (NextSegment a)) -> Stream a
Stream (StreamSegment a -> IORef (Maybe (NextSegment a)) -> Stream a)
-> SegSource a -> IO (IORef (Maybe (NextSegment a)) -> Stream a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> SegSource a
segSource IO (IORef (Maybe (NextSegment a)) -> Stream a)
-> IO (IORef (Maybe (NextSegment a))) -> IO (Stream a)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe (NextSegment a) -> IO (IORef (Maybe (NextSegment a)))
forall a. a -> IO (IORef a)
newIORef Maybe (NextSegment a)
forall a. Maybe a
Nothing
               if Bool
isReader
                 then do
                   -- This may fail because of either a competing reader or
                   -- writer which certainly modified this to a Just value
                   (Bool, Ticket (Maybe (NextSegment a)))
installed <- Ticket (Maybe (NextSegment a))
-> NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a)))
cas Ticket (Maybe (NextSegment a))
tk (NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a))))
-> NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a)))
forall a b. (a -> b) -> a -> b
$ Stream a -> NextSegment a
forall a. Stream a -> NextSegment a
NextByReader Stream a
potentialStrNext
                   -- The segment we're reading from (or any *behind* the one
                   -- we're reading from) is always unblocked for writers:
                   NextSegment a -> IO (NextSegment a)
readerUnblockAndReturn (NextSegment a -> IO (NextSegment a))
-> NextSegment a -> IO (NextSegment a)
forall a b. (a -> b) -> a -> b
$ (Bool, Ticket (Maybe (NextSegment a))) -> NextSegment a
forall a a. (a, Ticket (Maybe a)) -> a
peekInstalled (Bool, Ticket (Maybe (NextSegment a)))
installed
                 else do
                   WriterCheckpoint
potentialCheckpt <- MVar () -> WriterCheckpoint
WriterCheckpoint (MVar () -> WriterCheckpoint)
-> IO (MVar ()) -> IO WriterCheckpoint
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
                   -- This may fail because of either a competing reader or
                   -- writer which certainly modified this to a Just value
                   (Bool, Ticket (Maybe (NextSegment a))) -> NextSegment a
forall a a. (a, Ticket (Maybe a)) -> a
peekInstalled ((Bool, Ticket (Maybe (NextSegment a))) -> NextSegment a)
-> IO (Bool, Ticket (Maybe (NextSegment a))) -> IO (NextSegment a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Ticket (Maybe (NextSegment a))
-> NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a)))
cas Ticket (Maybe (NextSegment a))
tk (NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a))))
-> NextSegment a -> IO (Bool, Ticket (Maybe (NextSegment a)))
forall a b. (a -> b) -> a -> b
$ 
                           Stream a -> WriterCheckpoint -> NextSegment a
forall a. Stream a -> WriterCheckpoint -> NextSegment a
NextByWriter Stream a
potentialStrNext WriterCheckpoint
potentialCheckpt)
   
         -- Fast path: Another reader or writer has already advanced the
         -- stream. Most likely reader 0 of the last segment.
         Just NextSegment a
nextSeg 
           | Bool
isReader  -> NextSegment a -> IO (NextSegment a)
readerUnblockAndReturn NextSegment a
nextSeg
           | Bool
otherwise -> NextSegment a -> IO (NextSegment a)
forall (m :: * -> *) a. Monad m => a -> m a
return NextSegment a
nextSeg

type SegSource a = IO (StreamSegment a)

newSegmentSource :: Int -> IO (SegSource a)
newSegmentSource :: Int -> IO (SegSource a)
newSegmentSource Int
size = do
    -- NOTE: evaluate Empty seems to be required here in order to not raise
    -- "Stored Empty Ticket went stale!"  exception when in GHCi.
    MutableArray RealWorld (Cell a)
arr <- Cell a -> IO (Cell a)
forall a. a -> IO a
evaluate Cell a
forall a. Cell a
Empty IO (Cell a) -> (Cell a -> SegSource a) -> SegSource a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Int -> Cell a -> IO (MutableArray (PrimState IO) (Cell a))
forall (m :: * -> *) a.
PrimMonad m =>
Int -> a -> m (MutableArray (PrimState m) a)
P.newArray Int
size
    SegSource a -> IO (SegSource a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MutableArray (PrimState IO) (Cell a)
-> Int -> Int -> IO (MutableArray (PrimState IO) (Cell a))
forall (m :: * -> *) a.
PrimMonad m =>
MutableArray (PrimState m) a
-> Int -> Int -> m (MutableArray (PrimState m) a)
P.cloneMutableArray MutableArray RealWorld (Cell a)
MutableArray (PrimState IO) (Cell a)
arr Int
0 Int
size)


-- This begins empty, but several readers will `put` without coordination, to
-- ensure it's filled. Meanwhile writers are blocked on a `readMVar` (see
-- writerCheckin) waiting to proceed. 
newtype WriterCheckpoint = WriterCheckpoint (MVar ())

-- idempotent
unblockWriters :: WriterCheckpoint -> IO ()
unblockWriters :: WriterCheckpoint -> IO ()
unblockWriters (WriterCheckpoint MVar ()
v) =
    IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar MVar ()
v ()

-- A writer knows that it doesn't need to call this when:
--   - its segment is in the StreamHead, or...
--   - its segment was reached by a NextByReader
writerCheckin :: WriterCheckpoint -> IO ()
writerCheckin :: WriterCheckpoint -> IO ()
writerCheckin (WriterCheckpoint MVar ()
v) = do
-- On GHC > 7.8 we have an atomic `readMVar`.  On earlier GHC readMVar is
-- take+put, creating a race condition; in this case we use take+tryPut
-- ensuring the MVar stays full even if a reader's tryPut slips an () in:
#if __GLASGOW_HASKELL__ < 708
    takeMVar v >>= void . tryPutMVar v
#else
    IO () -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
v
#endif

-- returns immediately indicating whether the checkpt is currently unblocked.
tryWriterCheckin :: WriterCheckpoint -> IO Bool
tryWriterCheckin :: WriterCheckpoint -> IO Bool
tryWriterCheckin (WriterCheckpoint MVar ()
v) =
-- On GHC > 7.8 we have an atomic `tryReadMVar`.  On earlier GHC readMVar is
-- take+put, creating a race condition; in this case we use take+tryPut
-- ensuring the MVar stays full even if a reader's tryPut slips an () in.
-- HOWEVER, tryReadMVar is also buggy in GHC < 7.8.3
--   https://ghc.haskell.org/trac/ghc/ticket/9148
#ifdef TRYREADMVAR
    Maybe () -> Bool
forall a. Maybe a -> Bool
isJust (Maybe () -> Bool) -> IO (Maybe ()) -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MVar () -> IO (Maybe ())
forall a. MVar a -> IO (Maybe a)
tryReadMVar MVar ()
v
#else
    tryTakeMVar v >>= maybe (return False) ((True <$) . tryPutMVar v)
#endif