{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE BangPatterns #-}
{- |
Module: Data.Queue
Description: A real-time, concurrent, and mutable queue
Copyright: (c) Samuel Schlesinger 2020
License: MIT
Maintainer: sgschlesinger@gmail.com
Stability: experimental
Portability: POSIX, Windows
-}
module Data.Queue
( Queue
, newQueue
, peek
, tryPeek
, enqueue
, dequeue
, tryDequeue
, flush
) where

import Control.Concurrent
import Control.Concurrent.STM

-- | Real time 'Queue' backed by transactional variables ('TVar's)
data Queue a = Queue
  {-# UNPACK #-} !(TVar ([a], [a]))
  {-# UNPACK #-} !(TVar ([a], [a]))

-- | Create a new, empty 'Queue'
newQueue :: STM (Queue a)
newQueue :: forall a. STM (Queue a)
newQueue = forall a. TVar ([a], [a]) -> TVar ([a], [a]) -> Queue a
Queue
  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> STM (TVar a)
newTVar ([], [])
  forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> forall a. a -> STM (TVar a)
newTVar ([], [])

rotate :: [a] -> [a] -> [a]
rotate :: forall a. [a] -> [a] -> [a]
rotate [a]
xs [a]
ys = forall {a}. [a] -> [a] -> [a] -> [a]
go [a]
xs [a]
ys []
  where
  go :: [a] -> [a] -> [a] -> [a]
go [] [a]
bottom [a]
acc = [a]
bottom forall a. [a] -> [a] -> [a]
++ [a]
acc
  go (a
t:[a]
ts) (a
b:[a]
bs) [a]
acc = a
t forall a. a -> [a] -> [a]
: [a] -> [a] -> [a] -> [a]
go [a]
ts [a]
bs (a
bforall a. a -> [a] -> [a]
:[a]
acc)
  go [a]
ts [] [a]
acc = [a]
ts forall a. [a] -> [a] -> [a]
++ [a]
acc

-- | Enqueue a single item onto the 'Queue'.
enqueue :: Queue a -> a -> STM ()
enqueue :: forall a. Queue a -> a -> STM ()
enqueue q :: Queue a
q@(Queue TVar ([a], [a])
top TVar ([a], [a])
bottom) a
a = do
  ([a]
bs, [a]
sbs) <- forall a. TVar a -> STM a
readTVar TVar ([a], [a])
bottom
  let bs' :: [a]
bs' = a
a forall a. a -> [a] -> [a]
: [a]
bs
  case [a]
sbs of
    a
_:a
_:[a]
sbs' -> do
      forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
bottom ([a]
bs', [a]
sbs')
    [a]
_ -> do
      ([a]
ts, [a]
_sts) <- forall a. TVar a -> STM a
readTVar TVar ([a], [a])
top
      let ts' :: [a]
ts' = forall a. [a] -> [a] -> [a]
rotate [a]
ts [a]
bs'
      forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
bottom ([], [a]
ts')
      forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
top ([a]
ts', [a]
ts')

-- | Dequeue a single item onto the 'Queue', 'retry'ing if there is nothing
-- there. This is the motivating use case of this library, allowing a thread to
-- register its interest in the head of a 'Queue' and be woken up by the
-- runtime system to read from the top of that 'Queue' when an item has
-- been made available.
dequeue :: Queue a -> STM a
dequeue :: forall a. Queue a -> STM a
dequeue q :: Queue a
q@(Queue TVar ([a], [a])
top TVar ([a], [a])
bottom) = do
  ([a]
ts, [a]
sts) <- forall a. TVar a -> STM a
readTVar TVar ([a], [a])
top
  case [a]
ts of
    [] -> forall a. STM a
retry
    a
t:[a]
ts' ->
      case [a]
sts of
        a
_:a
_:[a]
sts' -> do
          forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
top ([a]
ts', [a]
sts')
          forall (f :: * -> *) a. Applicative f => a -> f a
pure a
t
        [a]
_ -> do
          ([a]
bs, [a]
_) <- forall a. TVar a -> STM a
readTVar TVar ([a], [a])
bottom
          let !ts'' :: [a]
ts'' = forall a. [a] -> [a] -> [a]
rotate [a]
ts' [a]
bs
          forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
bottom ([], [a]
ts'')
          forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
top ([a]
ts'', [a]
ts'')
          forall (f :: * -> *) a. Applicative f => a -> f a
pure a
t

-- | Try to 'dequeue' a single item. This function is offered to allow
-- users to easily port from the 'TQueue' offered in the stm package,
-- but is not the intended usage of the library.
tryDequeue :: Queue a -> STM (Maybe a)
tryDequeue :: forall a. Queue a -> STM (Maybe a)
tryDequeue q :: Queue a
q@(Queue TVar ([a], [a])
top TVar ([a], [a])
bottom) = do
  ([a]
ts, [a]
sts) <- forall a. TVar a -> STM a
readTVar TVar ([a], [a])
top
  case [a]
ts of
    [] -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
    a
t:[a]
ts' ->
      case [a]
sts of
        a
_:a
_:[a]
sts' -> do
          forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
top ([a]
ts', [a]
sts')
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just a
t)
        [a]
_ -> do
          ([a]
bs, [a]
_) <- forall a. TVar a -> STM a
readTVar TVar ([a], [a])
bottom
          let !ts'' :: [a]
ts'' = forall a. [a] -> [a] -> [a]
rotate [a]
ts' [a]
bs
          forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
bottom ([], [a]
ts'')
          forall a. TVar a -> a -> STM ()
writeTVar TVar ([a], [a])
top ([a]
ts'', [a]
ts'')
          forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just a
t)

-- | Peek at the top of the 'Queue', returning the top element.
peek :: Queue a -> STM a
peek :: forall a. Queue a -> STM a
peek (Queue TVar ([a], [a])
top TVar ([a], [a])
_bottom) =
  forall a. TVar a -> STM a
readTVar TVar ([a], [a])
top forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    (a
x : [a]
xs, [a]
_) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x
    ([], [a]
_) -> forall a. STM a
retry

-- | Try to 'peek' for the top item of the 'Queue'. This function is
-- offered to easily port from the 'TQueue' offered in the stm package,
-- but is not the intended usage of the library.
tryPeek :: Queue a -> STM (Maybe a)
tryPeek :: forall a. Queue a -> STM (Maybe a)
tryPeek (Queue TVar ([a], [a])
top TVar ([a], [a])
_bottom) =
  forall a. TVar a -> STM a
readTVar TVar ([a], [a])
top forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    (a
x : [a]
xs, [a]
_) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> Maybe a
Just a
x)
    ([], [a]
_) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing  

-- | Efficiently read the entire contents of a 'Queue' into a list.
flush :: Queue a -> STM [a]
flush :: forall a. Queue a -> STM [a]
flush (Queue TVar ([a], [a])
top TVar ([a], [a])
bottom) = do
  ([a]
xs, [a]
_) <- forall a. TVar a -> a -> STM a
swapTVar TVar ([a], [a])
top ([], [])
  ([a]
ys, [a]
_) <- forall a. TVar a -> a -> STM a
swapTVar TVar ([a], [a])
bottom ([], [])
  forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. [a] -> [a] -> [a]
rotate [a]
xs [a]
ys)