module Control.CUtils.Channel (Channel, newChannel, writeChannel, readChannel) where
import Control.Concurrent.MVar
import Control.Monad
import Data.IORef
import Data.Bits
import Data.Word
import Data.List
import Data.Array.MArray
count pred f = fst . head . dropWhile (not . pred . snd) . zip [0..] . iterate f
nBits x = count (==0) (`shiftR` 1) x
data Channel a t = Channel
Word32
(a Word32 t)
(IORef Word32)
(IORef Word32)
(IORef Word32)
(IORef Word32)
(MVar ())
(MVar ())
newChannel :: (MArray a t IO) => Word32 -> IO (Channel a t)
newChannel buffer = do
let buffer' = shiftL 1 (nBits buffer)
a <- newArray_ (0, buffer' 1)
mf <- newIORef 0
f <- newIORef 0
me <- newIORef 0
e <- newIORef 0
fl <- newMVar ()
el <- newEmptyMVar
return (Channel buffer' a mf f me e fl el)
increment ref = atomicModifyIORef ref (\x -> (x + 1, x))
alg buffer off a mx x my y lx ly f = do
mXN <- increment mx
let spin = do
yN <- readIORef y
if yN + off <= mXN then do
mYN <- readIORef my
when (yN == mYN) $ takeMVar lx
spin
else do
val <- f a (mXN `mod` buffer)
increment x
tryPutMVar ly ()
return val
spin
writeChannel (Channel buffer a mf f me e fl el) x = alg buffer buffer a mf f me e fl el (\a i -> writeArray a i x)
readChannel (Channel buffer a mf f me e fl el) = alg buffer 0 a me e mf f el fl readArray