{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE BlockArguments #-}

-- | Common transport-agnostic functions for using Churro.
-- 
-- Variants with a trailing underscore - E.g. `runWait_` - specialise the Async action to
-- be () for when you don't care about accumulating results and only care about processing
-- items as they pass through the pipeline.
-- 
-- Variants with a trailing prime - E.g. `processRetry'` - also change the generality of the
-- types involved in some way.
-- 
module Control.Churro.Prelude where

import Control.Churro.Types

import Prelude hiding (id, (.))

import           Control.Arrow            (arr)
import           Control.Category         (id, (.), (>>>))
import           Control.Concurrent       (threadDelay)
import           Control.Concurrent.Async (async, cancel, Async, wait)
import           Control.Exception        (Exception, SomeException, try)
import           Control.Monad            (replicateM_, when)
import           Data.Foldable            (foldMap', for_)
import           Data.Maybe               (isJust)
import           Data.Time                (NominalDiffTime)
import           Data.Void                (Void)
import           GHC.Natural              (Natural)
import Data.Traversable (for)


-- $setup
-- 
-- The examples in this module require the following imports:
-- 
-- >>> :set -XBlockArguments
-- >>> import Control.Churro.Transport
-- >>> import Data.Time.Clock
-- >>> import System.Timeout (timeout)
-- 

-- * Runners

-- | Run a churro and automatically wait for it to complete.
-- 
-- Starts the pipeline with `run` and then blocks on the returned `Async`,
-- yielding the pipeline's final result.
-- 
runWait :: Transport t => Churro a t Void Void -> IO a
runWait pipeline = run pipeline >>= wait

-- | Version of `runWait` specialised to `()`.
-- 
-- Use this when the pipeline is only run for its effects.
-- 
runWait_ :: Transport t => Churro () t Void Void -> IO ()
runWait_ = runWait

-- | Read the output of a Churro into a list.
-- 
-- Warning: This will block until the Churro terminates,
--          accumulating items in memory.
--          Only use when you expect a finite amount of output.
--          Otherwise consider composing with a Sink and using `runWait`.
-- 
-- >>> runWaitListChan $ sourceList [0..4] >>> arr succ
-- [1,2,3,4,5]
-- 
runWaitList :: (Transport t, Monoid a) => Churro a t Void b -> IO [b]
runWaitList c =
    -- Each output is wrapped in a singleton list so that `sink` can combine
    -- all of them through the list Monoid.
    runWait $ (c >>> arr' (:[])) >>>> sink

-- | Version of `runWaitList` specialised to `()`.
-- 
-- The async result of the supplied churro is fixed to `()`; the collected
-- outputs are still returned as a list.
-- 
runWaitList_ :: Transport t => Churro () t Void b -> IO [b]
runWaitList_ = runWaitList

-- | Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.
--
-- The `Void` input and output types restrict this to pipelines that neither
-- expect input nor expose output. See `run'` for the unrestricted version.
--
run :: Transport t => Churro a t Void Void -> IO (Async a)
run = run'

-- | Version of `run` with async return type specialised to `()`.
--
run_ :: Transport t => Churro () t Void Void -> IO (Async ())
run_ = run'

-- | Run any churro, there is no check that this was spawned with a source, or terminated with a sink.
--   This is unsafe, since the pipeline may not generate or consume in a predictable way.
--   Use `run` instead unless you are confident you know what you're doing.
-- 
--   The input and output transports produced by `runChurro` are discarded;
--   only the `Async` handle is returned.
-- 
run' :: Transport t => Churro a t i o -> IO (Async a)
run' c = do
    (_input, _output, handle) <- runChurro c
    pure handle

-- * Library

-- ** Sources

-- | A single item source.
--
-- >>> runWaitChan $ sourceSingleton 13 >>> sinkPrint
-- 13
--
-- Equivalent to `pure` from `Applicative`. Redefined here in case you're looking for a source!
-- 
-- >>> runWaitChan $ pure 23 >>> sinkPrint
-- 23
sourceSingleton :: Transport t => o -> Churro () t Void o
sourceSingleton item = sourceList [item]

-- | Create a source from a list of items, sending each down the churro independently.
--
-- Works for any `Foldable` container, not only lists.
--
-- >>> runWaitChan $ sourceList [4,2] >>> sinkPrint
-- 4
-- 2
sourceList :: (Transport t, Foldable f) => f o -> Churro () t Void o
sourceList items = sourceIO (for_ items)

-- | Create a source from an IO action that is passed a function to yield new items.
--
-- The result of the IO action becomes the async result of the churro.
-- Once the action returns, a `Nothing` is sent downstream to signal completion.
--
-- >>> runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
-- 4
-- 2
sourceIO :: Transport t => ((o -> IO ()) -> IO a) -> Churro a t Void o
sourceIO cb =
    buildChurro \_i o -> do
        -- Every item yielded by the callback is wrapped in `Just`.
        result <- cb (yeet o . Just)
        -- Terminate the stream.
        yeet o Nothing
        pure result

-- | Variant of `sourceIO` with Async action specialised to `()`.
-- 
sourceIO_ :: Transport t => ((o -> IO ()) -> IO ()) -> Churro () t Void o
sourceIO_ = sourceIO

-- | Combine a list of sources into a single source.
-- 
-- Sends individual items downstream without attempting to combine them.
-- 
-- >>> runWaitChan $ sources [pure 1, pure 1] >>> sinkPrint
-- 1
-- 1
-- 
-- Can combine results of sources with a Monoid instance, although this isn't very useful
-- when forming the start of a longer pipeline:
-- 
-- >>> :{
-- do
--   r <- runWaitChan $ sources [sourceIO \_cb -> print 1 >> return "hello ", sourceIO \_cb -> print 1 >> return "world"]
--   print r
-- :}
-- 1
-- 1
-- "hello world"
sources :: (Transport t, Traversable f, Monoid a) => f (Churro a t Void o) -> Churro a t Void o
sources ss = buildChurro \_i o -> do
    -- Start every source.
    running <- mapM runChurro ss
    -- The first argument of `finally'` cancels every source's async.
    finally' (mapM_ (\(_, _, a) -> cancel a) running) do
        -- One forwarding thread per source: copy its items to the shared
        -- output, then wait for that source's own result.
        forwarders <- for running \(_i, o', a) ->
            async do
                -- Only actual items are forwarded; each source's `Nothing`
                -- is dropped here so a single source cannot end the
                -- combined stream.
                yankAll' o' \v -> mapM_ (yeet o . Just) v
                wait a
        -- Waits on each forwarder and combines the results with the Monoid.
        result <- foldMap' wait forwarders
        -- A single terminator is sent once every source is exhausted.
        yeet o Nothing
        pure result

-- | Variant of `sources` with Async action of sources in argument specialised to `()`.
-- 
--   Also specialised to take a list of sources.
-- 
sources_ :: Transport t => [Source () t o] -> Source () t o
sources_ = sources

-- ** Sinks

-- | Consume all items and combine them into a result via their monoid.
-- 
-- >>> :set -XFlexibleContexts
-- >>> r <- runWaitChan $ pure' [1 :: Int] >>> sink
-- >>> print r
-- [1]
sink :: (Transport t, Monoid a) => Churro a t a Void
sink = sinkIO pure

-- | Consume all items with no additional effects.
-- 
-- TODO: Decide if we should use some kind of `nf` evaluation here to force items.
-- 
-- >>> runWaitChan $ pure 1 >>> process print >>> sink_
-- 1
-- 
sink_ :: Transport t => Churro () t i Void
sink_ = sinkIO \_item -> pure ()

-- | Consume a churro with an IO process.
-- 
-- The callback is handed to `yankAll` together with the input transport;
-- the output transport is never written to.
-- 
-- >>> runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
-- "hello"
-- 2
sinkIO :: (Transport t, Monoid a) => (o -> IO a) -> Churro a t o Void
sinkIO cb = buildChurro \input _output -> yankAll input cb

-- | Variant of `sinkIO` with Async action specialised to `()`.
-- 
sinkIO_ :: Transport t => (o -> IO ()) -> Churro () t o Void
sinkIO_ = sinkIO

-- | Create a "sink" with more flexibility about when items are demanded using a higher-order "HO" callback.
-- 
-- This also allows a non-unit async action that can be recovered when run.
-- 
-- WARNING: You should use the provided callback if you want to actually create a sink.
-- 
-- TODO: Use hidden callback return type in order to ensure that the callback is called.
-- 
-- >>> import System.Timeout (timeout)
-- >>> :{
-- do
--   r <- timeout 100000 $ runWaitChan $ sourceSingleton 1 >>>> sinkHO \ya -> do
--     ya (print . show)
--     return 25
--   print r
-- :}
-- "1"
-- Just 25
sinkHO :: Transport t => (((i -> IO ()) -> IO ()) -> IO a) -> Churro a t i o
sinkHO cb = buildChurro \input _output ->
    -- The user callback receives "consume everything with this handler".
    cb (yankAll input)

-- | Consume and print each item. Used in many examples, but not much use outside debugging!
-- 
-- >>> runWaitChan $ pure "hi" >>> sinkPrint
-- "hi"
sinkPrint :: (Transport t, Show a) => Churro () t a Void
sinkPrint = sinkIO print
    

-- ** Churros

-- | Process each item with an IO action.
--   Acts as a one-to-one process.
-- 
--   Implemented with `processN` by returning each result as a singleton list.
-- 
-- >>> runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
-- "hi"
-- "ih"
process :: Transport t => (a -> IO b) -> Churro () t a b
process f = processN \x -> pure <$> f x

-- | Print each item then pass it on unchanged.
processPrint :: (Transport t, Show b) => Churro () t b b
processPrint = process \x -> do
    print x
    pure x

-- | Print each item with an additional debugging label, then pass it on unchanged.
-- 
--   Each line is printed as @Debugging [label]: item@.
processDebug :: (Transport t, Show b) => String -> Churro () t b b
processDebug d = process \x -> do
    putStrLn ("Debugging [" <> d <> "]: " <> show x)
    pure x

-- | Process each item with an IO action and potentially yield many items as a result.
--   Acts as a one-to-many process.
-- 
--   Once the input is exhausted a `Nothing` is sent downstream to signal completion.
-- 
-- >>> runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
-- "1"
-- 1
-- 2
processN :: Transport t => (i -> IO [o]) -> Churro () t i o
processN f =
    buildChurro \input output -> do
        yankAll input \x -> do
            results <- f x
            -- Each produced item is sent downstream individually.
            mapM_ (yeet output . Just) results
        yeet output Nothing

-- | Splits lists of items into individual items, concatenating the stream of lists.
-- 
concatC :: Transport t => Churro () t [o] o
concatC = processN pure

-- | Run a set of churros like a work-stealing queue for its inputs.
-- 
-- Similar to ArrowChoice, but more straightforward due to unified output type and independent implementation.
-- 
-- * NOTE: This makes no judgement about the ordering of outputs corresponding to the ordering of inputs.
-- * NOTE: You will need to specialise the transport of the processes. This is deliberate as it allows you
--         to use a bounded channel that ensures allocation to idle processes.
--         Use the `processesUnagi` variant from `Control.Churro.Transport.Unagi.Bounded` to default to
--         a buffer size of 1.
-- 
-- WARNING: This won't deterministically allocate work to idle workers unless a bounded channel is used.
-- 
-- * TODO: Figure out cancellation strategy.
-- * TODO: Consider a binary combinator and this as a folded application.
-- 
-- >>> import Control.Churro.Transport.Unagi.Bounded (processesUnagi)
-- 
-- Sanity check - All items entering should propagate, independent of the number of processes:
-- 
-- >>> runWaitListChan $ sourceList [1,1,1,1,1] >>> processesUnagi (replicate 3 (delay 0.1))
-- [1,1,1,1,1]
-- 
-- This example creates a source of 10 values and passes them through a worker group in which each
-- item is delayed by 0.05 seconds, guarded by a 10 second timeout.
-- If this works, then all ten values should be consumed and propagated before the timeout fires:
-- 
-- >>> :{
-- do
--   timeout 10000000 $ runWaitListChan $ sourceList (replicate 10 1) >>> processesUnagi (replicate 1 $ delay 0.05)
-- :}
-- Just [1,1,1,1,1,1,1,1,1,1]
-- 
-- We could use different strategies such as round-robin, etc. to default to a more balanced allocation, but this wouldn't
-- be most efficient if each worker performed at different rates of consumption.
-- 
processes :: (Traversable f, Transport t1, Transport t2, Monoid a) => f (Churro a t1 i o) -> Churro a t2 i o
processes cs = Churro do
    -- Shared input queue that all workers pull from.
    (inIn,  inOut ) <- flex
    -- Shared output queue that all workers push to.
    (outIn, outOut) <- flex

    let
        worker c = async do
            withChurro c \ci co ca -> do
                -- Drain this worker's output into the shared output.
                drainer <- async do
                    c2c' co outIn
                    yeet inIn Nothing -- Ensure other consumers aren't blocked. FIXME: This produces one more `Nothing` than is required.
                -- Feed this worker from the shared input until it is consumed,
                -- then tell the worker's own churro that its input is finished.
                c2c' inOut ci
                yeet ci Nothing
                wait drainer
                wait ca

    workers <- mapM worker cs

    a  <- async do
        r <- foldMap' wait workers
        yeet outIn Nothing -- Make sure to conclude the process once all the processes have finished consuming
        return r

    return (inIn, outOut, a)

    where
    -- Version of c2c that doesn't propagate Nothing once transport is consumed.
    -- This is required here since we don't want a worker to be able to prematurely terminate the processes
    -- while an earlier slower worker still hasn't finished propagating its result.
    -- Also allows two different types of Transport.
    -- 
    c2c' :: (Transport t1, Transport t2) => Out t1 (Maybe i) -> In t2 (Maybe i) -> IO ()
    c2c' from to = yankAll from (yeet to . Just)

-- | Set up N worker churro processes to concurrently process the stream.
-- 
-- Consider using `thiefUnagi` unless you have a requirement for controlling the transport of the process group.
-- 
-- Implemented as `processes` applied to @n@ copies of the given churro.
-- 
thief :: (Transport t1, Transport t2, Monoid a) => Int -> Churro a t1 i o -> Churro a t2 i o
thief n c = processes (replicate n c)

-- | Extract xs from (Just x)s. Similar to `catMaybes`.
-- 
-- `Nothing` items are dropped.
-- 
-- >>> runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
-- 1
-- 3
justs :: Transport t => Churro () t (Maybe a) a
justs = mapN (maybe [] pure)

-- | Extract ls from (Left l)s.
-- 
-- `Right` items are dropped.
-- 
-- >>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
-- 1
-- 3
lefts :: Transport t => Churro () t (Either a b) a
lefts = mapN (either pure (const []))

-- | Extract rs from (Right r)s.
-- 
-- `Left` items are dropped.
-- 
-- >>> runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
-- 2
rights :: Transport t => Churro () t (Either a b) b
rights = mapN (either (const []) pure)

-- | Take and yield the first n items.
-- 
-- WARNING: This is intended to terminate upstream once the items have been consumed
--          downstream, but there is a bug preventing this from working at present!
-- 
-- >>> runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
-- 1
-- 2
-- 
-- This implementation explicitly stops propagating when the Churro completes,
-- although this could be handled by downstream consumer composition terminating
-- the producer and just using replicateM.
takeC :: (Transport t, Integral n) => n -> Churro () t a a
takeC n = buildChurro \i o -> go n i o
    where
    go remaining i o
        -- Quota reached: signal completion downstream.
        | remaining <= 0 = yeet o Nothing
        | otherwise = do
            x <- yank i
            -- Forward whatever arrived, including an upstream `Nothing`.
            yeet o x
            -- Only continue while upstream is still producing items.
            when (isJust x) do
                go (pred remaining) i o

-- | Drop the first n items.
-- 
-- If the stream ends before n items have been dropped, the stream is terminated
-- downstream straight away instead of continuing to read from the exhausted input.
-- A non-positive n drops nothing.
-- 
-- >>> runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
-- 3
-- 4
dropC :: (Transport t, Integral n) => n -> Churro () t a a
dropC n = buildChurro \i o -> go n i o
    where
    go remaining i o
        -- Finished dropping: forward the rest of the stream unchanged.
        | remaining <= 0 = c2c id i o
        | otherwise      = do
            x <- yank i
            case x of
                -- Upstream finished early: pass the terminator on and stop,
                -- rather than yanking again from a finished input.
                Nothing -> yeet o Nothing
                Just _  -> go (pred remaining) i o

-- | Filter items according to a predicate.
--
-- Items for which the predicate returns `False` are dropped.
--
-- >>> runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
-- 4
-- 5
filterC :: Transport t => (a -> Bool) -> Churro () t a a
filterC p = mapN \x -> [x | p x]

-- | Run a pure function over items, producing multiple outputs.
-- 
-- This is the pure counterpart of `processN`.
-- 
-- >>> runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
-- 9
-- 90
mapN :: Transport t => (a -> [b]) -> Churro () t a b
mapN f = processN \x -> pure (f x)

-- | Delay items from being sent downstream.
-- 
--   Note: NominalDiffTime's Num instance interprets literals as seconds.
-- 
--   The duration is converted to a whole number of microseconds, rounding up,
--   and handed to `delayMicro`.
-- 
-- >>> let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
-- 
-- >>> runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
-- False
-- 
-- >>> runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
-- True
delay :: Transport t => NominalDiffTime -> Churro () t a a
delay duration = delayMicro (ceiling @Double (fromRational micros))
    where
    -- Seconds scaled to microseconds as an exact Rational.
    micros = toRational duration * 1000000

-- | Delay items in microseconds. Works the same way as `delay`.
-- 
--   Each item is held for the given duration via `threadDelay` and then passed on unchanged.
delayMicro :: Transport t => Int -> Churro () t a a
delayMicro d = process \x -> x <$ threadDelay d

-- | Passes consecutive pairs of items downstream.
-- 
-- A stream with fewer than two items produces no pairs.
-- 
-- >>> runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
-- (1,2)
-- (2,3)
withPrevious :: Transport t => Churro () t a (a,a)
withPrevious = buildChurro \i o -> do
    loop Nothing i o
    yeet o Nothing
    where
    -- `previous` holds the most recently seen item, if any.
    loop previous i o = do
        current <- yank i
        case (previous, current) of
            -- Have both neighbours: emit the pair and slide the window.
            (Just p, Just c)  -> do
                yeet o (Just (p, c))
                loop current i o
            -- First item of the stream: nothing to pair it with yet.
            (Nothing, Just c) -> loop (Just c) i o
            -- Upstream finished.
            _                 -> pure ()

-- | Requeue an item if it fails. Swallows exceptions and gives up after retries.
-- 
--  Note: Process will always try once so if retries = 1 then a failing process will execute twice.
-- 
--  The item is requeued on the input side of the churro, so if other items have
--  been passed in they will appear first!
-- 
--  Catches all `SomeException`s. If you wish to narrow the exception type, consider
--  using the processRetry' variant composed with `rights`.
-- 
--  Note: There is an edge case with Chan transport where a queued retry may not execute
--        if a source completes and finalises before the item is requeued.
--        A different transport type may allow a modified retry function that requeues differently.
-- 
-- >>> :{
-- let
--   prog = processRetry 1 flakeyThing
--   flakeyThing x = do
--     if x > 1
--       then print "GT"  >> return x
--       else print "LTE" >> error ("oops! " <> show x)
-- in
--   runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint
-- :}
-- "LTE"
-- "LTE"
-- "GT"
-- 2
-- 
processRetry :: Transport t => Natural -> (i -> IO o) -> Churro () t i o
processRetry retries f = processRetry' @SomeException retries f >>> rights

-- | Raw version of `processRetry`. Polymorphic over exception type and forwards errors.
-- 
--   Every incoming item is tagged with an initial attempt count of @0@ and
--   then handed to `processRetry''`.
--   
processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro () t i (Either e o)
processRetry' retries f = arr (0,) >>> processRetry'' retries f

-- | Rawest version of `processRetry`.
--   Expects the incoming items to contain number of retries.
-- 
--   Also polymorphic over exception type. And forwards errors.
-- 
--   For each @(n, y)@ read from the input, @f y@ is run under 'try' and the
--   resulting @Either@ is always written to the output, whether it succeeded
--   or failed. On failure, if @n < retries@, the item is additionally written
--   back to this churro's own input as @(succ n, y)@ so it will be attempted
--   again. Once `yankAll` returns, a final @Nothing@ is written to the output.
--   
processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro () t (n, a) (Either e b)
processRetry'' retries f =
    -- The three endpoints are: the write side of our own input (used for
    -- requeueing), the read side of our input, and the write side of our output.
    buildChurro' \requeueIn itemsOut resultsIn -> do
        yankAll itemsOut \(n, y) -> do
            r <- try do f y
            -- Forward the outcome before deciding whether to retry.
            yeet resultsIn (Just r)
            case r of
                Right _ -> return ()
                Left  _ -> when (n < retries) do yeet requeueIn (Just (succ n, y))
        yeet resultsIn Nothing