module HaskellWorks.Data.Conduit.Combinator where
import Control.Concurrent (MVar, putMVar, tryTakeMVar)
import Control.Monad (void)
import Control.Monad.IO.Class
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Data.Maybe
import Data.Time.Clock.POSIX as T
import qualified Data.Conduit.List as L
-- | Route a stream of 'Maybe' values through two conduits at once.
--
-- 'Nothing' inputs are converted to @()@ and fed to @n@; every @()@ that @n@
-- emits is re-emitted as 'Nothing'.  'Just' payloads are unwrapped and fed to
-- @j@, and everything @j@ emits is wrapped in 'Just' again.  The two branches
-- are combined with 'ZipConduit', whose outputs end up in one stream.
maybeC :: Monad m => ConduitT () () m () -> ConduitT a c m () -> ConduitT (Maybe a) (Maybe c) m ()
maybeC n j = getZipConduit (ZipConduit nothingBranch <* ZipConduit justBranch)
  where
    -- Only the 'Nothing's, reduced to unit markers for @n@.
    nothingBranch = L.filter isNothing .| L.map (const ()) .| n .| L.map (const Nothing)
    -- Only the 'Just' payloads ('L.concat' flattens each 'Maybe').
    justBranch    = L.concat .| j .| L.map Just
-- | Run @j@ on the 'Just' payloads via 'maybeC', using the identity conduit
-- (@L.map id@) for the 'Nothing' branch.
justC :: Monad m => ConduitT a c m () -> ConduitT (Maybe a) (Maybe c) m ()
justC j = maybeC passThrough j
  where
    passThrough = L.map id
-- | Run @n@ on the 'Nothing' branch via 'maybeC', using the identity conduit
-- (@L.map id@) for the 'Just' payloads.
nothingC :: Monad m => ConduitT () () m () -> ConduitT (Maybe a) (Maybe a) m ()
nothingC = flip maybeC (L.map id)
-- | Process an 'Either' stream with one conduit per side.
--
-- 'Left' payloads go through @l@ and 'Right' payloads go through @r@; both
-- conduits produce the same output type and are combined with 'ZipConduit'.
eitherC :: Monad m => ConduitT l a m () -> ConduitT r a m () -> ConduitT (Either l r) a m ()
eitherC l r = getZipConduit (onLefts <* onRights)
  where
    onLefts  = ZipConduit (projectLefts .| l)
    onRights = ZipConduit (projectRights .| r)
-- | Apply @r@ to the 'Right' payloads, re-wrapping its output in 'Right';
-- 'Left' values are re-emitted as 'Left'.  Built on 'eitherC'.
rightC :: Monad m => ConduitT r a m () -> ConduitT (Either l r) (Either l a) m ()
rightC r = eitherC keepLefts mapRights
  where
    keepLefts = L.map Left
    mapRights = r .| L.map Right
-- | Apply @l@ to the 'Left' payloads, re-wrapping its output in 'Left';
-- 'Right' values are re-emitted as 'Right'.  Built on 'eitherC'.
leftC :: Monad m => ConduitT l a m () -> ConduitT (Either l r) (Either a r) m ()
leftC l =
  let mapLefts   = l .| L.map Left
      keepRights = L.map Right
  in  eitherC mapLefts keepRights
-- | Run the action @f@ on each element, discard its result, and pass the
-- element downstream unchanged.
effectC :: Monad m => (a -> m b) -> ConduitT a a m ()
effectC f = L.mapM $ \a -> do
  _ <- f a
  return a
-- | Run the fixed action @m@ once per element, discard its result, and pass
-- the element downstream unchanged.
effectC' :: Monad m => m b -> ConduitT a a m ()
effectC' m = L.mapM (\a -> a <$ m)
-- | Consume the stream, storing each element in the 'MVar'.
--
-- Any value currently held is removed with 'tryTakeMVar' before the new one
-- is put.  The take and the put are two separate steps.
mvarWriteC :: MonadIO m => MVar a -> ConduitT a Void m ()
mvarWriteC mvar = awaitForever (liftIO . overwrite)
  where
    overwrite v = do
      _ <- tryTakeMVar mvar
      putMVar mvar v
-- | Consume the stream, storing @f@ applied to each element in the 'MVar'.
--
-- Any value currently held is removed with 'tryTakeMVar' before the new one
-- is put.
mvarWriteMC :: MonadIO m => (a -> b) -> MVar b -> ConduitT a Void m ()
mvarWriteMC f mvar = awaitForever (liftIO . replaceWith . f)
  where
    replaceWith b = tryTakeMVar mvar *> putMVar mvar b
-- | Sink that stores each incoming element in the 'MVar', first removing any
-- value already there with 'tryTakeMVar'.
mvarWriteSink :: MonadIO m => MVar a -> ConduitT a Void m ()
mvarWriteSink mvar = awaitForever $ \v -> liftIO $ do
  _ <- tryTakeMVar mvar
  putMVar mvar v
-- | Split a stream between two sinks using a predicate.
--
-- Elements satisfying @p@ are fed to @tr@, the rest to @fl@.  Both sinks are
-- run with 'sequenceSinks' and their results are discarded.
sinkWithPred :: Monad m => (a -> Bool) -> ConduitT a Void m () -> ConduitT a Void m () -> ConduitT a Void m ()
sinkWithPred p tr fl = void (sequenceSinks [whenTrue, whenFalse])
  where
    whenTrue  = L.filter p .| tr
    whenFalse = L.filter (not . p) .| fl
{-# INLINE sinkWithPred #-}
-- | Emit a @()@ for every 'Nothing' in the input; 'Just' values are dropped.
projectNothings :: Monad m => ConduitT (Maybe a) () m ()
projectNothings = awaitForever emit
  where
    emit Nothing  = yield ()
    emit (Just _) = return ()
{-# INLINE projectNothings #-}
-- | Emit the payload of every 'Left' in the input; 'Right' values are dropped.
projectLefts :: Monad m => ConduitT (Either l r) l m ()
projectLefts = awaitForever $ \e -> case e of
  Left l  -> yield l
  Right _ -> return ()
{-# INLINE projectLefts #-}
-- | Emit the payload of every 'Right' in the input; 'Left' values are dropped.
projectRights :: Monad m => ConduitT (Either l r) r m ()
projectRights = awaitForever keepRight
  where
    keepRight (Right r) = yield r
    keepRight (Left _)  = return ()
{-# INLINE projectRights #-}
-- | Forward only every @n@-th element and drop the others.
--
-- The counter starts at 1 and is reset to 1 after each forwarded element, so
-- for @n <= 1@ every element is forwarded.
everyN :: Monad m => Int -> ConduitT a a m ()
everyN n = loop 1
  where
    loop count = do
      next <- await
      case next of
        Nothing -> return ()
        Just x
          | count < n -> loop (count + 1)
          | otherwise -> yield x >> loop 1
{-# INLINE everyN #-}
-- | Forward every element unchanged, running @f@ on every @n@-th one.
--
-- The action runs before that element is yielded and its result is
-- discarded.  The counter starts at 1 and resets after each run of @f@.
onEveryN :: Monad m => Int -> (a -> m b) -> ConduitT a a m ()
onEveryN n f = loop 1
  where
    loop i = await >>= \mx -> case mx of
      Nothing -> pure ()
      Just x
        | i < n     -> yield x >> loop (i + 1)
        | otherwise -> do
            _ <- lift (f x)
            yield x
            loop 1
{-# INLINE onEveryN #-}
-- | Forward every element unchanged, running the fixed action @m@ on every
-- @n@-th one.
--
-- The action runs before that element is yielded and its result is
-- discarded.  The counter starts at 1 and resets after each run of @m@.
onEveryN' :: Monad m => Int -> m b -> ConduitT a a m ()
onEveryN' n m = loop 1
  where
    loop i = await >>= maybe (pure ()) (step i)
    step i x
      | i < n     = yield x >> loop (i + 1)
      | otherwise = lift m >> yield x >> loop 1
{-# INLINE onEveryN' #-}
-- | Time-based sampling of a stream.
--
-- For each element the current POSIX time is read and rounded to whole
-- seconds.  The element is forwarded only if that time is strictly greater
-- than the current deadline; otherwise it is dropped.  The deadline starts at
-- 0 and, after each forwarded element, becomes that element's time plus
-- @interval@.
everyNSeconds :: MonadIO m => Int -> ConduitT a a m ()
everyNSeconds interval = loop 0
  where
    loop deadline = await >>= maybe (pure ()) (step deadline)

    step deadline msg = do
      now <- liftIO currentSeconds
      if now > deadline
        then yield msg >> loop (now + interval)
        else loop deadline

    currentSeconds :: IO Int
    currentSeconds = round . T.utcTimeToPOSIXSeconds <$> T.getCurrentTime
{-# DEPRECATED effect "Use effectC instead" #-}
-- | Deprecated alias for 'effectC'.
effect :: Monad m => (a -> m b) -> ConduitT a a m ()
effect f = effectC f
{-# DEPRECATED effect' "Use effectC' instead" #-}
-- | Deprecated alias for @effectC'@.
--
-- Marked deprecated for consistency with 'effect', which carries the same
-- kind of pragma pointing at its @C@-suffixed replacement.
effect' :: Monad m => m b -> ConduitT a a m ()
effect' = effectC'
{-# DEPRECATED inJust "Use justC instead" #-}
-- | Deprecated alias for 'justC'.
inJust :: Monad m => ConduitT a c m () -> ConduitT (Maybe a) (Maybe c) m ()
inJust j = justC j
-- | Consume the stream, storing each element in the 'MVar' after removing
-- any value already there with 'tryTakeMVar'.  Nothing is yielded downstream.
mvarSink :: MonadIO m => MVar a -> ConduitT a () m ()
mvarSink mvar = awaitForever (liftIO . store)
  where
    store v = tryTakeMVar mvar >> putMVar mvar v
{-# DEPRECATED tapWith "Unsafe. Do not use" #-}
-- | Pass input through unchanged while also feeding it, transformed by @f@,
-- into the sink @s@ (via 'passthroughSink').  The sink's result is ignored.
tapWith :: Monad m => ConduitT a b m () -> ConduitT b Void m () -> ConduitT a a m ()
tapWith f s = passthroughSink (fuse f s) (\_ -> return ())
{-# INLINE tapWith #-}
{-# DEPRECATED tap "Unsafe. Do not use" #-}
-- | Pass input through unchanged while also feeding it to the sink @s@ (via
-- 'passthroughSink').  The sink's result is ignored.
tap :: Monad m => ConduitT a Void m () -> ConduitT a a m ()
tap s = passthroughSink s discard
  where
    discard _ = return ()
{-# INLINE tap #-}
{-# DEPRECATED tapPred "Unsafe. Do not use" #-}
-- | Chain two taps: the first feeds elements satisfying @p@ to @tr@, the
-- second feeds the remaining elements to @fl@.  Output is produced by 'tap'.
tapPred :: Monad m => (a -> Bool) -> ConduitT a Void m () -> ConduitT a Void m () -> ConduitT a a m ()
tapPred p tr fl = tap onTrue .| tap onFalse
  where
    onTrue  = L.filter p .| tr
    onFalse = L.filter (not . p) .| fl
{-# INLINE tapPred #-}
{-# DEPRECATED tapNothing "Unsafe. Do not use" #-}
-- | 'tapWith' specialised to 'projectNothings': the given sink receives a
-- @()@ for each 'Nothing' in the stream.
tapNothing :: Monad m => ConduitT () Void m () -> ConduitT (Maybe a) (Maybe a) m ()
tapNothing sink = tapWith projectNothings sink
{-# INLINE tapNothing #-}
{-# DEPRECATED divertNothing "Unsafe. Do not use" #-}
-- | Apply 'tapNothing' with the given sink, then keep only the 'Just'
-- payloads using 'L.catMaybes'.
divertNothing :: Monad m => ConduitT () Void m () -> ConduitT (Maybe a) a m ()
divertNothing sink = fuse (tapNothing sink) L.catMaybes
{-# INLINE divertNothing #-}
{-# DEPRECATED tapLeft "Unsafe. Do not use" #-}
-- | 'tapWith' specialised to 'projectLefts': the given sink receives the
-- payload of each 'Left' in the stream.
tapLeft :: Monad m => ConduitT l Void m () -> ConduitT (Either l r) (Either l r) m ()
tapLeft sink = tapWith projectLefts sink
{-# INLINE tapLeft #-}
{-# DEPRECATED divertLeft "Unsafe. Do not use" #-}
-- | Apply 'tapLeft' with the given sink, then keep only the 'Right' payloads
-- using 'projectRights'.
divertLeft :: Monad m => ConduitT l Void m () -> ConduitT (Either l r) r m ()
divertLeft sink = fuse (tapLeft sink) projectRights
{-# INLINE divertLeft #-}
{-# DEPRECATED tapRight "Unsafe. Do not use" #-}
-- | 'tapWith' specialised to 'projectRights': the given sink receives the
-- payload of each 'Right' in the stream.
tapRight :: Monad m => ConduitT r Void m () -> ConduitT (Either l r) (Either l r) m ()
tapRight sink = tapWith projectRights sink
{-# INLINE tapRight #-}
{-# DEPRECATED divertRight "Unsafe. Do not use" #-}
-- | Apply 'tapRight' with the given sink, then keep only the 'Left' payloads
-- using 'projectLefts'.
divertRight :: Monad m => ConduitT r Void m () -> ConduitT (Either l r) l m ()
divertRight sink = fuse (tapRight sink) projectLefts
{-# INLINE divertRight #-}