module HaskellWorks.Data.Conduit.Combinator where

import Control.Concurrent        (MVar, putMVar, tryTakeMVar)
import Control.Monad             (void)
import Control.Monad.IO.Class
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Data.Maybe
import Data.Time.Clock.POSIX     as T

import qualified Data.Conduit.List as L

-- | Route a stream of 'Maybe' values through two conduits.
--
-- * Every 'Nothing' is replaced by @()@ and fed to @n@; each @()@ that @n@
--   emits is sent downstream as 'Nothing'.
-- * Every @'Just' a@ is unwrapped and fed to @j@; each value that @j@ emits
--   is sent downstream wrapped in 'Just'.
--
-- The two branches are combined with 'ZipConduit', so both observe the same
-- upstream and write to the same downstream.
maybeC :: Monad m => ConduitT () () m () -> ConduitT a c m () -> ConduitT (Maybe a) (Maybe c) m ()
maybeC n j = getZipConduit
  $   ZipConduit (L.filter isNothing .| L.map (const ()) .| n .| L.map (const Nothing))
  <*  ZipConduit (L.concat           .|                     j .| L.map Just           )

-- | Run the provided conduit on the contents of every 'Just' value,
-- re-wrapping its output in 'Just'.
--
-- This is 'maybeC' with an identity conduit (@L.map id@) on the 'Nothing'
-- branch.
justC :: Monad m => ConduitT a c m () -> ConduitT (Maybe a) (Maybe c) m ()
justC = maybeC (L.map id)

-- | Run the provided conduit in the 'Nothing' case.
--
-- Each 'Nothing' is fed to @n@ as @()@, and every @()@ that @n@ emits is sent
-- downstream as 'Nothing'.  This is 'maybeC' with an identity conduit
-- (@L.map id@) on the 'Just' branch.
nothingC :: Monad m => ConduitT () () m () -> ConduitT (Maybe a) (Maybe a) m ()
nothingC n = maybeC n (L.map id)

-- | Run the provided conduits on the left and right side of the 'Either'
-- respectively.
--
-- 'Left' payloads are fed to @l@ and 'Right' payloads to @r@.  The branches
-- are combined with 'ZipConduit', so both write to the same downstream.
eitherC :: Monad m => ConduitT l a m () -> ConduitT r a m () -> ConduitT (Either l r) a m ()
eitherC l r = getZipConduit
  $   ZipConduit (projectLefts  .| l)
  <*  ZipConduit (projectRights .| r)

-- | Run the conduit on the right side of the 'Either'.
--
-- 'Right' payloads are fed to @r@ and its output is re-wrapped in 'Right';
-- 'Left' payloads are re-emitted as 'Left' without modification.
rightC :: Monad m => ConduitT r a m () -> ConduitT (Either l r) (Either l a) m ()
rightC r = eitherC (L.map Left) (r .| L.map Right)

-- | Run the conduit on the left side of the 'Either'.
--
-- 'Left' payloads are fed to @l@ and its output is re-wrapped in 'Left';
-- 'Right' payloads are re-emitted as 'Right' without modification.
leftC :: Monad m => ConduitT l a m () -> ConduitT (Either l r) (Either a r) m ()
leftC l = eitherC (l .| L.map Left) (L.map Right)

-- | Performs the effect @f@ on every value but ignores its result.
-- The original value is propagated downstream.
effectC :: Monad m => (a -> m b) -> ConduitT a a m ()
effectC f = L.mapM (\a -> f a >> return a)

-- | Performs the action @m@ once for every value but ignores its result.
-- Unlike 'effectC', the action does not receive the value.
-- The original value is propagated downstream.
effectC' :: Monad m => m b -> ConduitT a a m ()
effectC' m = L.mapM (\a -> m >> return a)

-- | Sink that writes each message to an 'MVar', replacing whatever it held.
--
-- For every message the 'MVar' is first emptied with 'tryTakeMVar' (the old
-- content, if any, is discarded) and then filled with 'putMVar'.
--
-- NOTE: the take and the put are two separate steps.  If another thread
-- fills the 'MVar' between them, 'putMVar' blocks until it is emptied again.
mvarWriteC :: MonadIO m => MVar a -> ConduitT a Void m ()
mvarWriteC mvar = awaitForever $ \v ->
  liftIO $ tryTakeMVar mvar >> putMVar mvar v

-- | Sink that writes each message, transformed by @f@, to an 'MVar',
-- replacing whatever it held.
--
-- For every message the 'MVar' is first emptied with 'tryTakeMVar' (the old
-- content, if any, is discarded) and then filled with @f v@ via 'putMVar'.
--
-- NOTE: the take and the put are two separate steps.  If another thread
-- fills the 'MVar' between them, 'putMVar' blocks until it is emptied again.
mvarWriteMC :: MonadIO m => (a -> b) -> MVar b -> ConduitT a Void m ()
mvarWriteMC f mvar = awaitForever $ \v ->
  liftIO $ tryTakeMVar mvar >> putMVar mvar (f v)

-- | Sink that writes each message to an 'MVar', replacing whatever it held.
--
-- For every message the 'MVar' is first emptied with 'tryTakeMVar' (the old
-- content, if any, is discarded) and then filled with 'putMVar'.
--
-- NOTE: the take and the put are two separate steps.  If another thread
-- fills the 'MVar' between them, 'putMVar' blocks until it is emptied again.
mvarWriteSink :: MonadIO m => MVar a -> ConduitT a Void m ()
mvarWriteSink mvar = awaitForever $ \v ->
  liftIO $ tryTakeMVar mvar >> putMVar mvar v

-- | Creates a unified sink, which is actually two separate sinks with values
-- being sent to one or the other based on a predicate.
--
-- Values satisfying @p@ go to @tr@; all other values go to @fl@.  The two
-- sinks are combined with 'sequenceSinks' and their results are discarded.
sinkWithPred :: Monad m => (a -> Bool) -> ConduitT a Void m () -> ConduitT a Void m () -> ConduitT a Void m ()
sinkWithPred p tr fl =
  void $ sequenceSinks [L.filter p .| tr, L.filter (not . p) .| fl]
{-# INLINE sinkWithPred #-}

-- | Projects 'Nothing's from the stream.
-- Emits one @()@ for every 'Nothing' received; 'Just' values produce no
-- output.
projectNothings :: Monad m => ConduitT (Maybe a) () m ()
projectNothings = awaitForever $ maybe (yield ()) (const $ return ())
{-# INLINE projectNothings #-}

-- | Projects left side values for each value in a stream.
-- Downstream only receives the payloads of 'Left' values;
-- 'Right' values are dropped.
projectLefts :: Monad m => ConduitT (Either l r) l m ()
projectLefts = awaitForever $ either yield (const $ return ())
{-# INLINE projectLefts #-}

-- | Projects right side values for each value in a stream.
-- Downstream only receives the payloads of 'Right' values;
-- 'Left' values are dropped.
projectRights :: Monad m => ConduitT (Either l r) r m ()
projectRights = awaitForever $ either (const $ return ()) yield
{-# INLINE projectRights #-}

-- | Propagate every @n@-th message and drop all others.
--
-- A counter starts at 1 and is incremented for each dropped message; the
-- message that arrives when the counter has reached @n@ is yielded and the
-- counter resets to 1.  For @n <= 1@ every message is propagated.
everyN :: Monad m => Int -> ConduitT a a m ()
everyN n = go 1
  where
    go n' = await >>= maybe (return ()) (\x ->
      if n' < n
        then go (n' + 1)
        else yield x >> go 1)
{-# INLINE everyN #-}

-- | Performs an action on every @n@-th message, but ignores its result.
-- All original values are propagated downstream.
--
-- The action @f@ runs on the @n@-th, @2n@-th, ... message just before that
-- message is yielded.  For @n <= 1@ it runs on every message.
onEveryN :: Monad m => Int -> (a -> m b) -> ConduitT a a m ()
onEveryN n f = go 1
  where
    go i = await >>= maybe (pure ()) (\x ->
            if i < n
              then yield x >> go (i + 1)
              else lift (f x) >> yield x >> go 1)
{-# INLINE onEveryN #-}

-- | Performs an action every @n@ messages, but ignores its result.
-- All original values are propagated downstream.
--
-- The action @m@ runs when the @n@-th, @2n@-th, ... message arrives, just
-- before that message is yielded; it does not receive the message.
-- For @n <= 1@ it runs on every message.
onEveryN' :: Monad m => Int -> m b -> ConduitT a a m ()
onEveryN' n m = go 1
  where
    go i = await >>= maybe (pure ()) (\x ->
            if i < n
              then yield x >> go (i + 1)
              else lift m >> yield x >> go 1)
{-# INLINE onEveryN' #-}

-- | Propagate a message every N seconds and drop all others.
--
-- The clock is only consulted when a message arrives.  The current time is
-- rounded to whole POSIX seconds; a message is propagated when that time is
-- strictly greater than the stored deadline, and the deadline then becomes
-- the current time plus @interval@.  Any other message is dropped.  The
-- deadline starts at 0.
everyNSeconds :: MonadIO m => Int -> ConduitT a a m ()
everyNSeconds interval = go 0
  where
    go t = do
      mmsg <- await
      case mmsg of
        Nothing -> pure ()
        Just msg -> do
          ct <- liftIO $ round . T.utcTimeToPOSIXSeconds <$> T.getCurrentTime
          if ct > t
            then yield msg >> go (ct + interval)
            else go t

---------

-- | Performs the effect but ignores its result.
-- The original value is propagated downstream.
--
-- Deprecated alias of 'effectC'.
{-# DEPRECATED effect "Use effectC instead" #-}
effect :: Monad m => (a -> m b) -> ConduitT a a m ()
effect = effectC

-- | Performs the effect but ignores its result.
-- The original value is propagated downstream.
--
-- Alias of 'effectC''.
effect' :: Monad m => m b -> ConduitT a a m ()
effect' = effectC'

-- | Run the provided conduit on the contents of every 'Just' value.
--
-- Deprecated alias of 'justC'.
{-# DEPRECATED inJust "Use justC instead" #-}
inJust :: Monad m => ConduitT a c m () -> ConduitT (Maybe a) (Maybe c) m ()
inJust = justC

-- | Sinks values into a given 'MVar', replacing whatever it held.
--
-- For every value the 'MVar' is first emptied with 'tryTakeMVar' (the old
-- content, if any, is discarded) and then filled with 'putMVar'.  Nothing is
-- yielded downstream.
--
-- NOTE: the take and the put are two separate steps.  If another thread
-- fills the 'MVar' between them, 'putMVar' blocks until it is emptied again.
mvarSink :: MonadIO m => MVar a -> ConduitT a () m ()
mvarSink mvar = awaitForever $ \v ->
  liftIO $ tryTakeMVar mvar >> putMVar mvar v

-- | Taps the stream by applying a transformation and sending the transformed
-- values into a given sink. The original value is then propagated downstream.
--
-- Implemented as 'passthroughSink' over @f .| s@; the sink's result is
-- ignored.
--
-- > tapWith projectLefts myErrorSink
{-# DEPRECATED tapWith "Unsafe.  Do not use" #-}
tapWith :: Monad m => ConduitT a b m () -> ConduitT b Void m () -> ConduitT a a m ()
tapWith f s = passthroughSink (f .| s) (const $ return ())
{-# INLINE tapWith #-}

-- | Taps into a given sink. The original value is then propagated downstream.
--
-- Implemented with 'passthroughSink'; the sink's result is ignored.
{-# DEPRECATED tap "Unsafe.  Do not use" #-}
tap :: Monad m => ConduitT a Void m () -> ConduitT a a m ()
tap s = passthroughSink s (const $ return ())
{-# INLINE tap #-}

-- | Taps a conduit, and sends the values into two different sinks, switching
-- on a predicate.
--
-- Values satisfying @p@ are sent to @tr@, all others to @fl@.  It is built
-- from two chained 'tap's, each of which passes its input through.
{-# DEPRECATED tapPred "Unsafe.  Do not use" #-}
tapPred :: Monad m => (a -> Bool) -> ConduitT a Void m () -> ConduitT a Void m () -> ConduitT a a m ()
tapPred p tr fl = tap (L.filter p .| tr) .| tap (L.filter (not . p) .| fl)
{-# INLINE tapPred #-}

-- | For every 'Nothing' value in a stream sends @()@ to a given sink.
-- Downstream receives an untouched original 'Maybe' value.
--
-- Defined as 'tapWith' applied to 'projectNothings'.
{-# DEPRECATED tapNothing "Unsafe.  Do not use" #-}
tapNothing :: Monad m => ConduitT () Void m () -> ConduitT (Maybe a) (Maybe a) m ()
tapNothing = tapWith projectNothings
{-# INLINE tapNothing #-}

-- | For every 'Nothing' value in a stream sends @()@ to a given sink.
-- 'Nothing' is then not propagated downstream.
-- Downstream only receives the payloads of 'Just' values.
--
-- Defined as 'tapNothing' followed by @L.catMaybes@.
{-# DEPRECATED divertNothing "Unsafe.  Do not use" #-}
divertNothing :: Monad m => ConduitT () Void m () -> ConduitT (Maybe a) a m ()
divertNothing sink = tapNothing sink .| L.catMaybes
{-# INLINE divertNothing #-}

-- | Sends every left-side value in a stream into a given sink.
-- Downstream receives the original 'Either' value untouched.
--
-- Defined as 'tapWith' applied to 'projectLefts'.
{-# DEPRECATED tapLeft "Unsafe.  Do not use" #-}
tapLeft :: Monad m => ConduitT l Void m () -> ConduitT (Either l r) (Either l r) m ()
tapLeft = tapWith projectLefts
{-# INLINE tapLeft #-}

-- | Sends every left-side value in a stream into a given sink.
-- Downstream receives only right-side values.
--
-- Defined as 'tapLeft' followed by 'projectRights'.
{-# DEPRECATED divertLeft "Unsafe.  Do not use" #-}
divertLeft :: Monad m => ConduitT l Void m () -> ConduitT (Either l r) r m ()
divertLeft sink = tapLeft sink .| projectRights
{-# INLINE divertLeft #-}

-- | Sends every right-side value in a stream into a given sink.
-- Downstream receives the original 'Either' value untouched.
--
-- Defined as 'tapWith' applied to 'projectRights'.
{-# DEPRECATED tapRight "Unsafe.  Do not use" #-}
tapRight :: Monad m => ConduitT r Void m () -> ConduitT (Either l r) (Either l r) m ()
tapRight = tapWith projectRights
{-# INLINE tapRight #-}

-- | Sends every right-side value in a stream into a given sink.
-- Downstream receives only left-side values.
--
-- Defined as 'tapRight' followed by 'projectLefts'.
{-# DEPRECATED divertRight "Unsafe.  Do not use" #-}
divertRight :: Monad m => ConduitT r Void m () -> ConduitT (Either l r) l m ()
divertRight sink = tapRight sink .| projectLefts
{-# INLINE divertRight #-}