{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE Rank2Types #-}
-- | Contains a simple source and sink linking together conduits in
--   different threads. For extended examples of usage and bottlenecks
--   see "Data.Conduit.TMChan".
--
--   TQueue is an amortized FIFO queue that behaves like TChan, with two
--   important differences:
--
--     * it's faster (but amortized, thus the cost of individual operations
--       may vary a lot)
--
--     * it doesn't provide equivalents of the dupTChan and cloneTChan
--       operations
--
--   Here is a short description of the data structures:
--
--     * TQueue   - unbounded infinite queue
--
--     * TBQueue  - bounded infinite queue
--
--     * TMQueue  - unbounded finite (closable) queue
--
--     * TBMQueue - bounded finite (closable) queue
--
--   Caveats
--
--   Infinite operations mean that the source doesn't know when the stream
--   has ended, so one needs to use other methods of finishing the stream,
--   like sending an exception or finishing the conduit downstream.
--
module Data.Conduit.TQueue
  ( -- * Connectors
    -- ** Infinite queues
    -- $infinite
    -- *** TQueue connectors
    sourceTQueue
  , sinkTQueue
    -- *** TBQueue connectors
  , sourceTBQueue
  , sinkTBQueue
  , entangledPair
    -- ** Closable queues
    -- *** TMQueue connectors
  , sourceTMQueue
  , sinkTMQueue
    -- *** TBMQueue connectors
  , sourceTBMQueue
  , sinkTBMQueue
  , module Control.Concurrent.STM.TQueue
  ) where

import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.STM.TMQueue
import Control.Monad
import Control.Monad.IO.Class
import Data.Conduit
import qualified Data.Conduit.List as CL

-- | A simple wrapper around a 'TQueue'. As data is pushed into the queue, the
--   source will read it and pass it down the conduit pipeline.
--
--   The source loops forever: it has no way to learn that the stream has
--   ended, so it only stops when downstream stops consuming or an exception
--   is raised.
sourceTQueue :: MonadIO m => TQueue a -> ConduitT z a m ()
sourceTQueue q = forever $ liftSTM (readTQueue q) >>= yield

-- | A simple wrapper around a 'TQueue'. As data is pushed into this sink, it
--   will magically begin to appear in the queue.
sinkTQueue :: MonadIO m => TQueue a -> ConduitT a z m ()
sinkTQueue q = CL.mapM_ (liftSTM . writeTQueue q)

-- | A simple wrapper around a 'TBQueue'. As data is pushed into the queue, the
--   source will read it and pass it down the conduit pipeline.
--
--   The source loops forever; it only stops when downstream stops consuming
--   or an exception is raised.
sourceTBQueue :: MonadIO m => TBQueue a -> ConduitT z a m ()
sourceTBQueue q = forever $ liftSTM (readTBQueue q) >>= yield

-- | A simple wrapper around a 'TBQueue'. As data is pushed into this sink, it
--   will magically begin to appear in the queue.
sinkTBQueue :: MonadIO m => TBQueue a -> ConduitT a z m ()
sinkTBQueue q = CL.mapM_ (liftSTM . writeTBQueue q)

-- | A convenience wrapper for creating a source and a sink that share one
--   freshly created 'TBQueue' of the given size, without exposing the
--   underlying queue.
--
--   The result is just the @(source, sink)@ pair. The queue is not closable,
--   so the source behaves like 'sourceTBQueue' and never finishes on its own;
--   terminate the pipeline from downstream or with an exception.
entangledPair :: MonadIO m => Int -> m (ConduitT z a m (), ConduitT a l m ())
entangledPair size = do
    -- One queue, visible only through the two conduits returned below.
    q <- liftSTM $ newTBQueue (fromIntegral size)
    return (sourceTBQueue q, sinkTBQueue q)

-- | A simple wrapper around a 'TMQueue'. As data is pushed into the queue, the
--   source will read it and pass it down the conduit pipeline. When the
--   queue is closed, the source will close also.
sourceTMQueue :: MonadIO m => TMQueue a -> ConduitT z a m ()
sourceTMQueue q = loop
  where
    loop = do
        mx <- liftSTM $ readTMQueue q
        case mx of
            -- 'Nothing' is the end-of-stream signal: finish the source.
            Nothing -> return ()
            Just x  -> yield x >> loop

-- | A simple wrapper around a 'TMQueue'. As data is pushed into this sink, it
--   will magically begin to appear in the queue.
--
--   The sink only writes; it never closes the queue itself, so whoever owns
--   the queue must close it for a matching 'sourceTMQueue' to finish.
sinkTMQueue :: MonadIO m => TMQueue a -> ConduitT a z m ()
sinkTMQueue q = CL.mapM_ (liftSTM . writeTMQueue q)

-- | A simple wrapper around a 'TBMQueue'. As data is pushed into the queue, the
--   source will read it and pass it down the conduit pipeline. When the
--   queue is closed, the source will close also.
sourceTBMQueue :: MonadIO m => TBMQueue a -> ConduitT z a m ()
sourceTBMQueue q = drain
  where
    -- Read one element at a time; 'Nothing' ends the source, 'Just' yields
    -- the element downstream and continues.
    drain =
        liftSTM (readTBMQueue q)
            >>= maybe (return ()) (\item -> yield item >> drain)

-- | Feeds every value received from upstream into the given 'TBMQueue'.
--   Each element is written in its own atomic transaction.
sinkTBMQueue :: MonadIO m => TBMQueue a -> ConduitT a z m ()
sinkTBMQueue q = CL.mapM_ enqueue
  where
    enqueue item = liftSTM (writeTBMQueue q item)

-- | Run an 'STM' transaction atomically inside any 'MonadIO' monad.
liftSTM :: forall (m :: * -> *) a. MonadIO m => STM a -> m a
liftSTM stm = liftIO (atomically stm)

-- $infinite
-- It's impossible to close infinite queues but they work slightly faster,
-- so it's reasonable to use them inside infinite computations for
-- performance reasons.