-- |
-- Module      : Control.Concurrent.Classy.QSemN
-- Copyright   : (c) 2016 Michael Walker
-- License     : MIT
-- Maintainer  : Michael Walker <mike@barrucadu.co.uk>
-- Stability   : stable
-- Portability : portable
--
-- Quantity semaphores in which each thread may wait for an arbitrary
-- \"amount\".
module Control.Concurrent.Classy.QSemN
  ( -- * General Quantity Semaphores
    QSemN
  , newQSemN
  , waitQSemN
  , signalQSemN
  ) where

import           Control.Concurrent.Classy.MVar
import           Control.Monad.Catch            (mask_, onException,
                                                 uninterruptibleMask_)
import           Control.Monad.Conc.Class       (MonadConc)
import           Control.Monad.Fail             (MonadFail)
import           Data.Maybe

-- | 'QSemN' is a quantity semaphore in which the resource is aqcuired
-- and released in units of one. It provides guaranteed FIFO ordering
-- for satisfying blocked `waitQSemN` calls.
--
-- The pattern
--
-- > bracket_ (waitQSemN n) (signalQSemN n) (...)
--
-- is safe; it never loses any of the resource.
--
-- @since 1.0.0.0
newtype QSemN m = QSemN (MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]))

-- | Build a new 'QSemN' with a supplied initial quantity.
--  The initial quantity must be at least 0.
--
-- @since 1.0.0.0
newQSemN :: (MonadConc m, MonadFail m) => Int -> m (QSemN m)
newQSemN :: Int -> m (QSemN m)
newQSemN Int
initial
  | Int
initial Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0 = String -> m (QSemN m)
forall (m :: * -> *) a. MonadFail m => String -> m a
fail String
"newQSemN: Initial quantity must be non-negative"
  | Bool
otherwise   = MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> QSemN m
forall (m :: * -> *).
MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> QSemN m
QSemN (MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> QSemN m)
-> m (MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]))
-> m (QSemN m)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]))
forall (m :: * -> *) a. MonadConc m => a -> m (MVar m a)
newMVar (Int
initial, [], [])

-- | Wait for the specified quantity to become available
--
-- @since 1.0.0.0
waitQSemN :: MonadConc m => QSemN m -> Int -> m ()
waitQSemN :: QSemN m -> Int -> m ()
waitQSemN (QSemN MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m) Int
sz = m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
mask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
  (Int
quantity, [(Int, MVar m ())]
b1, [(Int, MVar m ())]
b2) <- MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
takeMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m
  let remaining :: Int
remaining = Int
quantity Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
sz
  if Int
remaining Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
0
  -- Enqueue and block the thread
  then do
    MVar m ()
b <- m (MVar m ())
forall (m :: * -> *) a. MonadConc m => m (MVar m a)
newEmptyMVar
    MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> m ()
forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
putMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m (Int
quantity, [(Int, MVar m ())]
b1, (Int
sz,MVar m ()
b)(Int, MVar m ()) -> [(Int, MVar m ())] -> [(Int, MVar m ())]
forall a. a -> [a] -> [a]
:[(Int, MVar m ())]
b2)
    MVar m () -> m ()
wait MVar m ()
b
  -- Claim the resource
  else
    MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> m ()
forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
putMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m (Int
remaining, [(Int, MVar m ())]
b1, [(Int, MVar m ())]
b2)

  where
    wait :: MVar m () -> m ()
wait MVar m ()
b = MVar m () -> m ()
forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
takeMVar MVar m ()
b m () -> m () -> m ()
forall (m :: * -> *) a b. MonadCatch m => m a -> m b -> m a
`onException` m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
uninterruptibleMask_ (do
      (Int
quantity, [(Int, MVar m ())]
b1, [(Int, MVar m ())]
b2) <- MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
takeMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m
      Maybe ()
r  <- MVar m () -> m (Maybe ())
forall (m :: * -> *) a. MonadConc m => MVar m a -> m (Maybe a)
tryTakeMVar MVar m ()
b
      (Int, [(Int, MVar m ())], [(Int, MVar m ())])
r' <- if Maybe () -> Bool
forall a. Maybe a -> Bool
isJust Maybe ()
r
           then Int
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (m :: * -> *).
MonadConc m =>
Int
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
signal Int
sz (Int
quantity, [(Int, MVar m ())]
b1, [(Int, MVar m ())]
b2)
           else MVar m () -> () -> m ()
forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
putMVar MVar m ()
b () m ()
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Int
quantity, [(Int, MVar m ())]
b1, [(Int, MVar m ())]
b2)
      MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> m ()
forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
putMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
r')

-- | Signal that a given quantity is now available from the 'QSemN'.
--
-- @since 1.0.0.0
signalQSemN :: MonadConc m => QSemN m -> Int -> m ()
signalQSemN :: QSemN m -> Int -> m ()
signalQSemN (QSemN MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m) Int
sz = m () -> m ()
forall (m :: * -> *) a. MonadMask m => m a -> m a
uninterruptibleMask_ (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
  (Int, [(Int, MVar m ())], [(Int, MVar m ())])
r  <- MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (m :: * -> *) a. MonadConc m => MVar m a -> m a
takeMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m
  (Int, [(Int, MVar m ())], [(Int, MVar m ())])
r' <- Int
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall (m :: * -> *).
MonadConc m =>
Int
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
signal Int
sz (Int, [(Int, MVar m ())], [(Int, MVar m ())])
r
  MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())]) -> m ()
forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m ()
putMVar MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
r'

-- | Fix the queue and signal as many threads as we can.
signal :: MonadConc m
  => Int
  -> (Int, [(Int,MVar m ())], [(Int,MVar m ())])
  -> m (Int, [(Int,MVar m ())], [(Int,MVar m ())])
signal :: Int
-> (Int, [(Int, MVar m ())], [(Int, MVar m ())])
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
signal Int
sz0 (Int
i,[(Int, MVar m ())]
a1,[(Int, MVar m ())]
a2) = Int
-> [(Int, MVar m ())]
-> [(Int, MVar m ())]
-> m (Int, [(Int, MVar m ())], [(Int, MVar m ())])
forall a (f :: * -> *).
(Num a, MonadConc f, Ord a) =>
a
-> [(a, MVar f ())]
-> [(a, MVar f ())]
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
loop (Int
sz0 Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
i) [(Int, MVar m ())]
a1 [(Int, MVar m ())]
a2 where
  -- No more resource left, done.
  loop :: a
-> [(a, MVar f ())]
-> [(a, MVar f ())]
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
loop a
0 [(a, MVar f ())]
bs [(a, MVar f ())]
b2 = (a, [(a, MVar f ())], [(a, MVar f ())])
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
0,  [(a, MVar f ())]
bs, [(a, MVar f ())]
b2)

  -- Fix the queue
  loop a
sz [] [] = (a, [(a, MVar f ())], [(a, MVar f ())])
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
sz, [], [])
  loop a
sz [] [(a, MVar f ())]
b2 = a
-> [(a, MVar f ())]
-> [(a, MVar f ())]
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
loop a
sz ([(a, MVar f ())] -> [(a, MVar f ())]
forall a. [a] -> [a]
reverse [(a, MVar f ())]
b2) []

  -- Signal as many threads as there is enough resource to satisfy,
  -- stopping as soon as one thread requires more resource than there
  -- is.
  loop a
sz ((a
j,MVar f ()
b):[(a, MVar f ())]
bs) [(a, MVar f ())]
b2
    | a
j a -> a -> Bool
forall a. Ord a => a -> a -> Bool
> a
sz = do
      Bool
r <- MVar f () -> f Bool
forall (m :: * -> *) a. MonadConc m => MVar m a -> m Bool
isEmptyMVar MVar f ()
b
      if Bool
r then (a, [(a, MVar f ())], [(a, MVar f ())])
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
sz, (a
j,MVar f ()
b)(a, MVar f ()) -> [(a, MVar f ())] -> [(a, MVar f ())]
forall a. a -> [a] -> [a]
:[(a, MVar f ())]
bs, [(a, MVar f ())]
b2)
           else a
-> [(a, MVar f ())]
-> [(a, MVar f ())]
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
loop a
sz [(a, MVar f ())]
bs [(a, MVar f ())]
b2
    | Bool
otherwise = do
      Bool
r <- MVar f () -> () -> f Bool
forall (m :: * -> *) a. MonadConc m => MVar m a -> a -> m Bool
tryPutMVar MVar f ()
b ()
      if Bool
r then a
-> [(a, MVar f ())]
-> [(a, MVar f ())]
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
loop (a
sza -> a -> a
forall a. Num a => a -> a -> a
-a
j) [(a, MVar f ())]
bs [(a, MVar f ())]
b2
           else a
-> [(a, MVar f ())]
-> [(a, MVar f ())]
-> f (a, [(a, MVar f ())], [(a, MVar f ())])
loop a
sz [(a, MVar f ())]
bs [(a, MVar f ())]
b2