{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
{-# LANGUAGE CPP, DeriveDataTypeable #-}

#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.STM.TBQueue
-- Copyright   :  (c) The University of Glasgow 2012
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  experimental
-- Portability :  non-portable (requires STM)
--
-- 'TBQueue' is a bounded version of 'TQueue'. The queue has a maximum
-- capacity set when it is created.  If the queue already contains the
-- maximum number of elements, then 'writeTBQueue' blocks until an
-- element is removed from the queue.
--
-- The implementation is based on the traditional purely-functional
-- queue representation that uses two lists to obtain amortised /O(1)/
-- enqueue and dequeue operations.
--
-- @since 2.4
-----------------------------------------------------------------------------

module Control.Concurrent.STM.TBQueue (
        -- * TBQueue
        TBQueue,
        newTBQueue,
        newTBQueueIO,
        readTBQueue,
        tryReadTBQueue,
        peekTBQueue,
        tryPeekTBQueue,
        writeTBQueue,
        unGetTBQueue,
        isEmptyTBQueue,
        isFullTBQueue,
  ) where

import Data.Typeable
import GHC.Conc

#define _UPK_(x) {-# UNPACK #-} !(x)

-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
--
-- @since 2.4
data TBQueue a
   = TBQueue _UPK_(TVar Int)  -- CR: read capacity
             _UPK_(TVar [a])  -- R:  elements waiting to be read
             _UPK_(TVar Int)  -- CW: write capacity
             _UPK_(TVar [a])  -- W:  elements written (head is most recent)
  deriving Typeable

instance Eq (TBQueue a) where
  TBQueue a _ _ _ == TBQueue b _ _ _ = a == b

-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
-- CR.  So in the common case we avoid contention between CR and CW.
--
--   - when removing an element from R:
--     CR := CR + 1
--
--   - when adding an element to W:
--     if CW is non-zero
--         then CW := CW - 1
--         then if CR is non-zero
--                 then CW := CR - 1; CR := 0
--                 else **FULL**

-- |Build and returns a new instance of 'TBQueue'
newTBQueue :: Int   -- ^ maximum number of elements the queue can hold
           -> STM (TBQueue a)
newTBQueue size = do
  read  <- newTVar []
  write <- newTVar []
  rsize <- newTVar 0
  wsize <- newTVar size
  return (TBQueue rsize read wsize write)

-- |@IO@ version of 'newTBQueue'.  This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTBQueueIO :: Int -> IO (TBQueue a)
newTBQueueIO size = do
  read  <- newTVarIO []
  write <- newTVarIO []
  rsize <- newTVarIO 0
  wsize <- newTVarIO size
  return (TBQueue rsize read wsize write)

-- |Write a value to a 'TBQueue'; blocks if the queue is full.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue rsize _read wsize write) a = do
  w <- readTVar wsize
  if (w /= 0)
     then do writeTVar wsize (w - 1)
     else do
          r <- readTVar rsize
          if (r /= 0)
             then do writeTVar rsize 0
                     writeTVar wsize (r - 1)
             else retry
  listend <- readTVar write
  writeTVar write (a:listend)

-- |Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rsize read _wsize write) = do
  xs <- readTVar read
  r <- readTVar rsize
  writeTVar rsize (r + 1)
  case xs of
    (x:xs') -> do
      writeTVar read xs'
      return x
    [] -> do
      ys <- readTVar write
      case ys of
        [] -> retry
        _  -> do
          let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
                                  -- short, otherwise it will conflict
          writeTVar write []
          writeTVar read zs
          return z

-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryReadTBQueue :: TBQueue a -> STM (Maybe a)
tryReadTBQueue c = fmap Just (readTBQueue c) `orElse` return Nothing

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue c = do
  x <- readTBQueue c
  unGetTBQueue c x
  return x

-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
tryPeekTBQueue c = do
  m <- tryReadTBQueue c
  case m of
    Nothing -> return Nothing
    Just x  -> do
      unGetTBQueue c x
      return m

-- |Put a data item back onto a channel, where it will be the next item read.
-- Blocks if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rsize read wsize _write) a = do
  r <- readTVar rsize
  if (r > 0)
     then do writeTVar rsize (r - 1)
     else do
          w <- readTVar wsize
          if (w > 0)
             then writeTVar wsize (w - 1)
             else retry
  xs <- readTVar read
  writeTVar read (a:xs)

-- |Returns 'True' if the supplied 'TBQueue' is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue _rsize read _wsize write) = do
  xs <- readTVar read
  case xs of
    (_:_) -> return False
    [] -> do ys <- readTVar write
             case ys of
               [] -> return True
               _  -> return False

-- |Returns 'True' if the supplied 'TBQueue' is full.
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rsize _read wsize _write) = do
  w <- readTVar wsize
  if (w > 0)
     then return False
     else do
         r <- readTVar rsize
         if (r > 0)
            then return False
            else return True