{-# LANGUAGE FlexibleContexts #-}

-- | A lock-free channel (queue) data structure.
module Control.CUtils.Channel (Channel, newChannel, writeChannel, readChannel) where

import Control.Concurrent.MVar
import Control.Monad
import Data.IORef
import Data.Bits
import Data.Word
import Data.List
import Data.Array.MArray

count pred f = fst . head . dropWhile (not . pred . snd) . zip [0..] . iterate f

nBits x = count (==0) (`shiftR` 1) x

data Channel a t = Channel
	Word32
	(a Word32 t)
	(IORef Word32){-maybe filled-}
	(IORef Word32){-filled-}
	(IORef Word32){-maybe empty-}
	(IORef Word32){-empty-}
	(MVar ()){-full lock-}
	(MVar ()){-empty lock-}

-- | Create a channel with a buffer at least as big as 'buffer'.
newChannel :: (MArray a t IO) => Word32 -> IO (Channel a t)
newChannel buffer = do
	-- Adjust the buffer up to the next power of two.
	let buffer' = shiftL 1 (nBits buffer)

	-- Create array
	a <- newArray_ (0, buffer' - 1)
	
	-- Create indices
	mf <- newIORef 0
	f <- newIORef 0
	me <- newIORef 0
	e <- newIORef 0

	-- Create locks
	fl <- newMVar ()
	el <- newEmptyMVar

	return (Channel buffer' a mf f me e fl el)

increment ref = atomicModifyIORef ref (\x -> (x + 1, x))

alg buffer off a mx x my y lx ly f = do
	mXN <- increment mx
	let spin = do
		yN <- readIORef y
		if yN + off <= mXN then do
				mYN <- readIORef my
				when (yN == mYN) $ takeMVar lx
				spin
			else do
				val <- f a (mXN `mod` buffer)
				increment x
				tryPutMVar ly ()
				return val
	spin

-- | Write into the channel, blocking when the buffer is full.
writeChannel (Channel buffer a mf f me e fl el) x = alg buffer buffer a mf f me e fl el (\a i -> writeArray a i x)

-- | Read from the channel, blocking when the buffer is empty.
readChannel (Channel buffer a mf f me e fl el) = alg buffer 0 a me e mf f el fl readArray