{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE CPP #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Chan
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Unbounded channels.
--
-- The channels are implemented with @MVar@s and therefore inherit all the
-- caveats that apply to @MVar@s (possibility of races, deadlocks etc). The
-- stm (software transactional memory) library has a more robust implementation
-- of channels called @TChan@s.
--
-----------------------------------------------------------------------------

module Control.Concurrent.Chan
  (
          -- * The 'Chan' type
        Chan,                   -- abstract

          -- * Operations
        newChan,
        writeChan,
        readChan,
        dupChan,

          -- * Stream interface
        getChanContents,
        writeList2Chan,
   ) where

import System.IO.Unsafe         ( unsafeInterleaveIO )
import Control.Concurrent.MVar
import Control.Exception (mask_)

#define _UPK_(x) {-# UNPACK #-} !(x)

-- A channel is represented by two @MVar@s keeping track of the two ends
-- of the channel contents,i.e.,  the read- and write ends. Empty @MVar@s
-- are used to handle consumers trying to read from an empty channel.

-- |'Chan' is an abstract type representing an unbounded FIFO channel.
data Chan a
 = Chan _UPK_(MVar (Stream a))
        _UPK_(MVar (Stream a)) -- Invariant: the Stream a is always an empty MVar
   deriving Eq -- ^ @since 4.4.0.0

type Stream a = MVar (ChItem a)

data ChItem a = ChItem a _UPK_(Stream a)
  -- benchmarks show that unboxing the MVar here is worthwhile, because
  -- although it leads to higher allocation, the channel data takes up
  -- less space and is therefore quicker to GC.

-- See the Concurrent Haskell paper for a diagram explaining the
-- how the different channel operations proceed.

-- @newChan@ sets up the read and write end of a channel by initialising
-- these two @MVar@s with an empty @MVar@.

-- |Build and returns a new instance of 'Chan'.
newChan :: IO (Chan a)
newChan :: IO (Chan a)
newChan = do
   MVar (ChItem a)
hole  <- IO (MVar (ChItem a))
forall a. IO (MVar a)
newEmptyMVar
   MVar (MVar (ChItem a))
readVar  <- MVar (ChItem a) -> IO (MVar (MVar (ChItem a)))
forall a. a -> IO (MVar a)
newMVar MVar (ChItem a)
hole
   MVar (MVar (ChItem a))
writeVar <- MVar (ChItem a) -> IO (MVar (MVar (ChItem a)))
forall a. a -> IO (MVar a)
newMVar MVar (ChItem a)
hole
   Chan a -> IO (Chan a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (MVar (ChItem a)) -> MVar (MVar (ChItem a)) -> Chan a
forall a. MVar (Stream a) -> MVar (Stream a) -> Chan a
Chan MVar (MVar (ChItem a))
readVar MVar (MVar (ChItem a))
writeVar)

-- To put an element on a channel, a new hole at the write end is created.
-- What was previously the empty @MVar@ at the back of the channel is then
-- filled in with a new stream element holding the entered value and the
-- new hole.

-- |Write a value to a 'Chan'.
writeChan :: Chan a -> a -> IO ()
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar :: MVar (Stream a)
writeVar) val :: a
val = do
  Stream a
new_hole <- IO (Stream a)
forall a. IO (MVar a)
newEmptyMVar
  IO () -> IO ()
forall a. IO a -> IO a
mask_ (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
takeMVar MVar (Stream a)
writeVar
    Stream a -> ChItem a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar Stream a
old_hole (a -> Stream a -> ChItem a
forall a. a -> Stream a -> ChItem a
ChItem a
val Stream a
new_hole)
    MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
new_hole

-- The reason we don't simply do this:
--
--    modifyMVar_ writeVar $ \old_hole -> do
--      putMVar old_hole (ChItem val new_hole)
--      return new_hole
--
-- is because if an asynchronous exception is received after the 'putMVar'
-- completes and before modifyMVar_ installs the new value, it will set the
-- Chan's write end to a filled hole.

-- |Read the next value from the 'Chan'. Blocks when the channel is empty. Since
-- the read end of a channel is an 'MVar', this operation inherits fairness
-- guarantees of 'MVar's (e.g. threads blocked in this operation are woken up in
-- FIFO order).
--
-- Throws 'Control.Exception.BlockedIndefinitelyOnMVar' when the channel is
-- empty and no other thread holds a reference to the channel.
readChan :: Chan a -> IO a
readChan :: Chan a -> IO a
readChan (Chan readVar :: MVar (Stream a)
readVar _) = do
  MVar (Stream a) -> (Stream a -> IO (Stream a, a)) -> IO a
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar (Stream a)
readVar ((Stream a -> IO (Stream a, a)) -> IO a)
-> (Stream a -> IO (Stream a, a)) -> IO a
forall a b. (a -> b) -> a -> b
$ \read_end :: Stream a
read_end -> do
    (ChItem val :: a
val new_read_end :: Stream a
new_read_end) <- Stream a -> IO (ChItem a)
forall a. MVar a -> IO a
readMVar Stream a
read_end
        -- Use readMVar here, not takeMVar,
        -- else dupChan doesn't work
    (Stream a, a) -> IO (Stream a, a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
new_read_end, a
val)

-- |Duplicate a 'Chan': the duplicate channel begins empty, but data written to
-- either channel from then on will be available from both.  Hence this creates
-- a kind of broadcast channel, where data written by anyone is seen by
-- everyone else.
--
-- (Note that a duplicated channel is not equal to its original.
-- So: @fmap (c /=) $ dupChan c@ returns @True@ for all @c@.)
dupChan :: Chan a -> IO (Chan a)
dupChan :: Chan a -> IO (Chan a)
dupChan (Chan _ writeVar :: MVar (Stream a)
writeVar) = do
   Stream a
hole       <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
readMVar MVar (Stream a)
writeVar
   MVar (Stream a)
newReadVar <- Stream a -> IO (MVar (Stream a))
forall a. a -> IO (MVar a)
newMVar Stream a
hole
   Chan a -> IO (Chan a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (Stream a) -> MVar (Stream a) -> Chan a
forall a. MVar (Stream a) -> MVar (Stream a) -> Chan a
Chan MVar (Stream a)
newReadVar MVar (Stream a)
writeVar)

-- Operators for interfacing with functional streams.

-- |Return a lazy list representing the contents of the supplied
-- 'Chan', much like 'System.IO.hGetContents'.
getChanContents :: Chan a -> IO [a]
getChanContents :: Chan a -> IO [a]
getChanContents ch :: Chan a
ch
  = IO [a] -> IO [a]
forall a. IO a -> IO a
unsafeInterleaveIO (do
        a
x  <- Chan a -> IO a
forall a. Chan a -> IO a
readChan Chan a
ch
        [a]
xs <- Chan a -> IO [a]
forall a. Chan a -> IO [a]
getChanContents Chan a
ch
        [a] -> IO [a]
forall (m :: * -> *) a. Monad m => a -> m a
return (a
xa -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
xs)
    )

-- |Write an entire list of items to a 'Chan'.
writeList2Chan :: Chan a -> [a] -> IO ()
writeList2Chan :: Chan a -> [a] -> IO ()
writeList2Chan ch :: Chan a
ch ls :: [a]
ls = [IO ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, Monad m) =>
t (m a) -> m ()
sequence_ ((a -> IO ()) -> [a] -> [IO ()]
forall a b. (a -> b) -> [a] -> [b]
map (Chan a -> a -> IO ()
forall a. Chan a -> a -> IO ()
writeChan Chan a
ch) [a]
ls)