{-# LANGUAGE FlexibleContexts #-}

-- | A lock-free channel (queue) data structure.
--
-- The channel is a fixed-size ring buffer indexed by atomically incremented
-- 'Word32' counters. Threads block (on an 'MVar') only while the buffer is
-- full (writers) or empty (readers).
module Control.CUtils.Channel (Channel, newChannel, writeChannel, readChannel) where

-- 'MVar' replaces the former "Control.Concurrent.SampleVar", which was
-- deprecated in base 4.6 and removed in base 4.7. For a unit payload,
-- 'takeMVar' / 'tryPutMVar' give the same "block until signalled" /
-- "signal without ever blocking" behaviour.
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, takeMVar, tryPutMVar)
import Control.Monad
import Data.IORef
import Data.Bits
import Data.Word
import Data.List
import Data.Array.MArray

-- | @count p f x@ is the number of times @f@ must be applied to @x@ before
-- the result satisfies @p@. Does not terminate if that never happens.
count :: (a -> Bool) -> (a -> a) -> a -> Int
count p f = length . takeWhile (not . p) . iterate f

-- | Number of right shifts needed to reduce the argument to zero, i.e. the
-- bit length of the argument (@nBits 0 == 0@, @nBits 4 == 3@).
nBits :: Word32 -> Int
nBits = count (== 0) (`shiftR` 1)

-- | A bounded channel whose slots live in a mutable array of type @a@ holding
-- elements of type @t@.
data Channel a t = Channel
    !Word32           -- ^ capacity of the ring buffer (a power of two)
    !(a Word32 t)     -- ^ slot storage, indexed @0 .. capacity - 1@
    !(IORef Word32)   -- ^ \"maybe filled\": number of write tickets handed out
    !(IORef Word32)   -- ^ \"filled\": number of writes completed
    !(IORef Word32)   -- ^ \"maybe empty\": number of read tickets handed out
    !(IORef Word32)   -- ^ \"empty\": number of reads completed
    !(MVar ())        -- ^ \"full lock\": writers wait here; readers signal it
    !(MVar ())        -- ^ \"empty lock\": readers wait here; writers signal it

-- | Create a channel with a buffer at least as big as 'buffer'.
--
-- The capacity is rounded up to @2 ^ 'nBits' buffer@, the smallest power of
-- two strictly greater than the request (so a request of 4 yields 8 slots).
--
-- Raises an 'IOError' when @buffer >= 2^31@: the rounded capacity would not
-- fit in a 'Word32' (the shift used to wrap to 0, producing a bogus array
-- range and a division by zero on first use).
newChannel :: (MArray a t IO) => Word32 -> IO (Channel a t)
newChannel buffer = do
    let bits = nBits buffer
    when (bits >= 32) $
        ioError (userError "newChannel: requested buffer size too large (must be below 2^31)")
    -- Adjust the buffer up to the next power of two.
    let capacity = shiftL 1 bits
    -- Create array
    slots <- newArray_ (0, capacity - 1)
    -- Create indices
    maybeFilled <- newIORef 0
    filled      <- newIORef 0
    maybeEmpty  <- newIORef 0
    emptied     <- newIORef 0
    -- Create locks: the full lock starts signalled, the empty lock does not.
    fullLock  <- newMVar ()
    emptyLock <- newEmptyMVar
    return (Channel capacity slots maybeFilled filled maybeEmpty emptied fullLock emptyLock)

-- | Atomically add one to the counter and return its previous value.
-- Uses the strict variant so no chain of @+ 1@ thunks accumulates in the ref.
increment :: IORef Word32 -> IO Word32
increment ref = atomicModifyIORef' ref (\n -> (n + 1, n))

-- | Shared implementation of reading and writing; the two operations are
-- mirror images and differ only in which counters/locks play which role.
--
-- The caller takes a ticket from its own ticket counter, then loops until the
-- opposite side's completed count plus @off@ exceeds that ticket. While it
-- cannot proceed it waits on its own lock, but only when the opposite side
-- has no operation in flight (completed == tickets handed out); otherwise it
-- re-checks immediately. Once allowed, it runs the slot action at
-- @ticket `mod` capacity@, bumps its own completed counter and signals the
-- opposite side's lock.
--
-- NOTE(review): the completed counters count finished operations, not which
-- tickets finished. With several concurrent writers (or readers) an operation
-- with a later ticket may finish first and let the other side touch a slot
-- whose own operation is still in progress -- confirm whether only
-- single-producer/single-consumer use is intended.
alg
    :: Word32                      -- ^ ring-buffer capacity
    -> Word32                      -- ^ offset: capacity for writers, 0 for readers
    -> arr                         -- ^ slot storage
    -> IORef Word32                -- ^ this side's ticket counter
    -> IORef Word32                -- ^ this side's completed counter
    -> IORef Word32                -- ^ opposite side's ticket counter
    -> IORef Word32                -- ^ opposite side's completed counter
    -> MVar ()                     -- ^ lock this side waits on
    -> MVar ()                     -- ^ lock this side signals when done
    -> (arr -> Word32 -> IO r)     -- ^ slot action (read or write at an index)
    -> IO r
alg capacity off arr myTickets myDone theirTickets theirDone myLock theirLock action = do
    ticket <- increment myTickets
    let spin = do
            done <- readIORef theirDone
            if done + off <= ticket
                then do
                    claimed <- readIORef theirTickets
                    when (done == claimed) $ takeMVar myLock
                    spin
                else do
                    val <- action arr (ticket `mod` capacity)
                    _ <- increment myDone
                    -- Signal without blocking; a signal that is already
                    -- pending is simply left in place.
                    _ <- tryPutMVar theirLock ()
                    return val
    spin

-- | Write into the channel, blocking when the buffer is full.
writeChannel :: (MArray a t IO) => Channel a t -> t -> IO ()
writeChannel (Channel buffer a mf f me e fl el) x =
    alg buffer buffer a mf f me e fl el (\arr i -> writeArray arr i x)

-- | Read from the channel, blocking when the buffer is empty.
readChannel :: (MArray a t IO) => Channel a t -> IO t
readChannel (Channel buffer a mf f me e fl el) =
    alg buffer 0 a me e mf f el fl readArray