module Aws.Kinesis.Client.Internal.Queue
( BoundedCloseableQueue(..)
) where
import Control.Applicative
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.STM.TBMChan
import Control.Monad.Unicode
import Numeric.Natural
import Prelude.Unicode
-- | Operations shared by bounded queues that can be closed.
--
-- The functional dependency @q → α@ states that the queue type determines
-- the type of the elements it carries.
class BoundedCloseableQueue q α | q → α where
  -- | Create a new queue. The instances in this module pass the argument
  -- (converted with 'fromIntegral') to the underlying constructor as the
  -- queue's bound.
  newQueue ∷ Natural → IO q

  -- | Close the queue.
  closeQueue ∷ q → IO ()

  -- | Write an item to the queue. The instances in this module return
  -- 'False' (without writing) when the queue is already closed and 'True'
  -- once the item has been written.
  writeQueue ∷ q → α → IO Bool

  -- | Attempt to write an item. The instances in this module return
  -- @Just True@ when the item was written, @Just False@ when the queue was
  -- closed, and 'Nothing' when the underlying non-retrying write reported
  -- that the item could not be written to a still-open queue.
  tryWriteQueue ∷ q → α → IO (Maybe Bool)

  -- | Read one item from the queue; 'Nothing' signals that no item will be
  -- delivered (for the instances in this module: the underlying read
  -- returned 'Nothing').
  readQueue ∷ q → IO (Maybe α)

  -- | Collect items from the queue until either the requested number of
  -- items has been gathered, the timeout fires, or a read yields 'Nothing'.
  --
  -- NOTE(review): the instances in this module cons each item onto the
  -- accumulator, so the result is ordered newest-first — confirm that
  -- callers do not expect FIFO order.
  takeQueueTimeout
    ∷ q        -- ^ queue to read from
    → Natural  -- ^ maximum number of items to collect
    → Natural  -- ^ timeout; the instances here hand it to 'registerDelay'
    → IO [α]

  -- | Whether the queue currently holds no items.
  isEmptyQueue ∷ q → IO Bool

  -- | Whether the queue has been closed.
  isClosedQueue ∷ q → IO Bool

  -- | Whether the queue is both closed and empty.
  --
  -- The default runs 'isEmptyQueue' and then 'isClosedQueue' as two
  -- separate 'IO' actions; instances may override it to perform both checks
  -- in a single step.
  isClosedAndEmptyQueue ∷ q → IO Bool
  isClosedAndEmptyQueue q =
    liftA2 (&&) (isEmptyQueue q) (isClosedQueue q)
-- | 'TBMQueue'-backed implementation of 'BoundedCloseableQueue'.
instance BoundedCloseableQueue (TBMQueue a) a where
  -- The requested bound is converted to the 'Int' expected by
  -- 'newTBMQueueIO'.
  newQueue =
    newTBMQueueIO ∘ fromIntegral

  closeQueue =
    atomically ∘ closeTBMQueue

  -- The closed check and the write happen in the same transaction, so the
  -- 'True' result means the item was written to a queue that was still open.
  writeQueue q a =
    atomically $ isClosedTBMQueue q ≫= \case
      True → return False
      False → True <$ writeTBMQueue q a

  -- Re-maps the result of 'tryWriteTBMQueue':
  --   Nothing    (queue closed)              ⇒ Just False
  --   Just False (not written, queue open)   ⇒ Nothing
  --   Just True  (written)                   ⇒ Just True
  tryWriteQueue q a =
    atomically $ tryWriteTBMQueue q a ≫= \case
      Nothing → return $ Just False
      Just False → return Nothing
      Just True → return $ Just True

  readQueue =
    atomically ∘ readTBMQueue

  -- Collects at most @n@ items, one transaction per item, stopping early
  -- when the delay registered with 'registerDelay' elapses or when
  -- 'readTBMQueue' returns 'Nothing'.
  --
  -- The number of collected items is tracked in a 'Natural' counter rather
  -- than recomputed with 'length' on every step; this keeps the loop linear
  -- and avoids narrowing @n@ to 'Int', which would wrap for very large @n@.
  --
  -- NOTE(review): items are consed onto the accumulator, so the result is
  -- ordered newest-first — confirm callers do not rely on FIFO order.
  takeQueueTimeout q n timeoutDelay = do
    timedOutVar ← registerDelay $ fromIntegral timeoutDelay
    let
      -- Succeeds only once the timer has fired; retries otherwise.
      timeout =
        readTVar timedOutVar ≫= check

      -- The timeout branch is tried first, so an elapsed timer wins over an
      -- available item.
      readItems count xs =
        atomically (Left <$> timeout <|> Right <$> readTBMQueue q) ≫= \case
          Left _ → return xs
          Right Nothing → return xs
          Right (Just x) → go (count + 1) (x:xs)

      go count xs
        | count ≥ n = return xs
        | otherwise = readItems count xs

    go 0 []

  isClosedQueue =
    atomically ∘ isClosedTBMQueue

  isEmptyQueue =
    atomically ∘ isEmptyTBMQueue

  -- Overrides the class default so that both checks run in one transaction.
  isClosedAndEmptyQueue q =
    atomically $
      (&&) <$> isClosedTBMQueue q <*> isEmptyTBMQueue q
-- | 'TBMChan'-backed implementation of 'BoundedCloseableQueue'.
instance BoundedCloseableQueue (TBMChan a) a where
  -- The requested bound is converted to the 'Int' expected by
  -- 'newTBMChanIO'.
  newQueue =
    newTBMChanIO ∘ fromIntegral

  closeQueue =
    atomically ∘ closeTBMChan

  -- The closed check and the write happen in the same transaction, so the
  -- 'True' result means the item was written to a channel that was still
  -- open.
  writeQueue q a =
    atomically $ isClosedTBMChan q ≫= \case
      True → return False
      False → True <$ writeTBMChan q a

  -- Re-maps the result of 'tryWriteTBMChan':
  --   Nothing    (channel closed)              ⇒ Just False
  --   Just False (not written, channel open)   ⇒ Nothing
  --   Just True  (written)                     ⇒ Just True
  tryWriteQueue q a =
    atomically $ tryWriteTBMChan q a ≫= \case
      Nothing → return $ Just False
      Just False → return Nothing
      Just True → return $ Just True

  readQueue =
    atomically ∘ readTBMChan

  isClosedQueue =
    atomically ∘ isClosedTBMChan

  isEmptyQueue =
    atomically ∘ isEmptyTBMChan

  -- Overrides the class default so that both checks run in one transaction.
  isClosedAndEmptyQueue q =
    atomically $
      (&&) <$> isClosedTBMChan q <*> isEmptyTBMChan q

  -- Collects at most @n@ items, one transaction per item, stopping early
  -- when the delay registered with 'registerDelay' elapses or when
  -- 'readTBMChan' returns 'Nothing'.
  --
  -- The number of collected items is tracked in a 'Natural' counter rather
  -- than recomputed with 'length' on every step; this keeps the loop linear
  -- and avoids narrowing @n@ to 'Int', which would wrap for very large @n@.
  --
  -- NOTE(review): items are consed onto the accumulator, so the result is
  -- ordered newest-first — confirm callers do not rely on FIFO order.
  takeQueueTimeout q n timeoutDelay = do
    timedOutVar ← registerDelay $ fromIntegral timeoutDelay
    let
      -- Succeeds only once the timer has fired; retries otherwise.
      timeout =
        readTVar timedOutVar ≫= check

      -- The timeout branch is tried first, so an elapsed timer wins over an
      -- available item.
      readItems count xs =
        atomically (Left <$> timeout <|> Right <$> readTBMChan q) ≫= \case
          Left _ → return xs
          Right Nothing → return xs
          Right (Just x) → go (count + 1) (x:xs)

      go count xs
        | count ≥ n = return xs
        | otherwise = readItems count xs

    go 0 []