module Simulation.Aivika.Distributed.Optimistic.Internal.Channel
(Channel,
newChannel,
channelEmpty,
readChannel,
writeChannel,
awaitChannel) where
import Data.List
import Data.IORef
import Control.Concurrent.STM
import Control.Monad
-- | A channel holding a batch of pending items together with two
-- copies of an \"is empty\" flag: one transactional, one plain 'IORef'.
data Channel a = Channel
  { channelList :: TVar [a]
    -- ^ Pending items; 'writeChannel' conses onto the head, so the most
    -- recently written item comes first.
  , channelListEmpty :: TVar Bool
    -- ^ Transactional empty flag; this is the one 'awaitChannel' blocks on.
  , channelListEmptyIO :: IORef Bool
    -- ^ Empty flag kept in an 'IORef', read by 'channelEmpty' and
    -- 'readChannel' outside of any STM transaction.
  }
-- | Create a new channel with no pending items and both empty flags
-- set to 'True'.
newChannel :: IO (Channel a)
newChannel =
  -- The constructor arguments follow the field order of 'Channel':
  -- the item list, the transactional flag, then the 'IORef' flag.
  liftM3 Channel (newTVarIO []) (newTVarIO True) (newIORef True)
-- | Report the current value of the channel's 'IORef' empty flag.
--
-- Only the 'IORef' is consulted; no STM transaction is run.
channelEmpty :: Channel a -> IO Bool
channelEmpty = readIORef . channelListEmptyIO
-- | Take every pending item out of the channel.
--
-- If the 'IORef' empty flag is set, the result is @[]@ and no STM
-- transaction is run.  Otherwise the stored list is removed and
-- returned reversed; since 'writeChannel' prepends, that reversal
-- yields the items oldest first.
readChannel :: Channel a -> IO [a]
readChannel ch =
  readIORef flagIO >>= \isEmpty ->
    if isEmpty
      then return []
      else drain
  where
    flagIO = channelListEmptyIO ch

    -- The 'IORef' flag is raised /before/ the transaction runs.
    -- NOTE(review): this ordering looks deliberate, so that a concurrent
    -- 'writeChannel' lowering the flag afterwards is not overwritten;
    -- keep the order when editing.
    drain =
      do atomicWriteIORef flagIO True
         pending <- atomically $
           do taken <- swapTVar (channelList ch) []
              writeTVar (channelListEmpty ch) True
              return taken
         return (reverse pending)
-- | Add an item to the channel.
--
-- The item is consed onto the stored list and the transactional empty
-- flag is cleared in one STM transaction; only after that transaction
-- has committed is the 'IORef' empty flag cleared as well.
writeChannel :: Channel a -> a -> IO ()
writeChannel ch a =
  atomically push >> atomicWriteIORef (channelListEmptyIO ch) False
  where
    -- Prepend the item and mark the channel as non-empty atomically.
    push =
      do modifyTVar' (channelList ch) (a :)
         writeTVar (channelListEmpty ch) False
-- | Block until the transactional empty flag of the channel is 'False'.
--
-- The transaction retries for as long as the flag reads 'True'; nothing
-- is removed from the channel.
awaitChannel :: Channel a -> IO ()
awaitChannel ch =
  atomically (check . not =<< readTVar (channelListEmpty ch))