{-| Integration in "MonadIO". -} module Potoki.Conduit.MonadIO ( consumeConduit, ) where import Potoki.Conduit.Prelude import Conduit import qualified Potoki.Core.Produce as Produce import qualified Potoki.Core.Transform as Transform import qualified Potoki.Core.Consume as Consume import qualified Potoki.Core.Fetch as Fetch import qualified Potoki.Core.IO as IO {-| Given a Conduit source over a "MonadIO" monad and a Potoki consumer execute the whole pipeline in the base monad of the source. -} consumeConduit :: MonadIO m => ConduitT () o m () -> Consume.Consume o r -> m r consumeConduit conduit consume = do elementChan <- liftIO $ newTBMChanIO 100 resultVar <- liftIO $ newEmptyMVar liftIO $ fork $ do result <- IO.produceAndConsume (Produce.tbmChan elementChan) consume putMVar resultVar result feedConduitToTBMChan conduit elementChan liftIO $ takeMVar resultVar feedConduitToMVar :: MonadIO m => ConduitT () o m () -> MVar (Maybe o) -> m () feedConduitToMVar conduit mvar = do runConduit $ conduit .| mapM_C (liftIO . putMVar mvar . Just) liftIO (putMVar mvar Nothing) feedConduitToTBMChan :: MonadIO m => ConduitT () o m () -> TBMChan o -> m () feedConduitToTBMChan conduit chan = do runConduit $ conduit .| mapM_C (liftIO . atomically . writeTBMChan chan) liftIO $ atomically $ closeTBMChan chan