{-# LANGUAGE BangPatterns , DeriveDataTypeable, CPP , ScopedTypeVariables #-}
{-# LANGUAGE TypeSynonymInstances, FlexibleInstances #-}
module Control.Concurrent.Chan.Unagi.Unboxed.Internal
#ifdef NOT_optimised
{-# WARNING "This library is unlikely to perform well on architectures other than i386/x64/aarch64" #-}
#endif
(sEGMENT_LENGTH
, UnagiPrim(..)
, InChan(..), OutChan(..), ChanEnd(..), Cell, Stream(..), ElementArray(..), SignalIntArray
, readElementArray, writeElementArray
, NextSegment(..), StreamHead(..), segSource
, newChanStarting, writeChan, readChan, readChanOnException
, dupChan, tryReadChan
, moveToNextCell, waitingAdvanceStream, cellEmpty
)
where
import Data.IORef
import Control.Exception
import Control.Monad.Primitive(RealWorld)
import Data.Atomics.Counter.Fat
import Data.Atomics
import qualified Data.Primitive as P
import Control.Monad
import Control.Applicative
import Data.Bits
import GHC.Exts(inline)
import Data.Typeable(Typeable)
import Data.Int(Int8,Int16,Int32,Int64)
import Data.Word
import Utilities
import Control.Concurrent.Chan.Unagi.Constants
import qualified Control.Concurrent.Chan.Unagi.NoBlocking.Types as UT
import Prelude
#if MIN_VERSION_primitive(0,7,0)
import qualified Data.Primitive.Ptr as P
-- | In this CPP branch (primitive >= 0.7.0) 'Addr' is defined locally as a
-- pointer to 'Word8'; the other branch imports it from "Data.Primitive".
type Addr = P.Ptr Word8

-- | The null pointer, exposed under the same name the other CPP branch
-- imports from "Data.Primitive".
nullAddr :: Addr
nullAddr = P.nullPtr
#else
import Data.Primitive (Addr, nullAddr)
#endif
-- | The write end of a channel (the argument of 'writeChan' and 'dupChan');
-- a wrapper around a 'ChanEnd'.
newtype InChan a = InChan (ChanEnd a)
    deriving Typeable
-- | The read end of a channel (the argument of 'readChan', 'tryReadChan' and
-- 'readChanOnException'); a wrapper around a 'ChanEnd'.
newtype OutChan a = OutChan (ChanEnd a)
    deriving Typeable
-- | Two 'InChan's are equal exactly when their stream-head 'IORef's are equal;
-- the counter field is not consulted.
instance Eq (InChan a) where
    (InChan (ChanEnd _ headA)) == (InChan (ChanEnd _ headB))
        = headA == headB
-- | Two 'OutChan's are equal exactly when their stream-head 'IORef's are
-- equal; the counter field is not consulted.
instance Eq (OutChan a) where
    (OutChan (ChanEnd _ headA)) == (OutChan (ChanEnd _ headB))
        = headA == headB
-- | The state shared by everything using one end of a channel.
data ChanEnd a =
    ChanEnd
        -- Cell counter: 'moveToNextCell' increments it to claim the next cell
        -- index for a reader or writer.
        !AtomicCounter
        -- Current head of the segment stream, together with the cell offset
        -- of that segment (see 'StreamHead'); 'moveToNextCell' starts its
        -- walk from here and may advance it.
        !(IORef (StreamHead a))
    deriving Typeable
-- | A segment of the stream paired with the cell offset of that segment:
-- 'moveToNextCell' subtracts this offset from the counter value it claimed to
-- find how many segments away, and at which index, its cell lives.
data StreamHead a = StreamHead !Int !(Stream a)
-- | The mutable byte array holding a segment's elements. The type parameter
-- is phantom and records the element type used by 'readElementArray' and
-- 'writeElementArray'.
newtype ElementArray a = ElementArray (P.MutableByteArray RealWorld)
-- | Read the element at index @i@ (counted in elements, not bytes) of an
-- 'ElementArray'. No bounds check is performed here.
readElementArray :: (P.Prim a)=> ElementArray a -> Int -> IO a
{-# INLINE readElementArray #-}
readElementArray (ElementArray arr) i = P.readByteArray arr i
-- | Write @a@ at index @i@ (counted in elements, not bytes) of an
-- 'ElementArray'. No bounds check is performed here.
writeElementArray :: (P.Prim a)=> ElementArray a -> Int -> a -> IO ()
{-# INLINE writeElementArray #-}
writeElementArray (ElementArray arr) i a = P.writeByteArray arr i a
-- | Byte array of 'Cell' signals, one per slot of a segment. 'writeChan' and
-- 'readSegIxUnmasked' update it with @casByteArrayInt@.
type SignalIntArray = P.MutableByteArray RealWorld
-- | Signal stored per slot in a 'SignalIntArray'. Transitions performed in
-- this module:
--
--   * 'writeChan' attempts @cellEmpty -> cellWritten@;
--   * 'readSegIxUnmasked' attempts @cellEmpty -> cellBlocking@.
--
-- The numeric values matter: 'writeChan', 'readSegIxUnmasked' and
-- 'tryReadChan' pattern-match on the literals 0, 1 and 2.
type Cell = Int

cellEmpty, cellWritten, cellBlocking :: Cell
cellEmpty    = 0
cellWritten  = 1
cellBlocking = 2
-- | Allocate the two arrays backing a fresh segment and return them as
-- @(signal array, element array)@.
--
-- Both arrays are pinned and aligned to their element type, and sized as
-- @sizeOf element * 2^lOG_SEGMENT_LENGTH@ bytes (presumably
-- @sEGMENT_LENGTH == 2^lOG_SEGMENT_LENGTH@; both constants are defined
-- elsewhere -- confirm). The explicit @forall@ brings @a@ into scope
-- (ScopedTypeVariables) for the @undefined :: a@ size/alignment proxies.
segSource :: forall a. (UnagiPrim a)=> IO (SignalIntArray, ElementArray a)
{-# INLINE segSource #-}
segSource = do
    sigArr <- P.newAlignedPinnedByteArray
                (P.sizeOf    cellEmpty `unsafeShiftL` lOG_SEGMENT_LENGTH)
                (P.alignment cellEmpty)
    eArr <- P.newAlignedPinnedByteArray
                (P.sizeOf    (undefined :: a) `unsafeShiftL` lOG_SEGMENT_LENGTH)
                (P.alignment (undefined :: a))
    -- Every slot starts out signalled as empty.
    P.setByteArray sigArr 0 sEGMENT_LENGTH cellEmpty
    -- When the element type has a sentinel, pre-fill the element array with
    -- it so readers can recognise unwritten slots without the signal array;
    -- otherwise the element array is left uninitialised.
    maybe (return ())
        (P.setByteArray eArr 0 sEGMENT_LENGTH) (atomicUnicorn :: Maybe a)
    return (sigArr, ElementArray eArr)
-- | Element types that can be stored in an unboxed channel.
class (P.Prim a, Eq a)=> UnagiPrim a where
    -- | An optional sentinel value. When it is @Just magic@, 'segSource'
    -- pre-fills new element arrays with @magic@, and readers treat any
    -- element that differs from @magic@ as already written, returning it
    -- without consulting the signal array. A stored element equal to @magic@
    -- simply falls back to the signal-array path. Defaults to 'Nothing'.
    --
    -- NOTE(review): the fast path presumably relies on writes of @a@ being
    -- atomic, which would explain why the 64-bit instances only define a
    -- sentinel under @IS_64_BIT@ -- confirm.
    atomicUnicorn :: Maybe a
    atomicUnicorn = Nothing
-- Instances that always provide a sentinel ('atomicUnicorn').
instance UnagiPrim Char where
    atomicUnicorn = Just '\1010101'
instance UnagiPrim Float where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Int where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Int8 where
    atomicUnicorn = Just 113
instance UnagiPrim Int16 where
    atomicUnicorn = Just 0xDAD
instance UnagiPrim Int32 where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Word where
    atomicUnicorn = Just 0xDADADA
instance UnagiPrim Word8 where
    atomicUnicorn = Just 0xDA
instance UnagiPrim Word16 where
    atomicUnicorn = Just 0xDADA
instance UnagiPrim Word32 where
    atomicUnicorn = Just 0xDADADADA
instance UnagiPrim Addr where
    atomicUnicorn = Just nullAddr
-- 64-bit-wide element types: a sentinel is only provided when IS_64_BIT is
-- defined; otherwise the class default ('Nothing') applies and readers always
-- go through the signal array.
instance UnagiPrim Int64 where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
instance UnagiPrim Word64 where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
instance UnagiPrim Double where
#ifdef IS_64_BIT
    atomicUnicorn = Just 0xDADADADADADA
#endif
-- | One segment of the channel's stream of cells.
data Stream a =
    Stream
        -- Per-slot 'Cell' signals for this segment.
        !SignalIntArray
        -- The elements themselves.
        !(ElementArray a)
        -- Hand-off used when a reader has marked a slot as blocking: the
        -- writer delivers with 'putMVarIx', the reader waits with
        -- 'readMVarIx'. This field is intentionally not strict.
        (IndexedMVar a)
        -- Link to the following segment, installed by 'waitingAdvanceStream'.
        !(IORef (NextSegment a))
-- | The link from one segment to the next: 'NoSegment' until
-- 'waitingAdvanceStream' installs a @Next@ segment with a CAS.
data NextSegment a = NoSegment | Next !(Stream a)
-- | Create a new channel whose cell counters both start at the given offset,
-- returning its write and read ends.
--
-- A single initial segment is allocated and shared. @end@ is an 'IO' action
-- that 'liftA2' runs twice, so the 'InChan' and the 'OutChan' each get their
-- own counter and their own stream-head 'IORef', both initialised to the same
-- offset and the same first segment.
newChanStarting :: UnagiPrim a=> Int -> IO (InChan a, OutChan a)
{-# INLINE newChanStarting #-}
newChanStarting !startingCellOffset = do
    (sigArr0,eArr0) <- segSource
    stream <- Stream sigArr0 eArr0 <$> newIndexedMVar <*> newIORef NoSegment
    let end = ChanEnd
                  <$> newCounter startingCellOffset
                  <*> newIORef (StreamHead startingCellOffset stream)
    liftA2 (,) (InChan <$> end) (OutChan <$> end)
-- | Create a new read end from a write end. The new 'OutChan' gets a fresh
-- counter initialised to the 'InChan''s current counter value and a fresh
-- stream-head 'IORef' initialised to the 'InChan''s current head.
dupChan :: InChan a -> IO (OutChan a)
{-# INLINE dupChan #-}
dupChan (InChan (ChanEnd counter streamHead)) = do
    hLoc <- readIORef streamHead
    -- Keep the head read ordered before the counter read.
    -- NOTE(review): presumably so the copied head can never be ahead of the
    -- copied counter value -- confirm.
    loadLoadBarrier
    wCount <- readCounter counter
    OutChan <$> (ChanEnd <$> newCounter wCount <*> newIORef hLoc)
-- | Write a value to the channel. The whole operation runs under 'mask_'.
--
-- Raises (via 'error') if the slot's signal is found in a state other than
-- empty (0) or blocking (2).
writeChan :: UnagiPrim a=> InChan a -> a -> IO ()
{-# INLINE writeChan #-}
writeChan (InChan ce) = \a-> mask_ $ do
    (segIx, (Stream sigArr eArr mvarIndexed next), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    -- The element is stored before the slot is signalled as written, so a
    -- reader that observes 'cellWritten' finds the element in place.
    writeElementArray eArr segIx a
    actuallyWas <- casByteArrayInt sigArr segIx cellEmpty cellWritten
    -- The writer of a segment's first slot also makes sure the following
    -- segment exists (zero wait iterations).
    when (segIx == 0) $ void $
      waitingAdvanceStream next 0
    case actuallyWas of
         -- Was empty: our CAS marked it written; nothing more to do.
         0 -> return ()
         -- A reader marked the slot blocking first: hand the value over.
         2 -> putMVarIx mvarIndexed segIx a
         -- Already written: no other writer should hold this slot.
         1 -> error "Nearly Impossible! Expected Blocking"
         _ -> error "Invalid signal seen in writeChan!"
-- | Read the element of an already-claimed cell, as returned by
-- 'moveToNextCell'.
--
-- The first argument wraps only the blocking read of the slot's indexed
-- MVar; callers pass 'id' or an exception handler. The deferred stream-head
-- update from 'moveToNextCell' is run first.
--
-- Raises (via 'error') if the slot's signal holds a value other than 0, 1
-- or 2.
readSegIxUnmasked :: UnagiPrim a=> (IO a -> IO a) -> (Int, Stream a, IO ()) -> IO a
{-# INLINE readSegIxUnmasked #-}
readSegIxUnmasked h =
  \(segIx, (Stream sigArr eArr mvarIndexed _), maybeUpdateStreamHead)-> do
    maybeUpdateStreamHead
    let readBlocking = inline h $ readMVarIx mvarIndexed segIx
        readElem = readElementArray eArr segIx
        slowRead = do
           -- Try to mark the slot as having a blocked reader.
           actuallyWas <- casByteArrayInt sigArr segIx cellEmpty cellBlocking
           case actuallyWas of
                -- Was empty: our CAS succeeded, so wait for the writer.
                0 -> readBlocking
                -- Already written: the element is in the array.
                1 -> readElem
                -- Already marked blocking (by another reader): wait too.
                2 -> readBlocking
                _ -> error "Invalid signal seen in readSegIxUnmasked!"
    -- Fast path: with a sentinel available, an element that differs from it
    -- must have been written, so it is returned without touching sigArr.
    case atomicUnicorn of
         Just magic -> do
            el <- readElem
            if (el /= magic)
              then return el
              else slowRead
         Nothing -> slowRead
-- | Claim the next cell and return two ways of obtaining its element:
--
--   * a 'UT.Element' wrapping a non-blocking action that yields @Just@ the
--     element if it is available and 'Nothing' otherwise;
--   * a blocking read of the same cell (@'readSegIxUnmasked' id@).
--
-- No masking is applied here. The non-blocking action raises (via 'error')
-- if the slot's signal holds a value other than 0, 1 or 2.
tryReadChan :: UnagiPrim a=> OutChan a -> IO (UT.Element a, IO a)
{-# INLINE tryReadChan #-}
tryReadChan (OutChan ce) = do
    segStuff@(segIx, (Stream sigArr eArr mvarIndexed _), maybeUpdateStreamHead) <- moveToNextCell ce
    maybeUpdateStreamHead
    let readElem = readElementArray eArr segIx
        slowRead = do
           -- A plain read of the signal: unlike the blocking read, this
           -- never marks the slot as blocking.
           sig <- P.readByteArray sigArr segIx
           case (sig :: Int) of
                -- Empty: nothing written yet.
                0 -> return Nothing
                -- Written: order the signal read before the element read.
                1 -> loadLoadBarrier  >>  Just <$> readElem
                -- A blocking reader is registered: poll the indexed MVar.
                2 -> tryReadMVarIx mvarIndexed segIx
                _ -> error "Invalid signal seen in tryReadChan!"
    return
        ( UT.Element $
            -- Same sentinel fast path as in 'readSegIxUnmasked'.
            case atomicUnicorn of
                Just magic -> do
                    el <- readElem
                    if (el /= magic)
                        then return $ Just el
                        else slowRead
                Nothing -> slowRead
        , readSegIxUnmasked id segStuff
        )
-- | Read the next element from the channel: claim the next cell with
-- 'moveToNextCell', then read it with 'readSegIxUnmasked', with no wrapper
-- ('id') around the blocking read. No masking is applied.
readChan :: UnagiPrim a=> OutChan a -> IO a
{-# INLINE readChan #-}
readChan = \(OutChan ce)-> moveToNextCell ce >>= readSegIxUnmasked id
-- | Like 'readChan', but runs under 'mask_' and takes a handler.
--
-- The handler only guards the blocking read of the cell (see
-- 'readSegIxUnmasked'). If that read is aborted by an exception, the handler
-- is run with that same blocking-read action, so it can still be used to
-- obtain the element; the exception is then re-raised by 'onException'.
readChanOnException :: UnagiPrim a=> OutChan a -> (IO a -> IO ()) -> IO a
{-# INLINE readChanOnException #-}
readChanOnException (OutChan ce) h = mask_ $
    moveToNextCell ce >>=
      readSegIxUnmasked (\io-> io `onException` (h io))
-- | Claim the next cell on this end of the channel.
--
-- Returns @(segIx, str, maybeUpdateStreamHead)@: the index of the claimed
-- cell within its segment, that segment, and an action the caller must run
-- to publish the new stream head. The head update is returned rather than
-- performed here, so callers choose when to run it.
moveToNextCell :: UnagiPrim a=> ChanEnd a -> IO (Int, Stream a, IO ())
{-# INLINE moveToNextCell #-}
moveToNextCell (ChanEnd counter streamHead) = do
    -- The head is read before the counter is incremented; the assertion
    -- below checks the claimed index is not behind the head's offset.
    (StreamHead offset0 str0) <- readIORef streamHead
    -- The value returned by incrCounter is used as our global cell index.
    ix <- incrCounter 1 counter
    let (segsAway, segIx) = assert ((ix - offset0) >= 0) $
                 divMod_sEGMENT_LENGTH $! (ix - offset0)
        -- Walk 'segsAway' segments forward from the head, creating segments
        -- as needed. The wait passed on grows with our index in the segment.
        {-# INLINE go #-}
        go 0 str = return str
        go !n (Stream _ _ _ next) =
            waitingAdvanceStream next (nEW_SEGMENT_WAIT*segIx)
              >>= go (n-1)
    str <- go segsAway str0
    let !maybeUpdateStreamHead = do
          -- Only move the head when we actually advanced a segment.
          when (segsAway > 0) $ do
            let !offsetN =
                  offset0 + (segsAway `unsafeShiftL` lOG_SEGMENT_LENGTH)
            writeIORef streamHead $ StreamHead offsetN str
          -- NOTE(review): presumably keeps the IORef reachable up to this
          -- point -- confirm against Utilities.touchIORef.
          touchIORef streamHead
    return (segIx,str, maybeUpdateStreamHead)
-- | Return the segment that follows the one owning @nextSegRef@, creating it
-- if necessary.
--
-- The 'Int' argument is the number of times to re-check the reference for a
-- segment installed by another thread before allocating one here; it is
-- asserted to be non-negative.
--
-- Raises (via 'error') if the reference still does not hold a @Next@ after
-- the CAS attempt.
waitingAdvanceStream :: (UnagiPrim a)=> IORef (NextSegment a) -> Int -> IO (Stream a)
waitingAdvanceStream nextSegRef = go where
  go !wait = assert (wait >= 0) $ do
    tk <- readForCAS nextSegRef
    case peekTicket tk of
         NoSegment
           -- Still nothing there: spend one unit of the wait budget.
           | wait > 0 -> go (wait - 1)
           -- Budget exhausted: build a candidate segment and try to
           -- install it.
           | otherwise -> do
               potentialStrNext <- uncurry Stream
                                     <$> segSource
                                     <*> newIndexedMVar
                                     <*> newIORef NoSegment
               (_,tkDone) <- casIORef nextSegRef tk (Next potentialStrNext)
               -- Whether or not our CAS won, the returned ticket is expected
               -- to hold a Next (ours or another thread's); use that one.
               case peekTicket tkDone of
                 Next strNext -> return strNext
                 _ -> error "Impossible! This should only have been Next segment"
         Next strNext -> return strNext