{-# LANGUAGE DeriveDataTypeable #-}
-- | 
-- Module      :  Control.Concurrent.MSemN2
-- Copyright   :  (c) Chris Kuklewicz 2011
-- License     :  3 clause BSD-style (see the file LICENSE)
-- 
-- Maintainer  :  haskell@list.mightyreason.com
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Quantity semaphores in which each thread may wait for an arbitrary amount.  This modules is
-- intended to improve on "Control.Concurrent.QSemN".
-- 
-- This semaphore gracefully handles threads which die while blocked waiting for quantity.  The
-- fairness guarantee is that blocked threads are FIFO.  An early thread waiting for a large
-- quantity will prevent a later thread waiting for a small quantity from jumping the queue.
--
-- If 'with' is used to guard a critical section then no quantity of the semaphore will be lost
-- if the activity throws an exception.
--
-- The functions below are generic in (Integral i) with specialization to Int and Integer.
--
-- Overflow warning: These operations do not check for overflow errors.  If the Integral type is too
-- small to accept the new total then the behavior of these operations is undefined.  Using (MSem
-- Integer) prevents the possibility of an overflow error.
module Control.Concurrent.MSemN2
    (MSemN
    ,new
    ,with
    ,wait
    ,signal
    ,withF
    ,waitF
    ,signalF
    ,peekAvail
    ) where

import Prelude( Integral,Eq,IO,Int,Integer,Maybe(Just,Nothing),Num((+),(-)),Bool(False,True)
              , return,const,fmap,snd,seq
              , (.),(<=),($),($!) )
import Control.Concurrent.MVar( MVar
                              , withMVar,modifyMVar,newMVar
                              , newEmptyMVar,tryPutMVar,takeMVar,tryTakeMVar )
import Control.Exception(bracket,bracket_,uninterruptibleMask_,evaluate,mask_)
import Control.Monad(when,void)
import Data.Maybe(fromMaybe)
import Data.Typeable(Typeable)
import Data.Word(Word)

{- 

The only MVars allocated are the three created be 'new'.  Their three roles are
1) to have a FIFO queue of waiters
2) for the head waiter to block on
3) to protect the quantity state of the semaphore and the head waiter

-}

-- MS has an invariant that "maybe True (> avail) headWants" is always True.
data MS i = MS { avail :: !i             -- ^ This is the quantity available to be taken from the semaphore.
               , headWants :: !(Maybe i) -- ^ If there is waiter then this is Just the amount being waited for.
               }
  deriving (Eq,Typeable)

-- | A 'MSemN' is a quantity semaphore, in which the available quantity may be signalled or
-- waited for in arbitrary amounts.
data MSemN i = MSemN { quantityStore :: !(MVar (MS i))  -- ^ Used to lock access to state of semaphore quantity.
                     , queueWait :: !(MVar ()) -- ^ Used as FIFO queue for waiter, held by head of queue.
                     , headWait :: !(MVar i)  -- ^ The head of the waiter queue blocks on headWait.
                     }
  deriving (Eq,Typeable)

-- |'new' allows positive, zero, and negative initial values.  The initial value is forced here to
-- better localize errors.
new :: Integral i => i -> IO (MSemN i)
{-# SPECIALIZE new :: Int -> IO (MSemN Int) #-}
{-# SPECIALIZE new :: Word -> IO (MSemN Word) #-}
{-# SPECIALIZE new :: Integer -> IO (MSemN Integer) #-}
new initial = do
  newMS <- newMVar $! (MS { avail = initial  -- this forces 'initial'
                          , headWants = Nothing })
  newQueueWait <- newMVar ()
  newHeadWait <- newEmptyMVar
  return (MSemN { quantityStore = newMS
                , queueWait = newQueueWait
                , headWait = newHeadWait })

-- | 'with' takes a quantity of the semaphore to take and hold while performing the provided
-- operation.  'with' ensures the quantity of the sempahore cannot be lost if there are exceptions.
-- This uses 'bracket' to ensure 'wait' and 'signal' get called correctly.
with :: Integral i => MSemN i -> i -> IO a -> IO a
{-# SPECIALIZE with :: MSemN Int -> Int -> IO a -> IO a #-}
{-# SPECIALIZE with :: MSemN Word -> Word -> IO a -> IO a #-}
{-# SPECIALIZE with :: MSemN Integer -> Integer -> IO a -> IO a #-}
with m wanted = seq wanted $ bracket_ (wait m wanted)  (uninterruptibleMask_ $ signal m wanted)

-- | 'withF' takes a pure function and an operation.  The pure function converts the available
-- quantity to a pair of the wanted quantity and a returned value.  The operation takes the result
-- of the pure function.  'withF' ensures the quantity of the sempahore cannot be lost if there
-- are exceptions.  This uses 'bracket' to ensure 'waitF' and 'signal' get called correctly.
--
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- evaluated.
withF :: Integral i 
      => MSemN i
      -> (i -> (i,b))
      -> ((i,b) -> IO a)
      -> IO a
{-# SPECIALIZE withF :: MSemN Int -> (Int -> (Int,b)) -> ((Int,b) -> IO a) -> IO a #-}
{-# SPECIALIZE withF :: MSemN Word -> (Word -> (Word,b)) -> ((Word,b) -> IO a) -> IO a #-}
{-# SPECIALIZE withF :: MSemN Integer -> (Integer -> (Integer,b)) -> ((Integer,b) -> IO a) -> IO a #-}
withF m f = bracket (waitF m f)  (\(wanted,_) -> uninterruptibleMask_ $ signal m wanted)

-- |'wait' allow positive, zero, and negative wanted values.  Waiters may block, and will be handled
-- fairly in FIFO order.
--
-- If 'wait' returns without interruption then it left the 'MSemN' with a remaining quantity that was
-- greater than or equal to zero.  If 'wait' is interrupted then no quantity is lost.  If 'wait'
-- returns without interruption then it is known that each earlier waiter has definitely either been
-- interrupted or has retured without interruption.
wait :: Integral i => MSemN i -> i -> IO ()
{-# SPECIALIZE wait :: MSemN Int -> Int -> IO () #-}
{-# SPECIALIZE wait :: MSemN Word -> Word -> IO () #-}
{-# SPECIALIZE wait :: MSemN Integer -> Integer -> IO () #-}
wait m wanted = seq wanted $ fmap snd $ waitF m (const (wanted,()))

-- | 'waitWith' takes the 'MSemN' and a pure function that takes the available quantity and computes the
-- amount wanted and a second value.  The value wanted is stricly evaluated but the second value is
-- returned lazily.
--
-- 'waitF' allow positive, zero, and negative wanted values.  Waiters may block, and will be handled
-- fairly in FIFO order.
--
-- If 'waitF' returns without interruption then it left the 'MSemN' with a remaining quantity that was
-- greater than or equal to zero.  If 'waitF' or the provided function are interrupted then no
-- quantity is lost.  If 'waitF' returns without interruption then it is known that each previous
-- waiter has each definitely either been interrupted or has retured without interruption.
--
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- evaluated.
waitF :: Integral i => MSemN i -> (i -> (i,b)) -> IO (i,b)
{-# SPECIALIZE waitF :: MSemN Int -> (Int -> (Int,b)) -> IO (Int,b) #-}
{-# SPECIALIZE waitF :: MSemN Word -> (Word -> (Word,b)) -> IO (Word,b) #-}
{-# SPECIALIZE waitF :: MSemN Integer -> (Integer -> (Integer,b)) -> IO (Integer,b) #-}
waitF m f = seq f $ mask_ . withMVar (queueWait m) $ \ () -> do
  (out,mustWait) <- modifyMVar (quantityStore m) $ \ ms -> do
    -- Assume: ((headWait is empty) OR (headWants is Nothing))
    -- Nothing in this scope can block
    --
    -- headWait might be full here if the predecessor waitF blocked and died and signal (tried to)
    -- feed it.
    recovered <- fmap (fromMaybe 0) (tryTakeMVar (headWait m))
    let total = avail ms + recovered
        outVal@(wantedVal,_) = f total
    if wantedVal <= total  -- forces wantedVal
      then do
        ms' <- evaluate MS { avail = total - wantedVal, headWants = Nothing }
        return (ms', (outVal,False))
      else do
        ms' <- evaluate MS { avail = total, headWants = Just wantedVal }
        return (ms', (outVal,True))
  -- quantityStore is now released, queueWait is still held, race with signal now possible
  -- Assert: (headWait is empty) AND (mustWait == (headWants is Just)) at release
  -- Proof: tryTakeMVar forced (headWait is empty), and
  --        the if-then-else branches ensured (mustWait == (headWants is Just))
  -- This assertion implies ((headWait is empty) OR (headWants is Nothing)) invariant holds (point X)
  when mustWait (void (takeMVar (headWait m)))
  return out
  -- Invariant: ((headWait is empty) OR (headWants is Nothing))
  -- Proof: 1) mustWait was false
  --           nothing happened since (point X) except perhaps race with signal
  --           signal maintained invariant
  --   2) mustWait was true
  --   2a) takeMVar succeeded so headWait became full since (point X)
  --         this implies signal filled headWait and thus signal ended with (headWait is full)
  --         signal invariant ((headWait is empty) OR (headWants is Nothing)) implies (headWants is Nothing) was set
  --          (headWait is empty) by takeMVar and (headWants is Nothing) by implication
  --   2b) takeMVar was interrupted and thus did nothing
  --         nothing happened since (point X) except perhaps race with signal
  --         signal maintained invariant

-- |'signal' allows positive, zero, and negative values, thus this is also way to remove quantity
-- that skips any threads in the 'wait'/'waitF' queue.  If the new total is greater than the next
-- value being waited for (if present) then the first waiter is woken.  If there are queued waiters
-- then the next one will wake after a waiter has proceeded and notice the remaining value; thus a
-- single 'signal' may result in several waiters obtaining values.  Waking waiting threads is
-- asynchronous.
--
-- 'signal' may block, but it cannot be interrupted, which allows it to dependably restore value to
-- the 'MSemN'.  All 'signal', 'signalF', 'peekAvail', and the head waiter may momentarily block in a
-- fair FIFO manner.
signal :: Integral i => MSemN i -> i -> IO ()
{-# SPECIALIZE signal :: MSemN Int -> Int -> IO () #-}
{-# SPECIALIZE signal :: MSemN Word -> Word -> IO () #-}
{-# SPECIALIZE signal :: MSemN Integer -> Integer -> IO () #-}
signal _ 0 = return () -- this case forces 'size'
signal m size = fmap snd $ signalF m (const (size,()))

-- | Instead of providing a fixed change to the available quantity, 'signalF' applies a provided
-- pure function to the available quantity to compute the change and a second value.  The
-- requested change is stricly evaluated but the second value is returned lazily.  If the new total is
-- greater than the next value being waited for then the first waiter is woken.  If there are queued
-- waiters then the next one will wake after a waiter has proceeded and notice the remaining value;
-- thus a single 'signalF' may result in several waiters obtaining values.  Waking waiting threads
-- is asynchronous.
--
-- 'signalF' may block, and it can be safely interrupted.  If the provided function throws an error
-- or is interrupted then it leaves the 'MSemN' unchanged.  All 'signal', 'signalF', 'peekAvail', and
-- the head waiter may momentarily block in a fair FIFO manner.
--
-- Note: A long running pure function will block all other access to the 'MSemN' while it is
-- evaluated.
signalF :: Integral i
        => MSemN i
        -> (i -> (i,b))
        -> IO (i,b)
{-# SPECIALIZE signalF :: MSemN Int -> (Int -> (Int,b)) -> IO (Int,b) #-}
{-# SPECIALIZE signalF :: MSemN Word -> (Word -> (Word,b)) -> IO (Word,b) #-}
{-# SPECIALIZE signalF :: MSemN Integer -> (Integer -> (Integer,b)) -> IO (Integer,b) #-}
signalF m f = seq f $ mask_ . modifyMVar (quantityStore m) $ \ ms -> do
  -- Assume: ((headWait is empty) OR (headWants is Nothing))
  -- Nothing in this scope can block
  let out@(size,_) = f (avail ms)
  ms' <- case headWants ms of
           Nothing -> evaluate ms { avail = avail ms + size }
           Just wantedVal -> do
             -- Because headWants is Just _ the assumption implies headWait is empty
             let total = avail ms + size
             if wantedVal <= total
                then do
                  _didPlace <- tryPutMVar (headWait m) wantedVal -- _didPlace is always True
                  evaluate MS { avail = total - wantedVal, headWants = Nothing }
                else do
                  evaluate ms { avail = total }
  return (ms',out)
  -- Invariant: ((headWait is empty) OR (headWants is Nothing))
  -- Proof: Assume invariant originally holds when taking quantityStore
  --   1) headWants originally Nothing, headWants and headWait unchanged, invariant still holds
  --   2) headWants originally Just _ implies, by assumption, that (headWait is empty)
  --      if-then-branch: headWants changed to Nothing and headWait changed to filled, invariant satisfied
  --      if-else-branch: headWants and headWait unchanged, invariant still holds

-- | 'peekAvail' skips the queue of any blocked 'wait' and 'waitF' threads, but may momentarily
-- block on 'signal', 'signalF', other 'peekAvail', and the head waiter. This returns the amount of
-- value available to be taken.  Using this value without producing unwanted race conditions is left
-- up to the programmer.
--
-- 'peekAvail' is an optimized form of \"signalF m (\x -> (0,x))\".
--
-- Quantity that has been passed to a blocked waiter but not picked up is not counted.  If the
-- blocked waiter is killed before picking it up then the passed quantity will be recovered by the
-- next waiter.  In this exceptional case this next waiter may see an available total that is
-- different than returned by peekAvail.
--
-- A version of 'peekAvail' that joins the FIFO queue of 'wait' and 'waitF' can be acheived by
-- \"waitF m (\x -> (0,x))\" but this will block if x is negative.  On the other hand this method
-- will see the total including any recovered quantity.
peekAvail :: Integral i => MSemN i -> IO i
{-# SPECIALIZE peekAvail :: MSemN Int -> IO Int #-}
{-# SPECIALIZE peekAvail :: MSemN Word -> IO Word #-}
{-# SPECIALIZE peekAvail :: MSemN Integer -> IO Integer #-}
peekAvail m = withMVar (quantityStore m) (return . avail)