{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.NoBlocking.Unboxed.Internal
#ifdef NOT_optimised
    {-# WARNING "This library is unlikely to perform well on architectures other than i386/x64/aarch64" #-}
#endif
    (sEGMENT_LENGTH
    , InChan(..), OutChan(..), ChanEnd(..), Cell, Stream(..)
    , NextSegment(..), StreamHead(..)
    , newChanStarting, writeChan, tryReadChan, readChan, UT.Element(..)
    , dupChan
    , streamChan
    , isActive
    )
    where

-- Forked from src/Control/Concurrent/Chan/Unagi/NoBlocking/Internal.hs at
-- 9e2306330e with some code copied and modified from Unagi.Unboxed.
--
-- The main motivation for this variant is that it lets us take full advantage
-- of the atomicUnicorn trick, so in both read and write we need only use
-- sigArr when the value to be written == atomicUnicorn.
--
-- Some detailed NOTEs present in Control.Concurrent.Chan.Unagi.Unboxed have
-- been removed here although they still pertain. If you intend to work on this
-- module, please be sure you're familiar with those concerns.

import Data.IORef
import Control.Exception
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Typeable(Typeable)
import Data.Maybe

import Control.Concurrent.Chan.Unagi.Constants

import Prelude

-- We can re-use much of the Unagi.Unboxed implementation here, and some of
-- Unagi.NoBlocking (at least our types, which is important):
import Control.Concurrent.Chan.Unagi.Unboxed.Internal(
          ChanEnd(..), StreamHead(..), Cell, Stream(..)
        , NextSegment(..), moveToNextCell, waitingAdvanceStream, segSource
        , cellEmpty, readElementArray, writeElementArray
        , SignalIntArray, ElementArray, UnagiPrim(..))
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT


-- | The write end of a channel created with 'newChan'.
data InChan a
    = InChan
        !(IORef Bool)  -- Shared flag; 'dupChan' hands it to the OutChan it creates
        !(ChanEnd a)   -- Writer-side counter and stream head
    deriving Typeable

-- | The read end of a channel created with 'newChan'.
data OutChan a
    = OutChan
        !(IORef Bool)  -- Is the corresponding InChan still alive?
        !(ChanEnd a)   -- Reader-side counter and stream head
    deriving Typeable

-- Two 'InChan's compare equal exactly when their stream-head 'IORef's are
-- equal; the flag and the counter are ignored by the comparison.
instance Eq (InChan a) where
    (InChan _ (ChanEnd _ headA)) == (InChan _ (ChanEnd _ headB))
        = headA == headB
-- Two 'OutChan's compare equal exactly when their stream-head 'IORef's are
-- equal; the flag and the counter are ignored by the comparison.
instance Eq (OutChan a) where
    (OutChan _ (ChanEnd _ headA)) == (OutChan _ (ChanEnd _ headB))
        = headA == headB


-- | Create a new channel whose counters both start at the given cell offset,
-- returning its write end and its read end.
--
-- Both ends share one initial 'Stream' segment and one @IORef Bool@ flag, but
-- each end gets its own counter and its own stream-head reference. The flag
-- starts as @True@ and is set to @False@ by a finalizer attached to the write
-- end's head reference (this is the value 'isActive' reads).
newChanStarting :: (UnagiPrim a)=> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    let undefinedNewIndexedMVar = return $ -- NOTE [1]
          error "Unagi.NoBlocking.Unboxed tried to use initial fake IndexedMVar"
    stream <- uncurry Stream <$> segSource
                             <*> undefinedNewIndexedMVar
                             <*> newIORef NoSegment
    -- `end` is an action, deliberately run twice below: once for the write
    -- end and once for the read end, so each gets a fresh counter and head
    -- reference pointing at the same initial segment.
    let end = ChanEnd
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    inEnd@(ChanEnd _ inHeadRef) <- end
    finalizee <- newIORef True
    -- When the write end's head reference is finalized, record that the chan
    -- no longer has writers:
    void $ mkWeakIORef inHeadRef $ do
        writeBarrier
        writeIORef finalizee False
    (,) (InChan finalizee inEnd) <$> (OutChan finalizee <$> end)
  -- [1] We reuse most of Unagi.Unboxed's internals here, but unfortunately
  -- that implementation uses a Stream type with an IndexedMVar to coordinate
  -- blocking reads. Rather than do a lot of refactoring of Unagi.Unboxed, for
  -- now we just fake it here. Unagi.Unboxed.waitingAdvanceStream will actually
  -- create new IndexedMVars for each segment, but we hope at worst that they
  -- will be GC'd immediately even when many segments-worth of elements are in
  -- the queue; the main concern is not to accumulate lots of mutable boxed
  -- objects. TODO better later, maybe.


-- | An action that returns @False@ sometime after the chan no longer has any
-- writers.
--
-- After @False@ is returned, any 'UT.tryRead' which returns @Nothing@ can
-- be considered to be dead. Likewise for 'UT.tryReadNext'. Note that in the
-- blocking implementations a @BlockedIndefinitelyOnMVar@ exception is raised,
-- so this function is unnecessary.
isActive :: OutChan a -> IO Bool
isActive (OutChan finalizee _) = do
    b <- readIORef finalizee
    -- make sure that a tryRead that follows is not moved ahead:
    loadLoadBarrier
    return b


-- | Duplicate a chan: the returned @OutChan@ begins empty, but data written to
-- the argument @InChan@ from then on will be available from both the original
-- @OutChan@ and the one returned here, creating a kind of broadcast channel.
--
-- The new @OutChan@ shares the @InChan@'s @IORef Bool@ flag, and gets a fresh
-- counter and head reference initialised from the writer's current values.
--
-- See also 'streamChan' for a faster alternative that might be appropriate.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan finalizee (ChanEnd counter streamHead)) = do
    -- Snapshot the writer's head first, then (after the barrier) its counter:
    hLoc <- readIORef streamHead
    loadLoadBarrier
    wCount <- readCounter counter
    counter' <- newCounter wCount
    streamHead' <- newIORef hLoc
    return $ OutChan finalizee $ ChanEnd counter' streamHead'


-- READING AND WRITING
--
--  We re-use the internals of Unagi.Unboxed, but use them a bit differently;
--  in particular where Unagi.Unboxed uses its SignalIntArray to indicate the
--  status of the corresponding ElementArray cell, we use it only to
--  disambiguate an unwritten cell from a written cell of a "magic" value,
--  which we'll describe below.
--  
--  When we're reading and writing values that can be written atomically (see
--  atomicUnicorn), and when that particular value is not equal to that magic
--  value we get a fast write path: simply write to the eArr. Likewise when a
--  reader reads from eArr and sees something /= atomicUnicorn, it can simply
--  return with it. In all other cases readers and writers must check in at the
--  sigArr, as in Unagi.Unboxed.

-- Value stored in the signal array by 'writeChan' when a reader cannot tell
-- from the element array alone that the cell was written.
nonMagicCellWritten :: Int
nonMagicCellWritten = 1
-- and also: `cellEmpty` (imported)



-- | Write a value to the channel.
--
-- The whole write runs under 'mask_'. The element is always stored in the
-- element array; the signal array is additionally written only when the type
-- has no 'atomicUnicorn' or when the value being written equals it.
writeChan :: UnagiPrim a=> InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan _ ce) = \a-> mask_ $ do
    (segIx, (Stream sigArr eArr _ next), maybeUpdateStreamHead) <- moveToNextCell ce
    -- NOTE!: must write element both before updating stream head (see
    -- NoBlocking), and before signaling with CAS (if applicable):
    writeElementArray eArr segIx a

    let magic = atomicUnicorn
    when (isNothing magic || Just a == magic) $ do
      -- in which case a reader can't tell we've written just from a (possibly
      -- non-atomic) read from eArr:
      writeBarrier -- NOTE [1]
      P.writeByteArray sigArr segIx nonMagicCellWritten

    maybeUpdateStreamHead -- NOTE [2]
    -- try to pre-allocate next segment:
    when (segIx == 0) $ void $
      waitingAdvanceStream next 0
  -- [1] we need a write barrier here to make sure GHC maintains our ordering
  -- such that the element is written before we signal its availability with
  -- the write to sigArr that follows. See [2] in readChanOnExceptionUnmasked
  -- (not defined in this module; presumably in Unagi.Unboxed).
  --
  -- [2] Our final use of the head reference. We must make sure this IORef is
  -- not GC'd (and its finalizer run) until after our writes to the arrays
  -- above. See definition of maybeUpdateStreamHead.


-- | Returns immediately with an @'UT.Element' a@ future, which returns one
-- unique element when it becomes available via 'UT.tryRead'.
--
-- /Note/: This is a destructive operation. See 'UT.Element' for more details.
--
-- /Note re. exceptions/: When an async exception is raised during a @tryReadChan@
-- the message that the read would have returned is likely to be lost, just as
-- it would be when raised directly after this function returns.
tryReadChan :: UnagiPrim a=> OutChan a -> IO (UT.Element a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan _ ce) = do  -- see NoBlocking re. not masking
    -- Claim the next cell; the returned Element only polls that one cell.
    (segIx, (Stream sigArr eArr _ _), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    return $ UT.Element $
        tryReadChanInternals segIx sigArr eArr

-- Poll a single cell (index @segIx@ of the given signal/element arrays),
-- returning @Just@ the element if it has been written and @Nothing@ otherwise.
-- Shared by 'tryReadChan' and 'streamChan'.
tryReadChanInternals :: UnagiPrim a=> Int -> SignalIntArray -> ElementArray a -> IO (Maybe a)
{-# INLINE tryReadChanInternals #-}
tryReadChanInternals segIx sigArr eArr = do
      let readElem = readElementArray eArr segIx
          -- Consult the signal array to decide whether the cell was written:
          slowRead = do
             sig <- P.readByteArray sigArr segIx
             if sig == nonMagicCellWritten
               then do
                 loadLoadBarrier -- see [1] in writeChan
                 Just <$> readElem
               else assert (sig == cellEmpty) $
                 return Nothing
      -- If we know writes of this type are atomic, we can determine if the
      -- element has been written, and possibly return it without checking
      -- sigArr.
      case atomicUnicorn of
           Just magic -> do
              el <- readElem
              if (el /= magic)
                -- Then we know `el` was atomically written:
                then return $ Just el
                else slowRead
           Nothing -> slowRead
 

-- | @readChan io c@ returns the next element from @c@, calling 'tryReadChan'
-- and looping on the 'UT.Element' returned, and calling @io@ at each iteration
-- when the element is not yet available. It throws 'BlockedIndefinitelyOnMVar'
-- when 'isActive' determines that a value will never be returned.
--
-- When used like @readChan 'yield'@ or @readChan ('threadDelay' 10)@ this is
-- the semantic equivalent to the blocking @readChan@ in the other
-- implementations.
readChan :: UnagiPrim a=> IO () -> OutChan a -> IO a
{-# INLINE readChan #-}
readChan io oc = tryReadChan oc >>= \el->
    let -- Poll the element once; run the fallback `f` if it is not yet there.
        peekMaybe f = UT.tryRead el >>= maybe f return
        go = peekMaybe checkAndGo
        checkAndGo = do
            b <- isActive oc
            if b then io >> go
                 -- Do a necessary final check of the element:
                 else peekMaybe $ throwIO BlockedIndefinitelyOnMVar
     in go


-- | Produce the specified number of interleaved \"streams\" from a chan.
-- Consuming a 'UT.Stream' is much faster than calling 'tryReadChan', and
-- might be useful when an MPSC queue is needed, or when multiple consumers
-- should be load-balanced in a round-robin fashion.
--
-- Calls 'error' if the requested number of streams is less than 1.
--
-- Usage example:
--
-- @
--   do mapM_ ('writeChan' i) [1..9]
--      [str1, str2, str3] <- 'streamChan' 3 o
--      forkIO $ printStream str1   -- prints: 1,4,7
--      forkIO $ printStream str2   -- prints: 2,5,8
--      forkIO $ printStream str3   -- prints: 3,6,9
--    where
--      printStream str = do
--        h <- 'UT.tryReadNext' str
--        case h of
--          'UT.Next' a str' -> print a >> printStream str'
--          -- We know that all values were already written, so a Pending tells
--          -- us we can exit; in other cases we might call 'yield' and then
--          -- retry that same @'UT.tryReadNext' str@:
--          'UT.Pending' -> return ()
-- @
--
-- Be aware: if one stream consumer falls behind another (e.g. because it is
-- slower) the number of elements in the queue which can't be GC'd will grow.
-- You may want to do some coordination of 'UT.Stream' consumers to prevent
-- this.
streamChan :: UnagiPrim a=> Int -> OutChan a -> IO [UT.Stream a]
{-# INLINE streamChan #-}
streamChan period (OutChan _ (ChanEnd counter streamHead)) = do
    when (period < 1) $ error "Argument to streamChan must be > 0"

    (StreamHead offsetInitial strInitial) <- readIORef streamHead
    -- Make sure the read above occurs before our readCounter:
    loadLoadBarrier
    -- Linearizable as the first unread element
    !ix0 <- readCounter counter

    -- Adapted from moveToNextCell, given a stream segment location `str0` and
    -- its offset, `offset0`, this navigates to the UT.Stream segment holding `ix`
    -- and begins recursing in our UT.Stream wrappers
    let stream !offset0 str0 !ix = UT.Stream $ do
            -- Find our stream segment and relative index:
            let (segsAway, segIx) = assert ((ix - offset0) >= 0) $
                         divMod_sEGMENT_LENGTH $! (ix - offset0)
                      -- (ix - offset0) `quotRem` sEGMENT_LENGTH
                {-# INLINE go #-}
                -- Walk forward `n` segments from the given one:
                go 0 str = return str
                go !n (Stream _ _ _ next) =
                    waitingAdvanceStream next (nEW_SEGMENT_WAIT*segIx)
                      >>= go (n-1)
            -- the stream segment holding `ix`, and its calculated offset:
            str@(Stream sigArr eArr _ _) <- go segsAway str0
            let !strOffset = offset0+(segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH)
            --                       (segsAway  *                 sEGMENT_LENGTH)
            mbEl <- tryReadChanInternals segIx sigArr eArr
            return $ case mbEl of
                 Nothing -> UT.Pending
                 -- The continuation for this consumer is `period` cells on:
                 Just el -> UT.Next el $ stream strOffset str (ix+period)

    return $ map (stream offsetInitial strInitial) $
     -- [ix0..(ix0+period-1)] -- WRONG (hint: overflow)!
        take period $ iterate (+1) ix0