{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP #-}
module Control.Concurrent.Chan.Unagi.Internal
#ifdef NOT_optimised
{-# WARNING "This library is unlikely to perform well on architectures other than i386/x64/aarch64" #-}
#endif
(sEGMENT_LENGTH
, InChan(..), OutChan(..), ChanEnd(..), StreamSegment, Cell(..), Stream(..)
, NextSegment(..), StreamHead(..)
, newChanStarting, writeChan, readChan, readChanOnException
, dupChan, tryReadChan
, moveToNextCell, waitingAdvanceStream, newSegmentSource
, assertionCanary
)
where
import Control.Concurrent.MVar
import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import Data.Typeable(Typeable)
import GHC.Exts(inline)
import Control.Concurrent.Chan.Unagi.Constants
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT
import Utilities(touchIORef)
import Prelude
-- | The write end of a channel.
--
-- Carries a 'Ticket' for a cell value (captured once when the channel is
-- created, and used by 'writeChan' as the expected old value of its
-- compare-and-swap) together with the writer's 'ChanEnd'.
data InChan a = InChan !(Ticket (Cell a)) !(ChanEnd (Cell a))
    deriving Typeable
-- | Two 'InChan's are equal exactly when they share the same stream-head
-- 'IORef'.  The saved ticket, the segment source and the counter are not
-- inspected.
instance Eq (InChan a) where
    (InChan _ (ChanEnd _ _ headA)) == (InChan _ (ChanEnd _ _ headB))
        = headA == headB
-- | The read end of a channel: a wrapper around the reader's 'ChanEnd'.
--
-- Equality is derived, so it delegates to the 'Eq' instance of 'ChanEnd'.
newtype OutChan a = OutChan (ChanEnd (Cell a))
    deriving (Eq, Typeable)
-- | One end of a channel, parameterised over the type stored in each cell.
data ChanEnd cell_a =
    ChanEnd
        -- Action that yields a fresh segment each time it is run.
        !(SegSource cell_a)
        -- Counter handing out cell positions; 'moveToNextCell' increments it.
        !AtomicCounter
        -- The stream segment (and its starting offset) this end currently
        -- treats as its head.
        !(IORef (StreamHead cell_a))
    deriving Typeable
-- | Two 'ChanEnd's are equal exactly when they share the same stream-head
-- 'IORef'; the segment source and counter are ignored.
instance Eq (ChanEnd a) where
    (ChanEnd _ _ headA) == (ChanEnd _ _ headB)
        = headA == headB
-- | The head of a stream as seen by one 'ChanEnd': a counter offset paired
-- with a 'Stream'.  'moveToNextCell' subtracts the offset from the counter
-- value it obtains in order to locate a cell relative to this stream.
data StreamHead cell_a = StreamHead !Int !(Stream cell_a)
-- | One segment of a stream: a mutable boxed array of cells living in 'IO'.
type StreamSegment cell_a = P.MutableArray RealWorld cell_a
-- | The state of a single slot in a segment.
--
--   * 'Empty'    – the initial value of every slot (see 'newChanStarting').
--   * 'Written'  – a writer has stored a value here.
--   * 'Blocking' – a reader arrived first and installed an 'MVar' through
--                  which the writer hands over the value.
data Cell a = Empty | Written a | Blocking !(MVar a)
-- | A linked list of segments: the current segment plus a mutable reference
-- to the segment that follows it (if one has been installed yet).
data Stream cell_a =
    Stream !(StreamSegment cell_a)
           !(IORef (NextSegment cell_a))
-- | The link to the following segment of a 'Stream': either not yet
-- installed ('NoSegment') or pointing at the next 'Stream' node ('Next').
data NextSegment cell_a = NoSegment | Next !(Stream cell_a)
-- | Create a new channel whose counters start at the given cell offset,
-- returning its write end and read end.
--
-- Both ends share the same segment source and the same first segment, but
-- each gets its own counter and its own stream-head 'IORef'.
newChanStarting :: Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    segSource <- newSegmentSource Empty
    firstSeg <- segSource
    -- Every cell of a fresh segment holds 'Empty'; keep a ticket for it so
    -- that writers can later CAS against it.
    savedEmptyTkt <- readArrayElem firstSeg 0
    stream <- Stream firstSeg <$> newIORef NoSegment
    -- 'end' is an IO action, not a value: it is run once per channel end
    -- below, so each end receives a distinct counter and head reference.
    let end = ChanEnd segSource
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    liftA2 (,) (InChan savedEmptyTkt <$> end) (OutChan <$> end)
-- | Create a new read end from a write end.
--
-- The new 'OutChan' shares the segment source of the 'InChan' but owns a
-- fresh counter (initialised from the writer's current count) and a fresh
-- stream-head reference (initialised from the writer's current head).
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan _ (ChanEnd segSource counter streamHead)) = do
    -- The stream head is read first and the counter second, with a
    -- load/load barrier in between so the two reads keep that order.
    hLoc <- readIORef streamHead
    loadLoadBarrier
    wCount <- readCounter counter
    counter' <- newCounter wCount
    streamHead' <- newIORef hLoc
    return $ OutChan (ChanEnd segSource counter' streamHead')
-- | Write a value to the channel.
--
-- The whole operation runs under 'mask_'.  The writer claims the next cell,
-- then tries to CAS it from the saved 'Empty' ticket to @'Written' a@.  If
-- the CAS fails because a reader has already installed a 'Blocking' 'MVar',
-- the value is handed over through that 'MVar' instead.
writeChan :: InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan savedEmptyTkt ce@(ChanEnd segSource _ _)) = \a-> mask_ $ do
    (segIx, (Stream seg next), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    (success,nonEmptyTkt) <- casArrayElem seg segIx savedEmptyTkt (Written a)
    -- The writer of the first cell of a segment also makes sure the next
    -- segment exists (wait count 0: allocate immediately if missing).
    when (segIx == 0) $ void $
      waitingAdvanceStream next segSource 0
    when (not success) $
        case peekTicket nonEmptyTkt of
             -- A reader got here first and is waiting on this MVar.
             Blocking v -> putMVar v a
             Empty      -> error "Stored Empty Ticket went stale!"
             Written _  -> error "Nearly Impossible! Expected Blocking"
-- | Read the value from an already-claimed cell.
--
-- Takes a wrapper @h@ that is applied to the (potentially blocking)
-- 'readMVar' action, and the triple produced by 'moveToNextCell'.
--
--   * If the cell is 'Written', its value is returned directly.
--   * If the cell is 'Empty', a fresh 'MVar' is installed via CAS as
--     'Blocking' and the reader waits on it; if the CAS loses, the cell's
--     new contents are used instead.
--   * If the cell is already 'Blocking', the reader waits on that 'MVar'.
readSegIxUnmasked :: (IO a -> IO a) -> (Int, Stream (Cell a), IO ()) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h =
  \(segIx, (Stream seg _), maybeUpdateStreamHead)-> do
    maybeUpdateStreamHead
    cellTkt <- readArrayElem seg segIx
    case peekTicket cellTkt of
         Written a -> return a
         Empty -> do
            v <- newEmptyMVar
            (success,elseWrittenCell) <- casArrayElem seg segIx cellTkt (Blocking v)
            if success
              then readBlocking v
              -- The CAS lost: the ticket returned shows what is there now.
              else case peekTicket elseWrittenCell of
                        Written a -> return a
                        Blocking v2 -> readBlocking v2
                        _ -> error "Impossible! Expecting Written or Blocking"
         Blocking v -> readBlocking v
  -- 'readMVar' leaves the value in place, so the MVar is not emptied here.
  where readBlocking v = inline h $ readMVar v
-- | Claim the next cell for reading and return two ways of getting at it:
--
--   * a 'UT.Element' wrapping a non-blocking action that yields @Just@ the
--     value if it is available now and 'Nothing' otherwise, and
--   * an action that reads the same cell via 'readSegIxUnmasked', waiting
--     for the writer if necessary.
tryReadChan :: OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan ce) = do
    segStuff@(segIx, (Stream seg _), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    return (
        UT.Element $ do
          cell <- P.readArray seg segIx
          case cell of
               Written a  -> return $ Just a
               Empty      -> return Nothing
               -- Some reader installed an MVar; peek without blocking.
               Blocking v -> tryReadMVar v

      , readSegIxUnmasked id segStuff
      )
-- | Read the next value from the channel, waiting for a writer if the
-- claimed cell has not been filled yet.  No extra handler is wrapped around
-- the blocking read (the wrapper passed on is 'id').
readChan :: OutChan a -> IO a
{-# INLINE readChan #-}
readChan = \(OutChan ce)-> moveToNextCell ce >>= readSegIxUnmasked id
-- | Like 'readChan', but runs under 'mask_' and, if the blocking read
-- raises an exception, first calls the supplied handler with that same
-- blocking-read action before the exception propagates ('onException').
-- The handler can use the action to retrieve the value later.
readChanOnException :: OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException (OutChan ce) h = mask_ (
    moveToNextCell ce >>=
      readSegIxUnmasked (\io-> io `onException` (h io)) )
-- | Claim the next cell position for this channel end.
--
-- Returns the index of the claimed cell within its segment, the 'Stream'
-- node holding that segment, and an action that (when the claimed cell lies
-- beyond the current head segment) advances the stored stream head.  The
-- caller decides when to run that action.
moveToNextCell :: ChanEnd cell_a -> IO (Int, Stream cell_a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell (ChanEnd segSource counter streamHead) = do
    (StreamHead offset0 str0) <- readIORef streamHead
    ix <- incrCounter 1 counter
    -- Position of the claimed cell relative to the head we just read:
    -- how many segments ahead it is, and its index inside that segment.
    let (segsAway, segIx) = assert ((ix - offset0) >= 0) $
                 divMod_sEGMENT_LENGTH $! (ix - offset0)
        {-# INLINE go #-}
        -- Walk forward 'segsAway' segments, installing missing ones.
        go 0 str = return str
        go !n (Stream _ next) =
            -- The spin count grows with the cell index, so threads holding
            -- later cells wait longer before allocating a segment themselves.
            waitingAdvanceStream next segSource (nEW_SEGMENT_WAIT*segIx)
              >>= go (n-1)
    str <- go segsAway str0
    let !maybeUpdateStreamHead = do
          when (segsAway > 0) $ do
            -- Equivalent to offset0 + segsAway * segment length, given that
            -- lOG_SEGMENT_LENGTH is the log2 of the segment length.
            let !offsetN =
                  offset0 + (segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH)
            writeIORef streamHead $ StreamHead offsetN str
          -- NOTE(review): presumably keeps 'streamHead' reachable up to this
          -- point; confirm against Utilities.touchIORef.
          touchIORef streamHead
    return (segIx,str, maybeUpdateStreamHead)
-- | Return the 'Stream' node that follows the one owning the given
-- next-segment reference, installing a new segment if necessary.
--
-- The 'Int' argument is a spin count: while the reference still holds
-- 'NoSegment' and the count is positive, the reference is simply re-read
-- with the count decremented.  Once the count reaches zero a new segment is
-- obtained from the segment source and installed with a CAS; whichever
-- thread's CAS wins, the segment actually present afterwards is returned.
waitingAdvanceStream :: IORef (NextSegment cell_a) -> SegSource cell_a
                     -> Int -> IO (Stream cell_a)
{-# NOINLINE waitingAdvanceStream #-}
waitingAdvanceStream nextSegRef segSource = go where
  go !wait = assert (wait >= 0) $ do
    tk <- readForCAS nextSegRef
    case peekTicket tk of
         NoSegment
           | wait > 0 -> go (wait - 1)
             -- Out of patience: build a candidate segment and try to
             -- install it.
           | otherwise -> do
               potentialStrNext <- Stream <$> segSource <*> newIORef NoSegment
               (_,tkDone) <- casIORef nextSegRef tk (Next potentialStrNext)
               -- Success or failure, the returned ticket reflects what the
               -- reference holds now, which must be some 'Next' segment.
               case peekTicket tkDone of
                 Next strNext -> return strNext
                 _ -> error "Impossible! This should only have been Next segment"
         Next strNext -> return strNext
-- | An 'IO' action that yields a 'StreamSegment' each time it is run
-- (see 'newSegmentSource' for how such an action is built).
type SegSource cell_a = IO (StreamSegment cell_a)
-- | Build a 'SegSource' whose segments are initially filled with the given
-- cell value.
--
-- The value is forced with 'evaluate' and used to fill one template array
-- of 'sEGMENT_LENGTH' elements; the returned action produces each new
-- segment by cloning that template in full.
newSegmentSource :: cell_a -> IO (SegSource cell_a)
newSegmentSource cell_empty = do
    arr <- evaluate cell_empty >>= P.newArray sEGMENT_LENGTH
    return (P.cloneMutableArray arr 0 sEGMENT_LENGTH)
-- | Report whether 'assert' is active in this build.
--
-- Runs @assert False@ and returns 'True' if that raised 'AssertionFailed',
-- 'False' if the assertion was silently skipped.
assertionCanary :: IO Bool
assertionCanary = do
    assertionsWorking <- try $ assert False $ return ()
    return $
      case assertionsWorking of
           Left (AssertionFailed _) -> True
           _                        -> False