-- |
-- Module      : Control.Concurrent.Classy.QSemN
-- Copyright   : (c) 2016 Michael Walker
-- License     : MIT
-- Maintainer  : Michael Walker <mike@barrucadu.co.uk>
-- Stability   : stable
-- Portability : portable
--
-- Quantity semaphores in which each thread may wait for an arbitrary
-- \"amount\".
module Control.Concurrent.Classy.QSemN
  ( -- * General Quantity Semaphores
    QSemN
  , newQSemN
  , waitQSemN
  , signalQSemN
  ) where

import           Control.Concurrent.Classy.MVar
import           Control.Monad.Catch            (mask_, onException,
                                                 uninterruptibleMask_)
import           Control.Monad.Conc.Class       (MonadConc)
import           Control.Monad.Fail             (MonadFail)
import           Data.Maybe

-- Representation note: the 'MVar' holds a triple of
--
--   * the quantity currently available,
--   * the front of the waiter queue (oldest waiter first), and
--   * the back of the waiter queue (newest waiter first; 'waitQSemN'
--     conses new waiters onto this list and 'signal' reverses it into
--     the front list once the front list is exhausted).
--
-- Each waiter is a pair of the quantity it asked for and the 'MVar'
-- that is filled to wake it up.

-- | 'QSemN' is a quantity semaphore in which the resource is acquired
-- and released in arbitrary quantities. It provides guaranteed FIFO
-- ordering for satisfying blocked 'waitQSemN' calls.
--
-- The pattern
--
-- > bracket_ (waitQSemN q n) (signalQSemN q n) (...)
--
-- is safe; it never loses any of the resource.
--
-- @since 1.0.0.0
newtype QSemN m = QSemN (MVar m (Int, [(Int, MVar m ())], [(Int, MVar m ())]))

-- | Build a new 'QSemN' with a supplied initial quantity.
-- The initial quantity must be at least 0; a negative quantity makes
-- the action 'fail' in the underlying monad.
--
-- The semaphore starts with no waiting threads.
--
-- @since 1.0.0.0
newQSemN :: (MonadConc m, MonadFail m) => Int -> m (QSemN m)
newQSemN initial
  | initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
  | otherwise   = QSemN <$> newMVar (initial, [], [])

-- | Wait for the specified quantity to become available.
--
-- If the semaphore currently holds at least the requested quantity it
-- is claimed immediately. Otherwise the calling thread is appended to
-- the waiter queue and blocks until 'signalQSemN' wakes it.
--
-- The whole operation runs under 'mask_'; only the blocking
-- 'takeMVar' calls are interruptible.
--
-- @since 1.0.0.0
waitQSemN :: MonadConc m => QSemN m -> Int -> m ()
waitQSemN (QSemN m) sz = mask_ $ do
  (quantity, b1, b2) <- takeMVar m
  let remaining = quantity - sz
  if remaining < 0
    -- Enqueue and block the thread
    then do
      b <- newEmptyMVar
      putMVar m (quantity, b1, (sz, b) : b2)
      wait b
    -- Claim the resource
    else putMVar m (remaining, b1, b2)
  where
    -- Block on our private wake-up MVar. If an exception arrives while
    -- blocked, tidy up under 'uninterruptibleMask_' so the clean-up
    -- itself cannot be interrupted:
    --
    --   * if the wake-up MVar was already filled, the quantity had been
    --     handed to us, so give it back via 'signal';
    --   * otherwise fill the MVar ourselves, so that 'signal' later
    --     recognises this queue entry as abandoned and skips it.
    --
    -- No local type signature is given: it would need to mention the
    -- outer @m@, which requires ScopedTypeVariables.
    wait b = takeMVar b `onException` uninterruptibleMask_ (do
      (quantity, b1, b2) <- takeMVar m
      r <- tryTakeMVar b
      r' <- if isJust r
        then signal sz (quantity, b1, b2)
        else putMVar b () >> pure (quantity, b1, b2)
      putMVar m r')

-- | Signal that a given quantity is now available from the 'QSemN'.
--
-- The quantity is added to the semaphore and as many queued waiters as
-- can be satisfied are woken (see 'signal'). The update runs under
-- 'uninterruptibleMask_' so that it cannot be interrupted part-way
-- through.
--
-- @since 1.0.0.0
signalQSemN :: MonadConc m => QSemN m -> Int -> m ()
signalQSemN (QSemN m) sz = uninterruptibleMask_ $ do
  r  <- takeMVar m
  r' <- signal sz r
  putMVar m r'

-- | Fix the queue and signal as many threads as we can.
--
-- Takes the quantity being released and the current semaphore state
-- (available quantity, front queue, back queue) and returns the new
-- state after waking every waiter at the head of the queue that can be
-- satisfied. The caller is expected to hold the semaphore's 'MVar'
-- while this runs and to write the returned state back.
signal :: MonadConc m
  => Int
  -> (Int, [(Int,MVar m ())], [(Int,MVar m ())])
  -> m (Int, [(Int,MVar m ())], [(Int,MVar m ())])
signal sz0 (i, a1, a2) = loop (sz0 + i) a1 a2
  where
    -- No more resource left, done.
    loop 0 bs b2 = pure (0, bs, b2)

    -- Nobody is waiting: keep whatever quantity remains.
    loop sz [] [] = pure (sz, [], [])

    -- Fix the queue: the front is exhausted, so the back queue (newest
    -- first) is reversed to become the new front (oldest first).
    loop sz [] b2 = loop sz (reverse b2) []

    -- Signal as many threads as there is enough resource to satisfy,
    -- stopping as soon as one thread requires more resource than there
    -- is.
    loop sz ((j, b) : bs) b2
      | j > sz = do
          -- An empty MVar means the waiter is still blocked: stop here
          -- and leave it at the head of the queue. A full MVar means
          -- the entry is stale, so drop it and carry on.
          stillWaiting <- isEmptyMVar b
          if stillWaiting
            then pure (sz, (j, b) : bs, b2)
            else loop sz bs b2
      | otherwise = do
          -- Only deduct the quantity if the wake-up was delivered; if
          -- the MVar was already full the entry is stale and is
          -- skipped without consuming anything.
          woken <- tryPutMVar b ()
          if woken
            then loop (sz - j) bs b2
            else loop sz bs b2