-- | Throttling of conduit producers.
--
-- 'throttleProducer' runs a producer in a background thread and re-emits
-- its output after passing it through "Control.Concurrent.Throttle".
--
-- NOTE(review): 'Conf', 'newConf' and the @set*@ functions are not defined
-- in the visible part of this module; presumably they are re-exported from
-- "Data.Conduit.Throttle.Internal" -- confirm.
module Data.Conduit.Throttle
( -- * Configuration
Conf
, newConf
, setMeasure
, setInterval
, setMaxThroughput
, setBufferSize
, setEmaAlpha
  -- * Throttling
, throttleProducer
) where
import Conduit
import Control.Concurrent.STM.TBMQueue
import qualified Control.Concurrent.Throttle as Throttle
import Control.Monad.Trans.Resource
import Data.Conduit.Throttle.Internal
import UnliftIO
-- | Run a producer in a background thread and re-emit its output through a
-- throttle.
--
-- Data flow:
--
-- 1. @producer@ is run in its own async and every element it yields is
--    written to a bounded input queue (capacity 1024). The queue is closed
--    once the producer is exhausted.
-- 2. 'Throttle.throttle' is started with a configuration derived from
--    @conf@ via 'throttleConfPrepare'. It is handed a callback that reads
--    from the input queue and a callback that writes to (or, on 'Nothing',
--    closes) a bounded output queue (capacity 1).
-- 3. The resulting conduit yields everything read from the output queue and
--    finishes when that queue is closed and drained.
--
-- Both asyncs are registered with 'allocate', so they are cancelled when
-- their resources are released. Both asyncs are also 'link'ed to the
-- calling thread: an exception in either the producer or the throttler is
-- rethrown here instead of leaving the consumer waiting on a queue that
-- nobody is going to close.
throttleProducer :: (MonadUnliftIO m, MonadResource m)
                 => Conf o
                 -> ConduitM () o m ()
                 -> ConduitM () o m ()
throttleProducer conf producer = do
  (UnliftIO unlifter) <- lift askUnliftIO
  queueIn  <- liftIO $ newTBMQueueIO 1024
  queueOut <- liftIO $ newTBMQueueIO 1

  -- Upstream worker: drains the producer into the input queue.
  (_, asyncProducer) <- allocate
    (UnliftIO.async (unlifter (runConduit (producer .| drainConduit queueIn))))
    UnliftIO.cancel
  -- Without this link a failing producer would go unnoticed: nothing else
  -- waits on this async, and 'drainConduit' only closes the input queue
  -- after a regular end of stream.
  link asyncProducer

  let throttleConf  = throttleConfPrepare conf
      readCallback  = atomically (readTBMQueue queueIn)
      -- 'Nothing' signals end of input; propagate it by closing the
      -- output queue so that 'go' terminates.
      writeCallback = \case
        Just a  -> atomically $ writeTBMQueue queueOut a
        Nothing -> atomically $ closeTBMQueue queueOut

  -- Throttling worker: moves elements from the input to the output queue.
  (_, asyncThrottler) <- allocate
    (Throttle.throttle throttleConf readCallback writeCallback)
    UnliftIO.cancel
  link asyncThrottler

  go queueOut

  where
    -- Yield elements from the queue until it is closed and empty.
    go queue =
      liftIO (atomically (readTBMQueue queue)) >>= \case
        Just a  -> yield a >> go queue
        Nothing -> return ()

    -- Sink that writes every awaited element to the queue and closes the
    -- queue when upstream is exhausted.
    drainConduit queue =
      await >>= \case
        Just a  -> do
          liftIO (atomically (writeTBMQueue queue a))
          drainConduit queue
        Nothing -> liftIO (atomically (closeTBMQueue queue))