Safe Haskell | None |
---|---|
Language | Haskell2010 |
Common transport-agnostic functions for using Churro.
Synopsis
- runWait :: Transport t => Churro t Void Void -> IO ()
- runWaitList :: Transport t => Churro t Void o -> IO [o]
- run :: Transport t => Churro t Void Void -> IO (Async ())
- run' :: Transport t => Churro t i o -> IO (Async ())
- sourceSingleton :: Transport t => o -> Churro t Void o
- sourceList :: (Transport t, Foldable f) => f o -> Churro t Void o
- sourceIO :: Transport t => ((o -> IO ()) -> IO ()) -> Churro t Void o
- sources :: Transport t => [Source t o] -> Source t o
- sink :: Transport t => Churro t b Void
- sinkIO :: Transport t => (o -> IO ()) -> Churro t o Void
- sinkPrint :: (Transport t, Show a) => Churro t a Void
- process :: Transport t => (a -> IO b) -> Churro t a b
- processPrint :: (Transport t, Show b) => Churro t b b
- processDebug :: (Transport t, Show b) => String -> Churro t b b
- processN :: Transport t => (a -> IO [b]) -> Churro t a b
- justs :: Transport t => Churro t (Maybe a) a
- lefts :: Transport t => Churro t (Either a b) a
- rights :: Transport t => Churro t (Either a b) b
- takeC :: (Transport t, Integral n) => n -> Churro t a a
- dropC :: (Transport t, Integral n) => n -> Churro t a a
- filterC :: Transport t => (a -> Bool) -> Churro t a a
- mapN :: Transport t => (a -> [b]) -> Churro t a b
- delay :: Transport t => NominalDiffTime -> Churro t a a
- delayMicro :: Transport t => Int -> Churro t a a
- withPrevious :: Transport t => Churro t a (a, a)
- processRetry :: Transport t => Natural -> (i -> IO o) -> Churro t i o
- processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro t i (Either e o)
- processRetry'' :: (Transport t, Exception e, Ord n, Enum n) => n -> (a -> IO b) -> Churro t (n, a) (Either e b)
Documentation
The examples in this module require the following imports:
>>>
import Control.Churro.Transport
>>>
import Data.Time.Clock
Runners
runWait :: Transport t => Churro t Void Void -> IO () Source #
Automatically wait for a churro to complete.
runWaitList :: Transport t => Churro t Void o -> IO [o] Source #
Read the output of a Churro into a list.
Warning: This will block until the Churro terminates,
accumulating items in memory.
Only use when you expect a finite amount of output.
Otherwise consider composing with a Sink and using runWait
.
>>>
runWaitListChan $ sourceList [0..4] >>> arr succ
[1,2,3,4,5]
run :: Transport t => Churro t Void Void -> IO (Async ()) Source #
Run a sourced and sinked (double-dipped) churro and return an async action representing the in-flight processes.
run' :: Transport t => Churro t i o -> IO (Async ()) Source #
Run any churro; there is no check that it was spawned with a source or terminated with a sink.
This is unsafe, since the pipeline may not generate or consume in a predictable way.
Use run
instead unless you are confident you know what you're doing.
Library
Sources
sourceSingleton :: Transport t => o -> Churro t Void o Source #
A single-item source.
>>>
runWaitChan $ sourceSingleton 13 >>> sinkPrint
13
Equivalent to pure
from Applicative
. Redefined here in case you're looking for a source!
>>>
runWaitChan $ pure 23 >>> sinkPrint
23
sourceList :: (Transport t, Foldable f) => f o -> Churro t Void o Source #
Create a source from a list of items, sending each down the churro independently.
>>>
runWaitChan $ sourceList [4,2] >>> sinkPrint
4 2
sourceIO :: Transport t => ((o -> IO ()) -> IO ()) -> Churro t Void o Source #
Create a source from an IO action that is passed a function to yield new items.
>>>
runWaitChan $ sourceIO (\cb -> cb 4 >> cb 2) >>> sinkPrint
4 2
sources :: Transport t => [Source t o] -> Source t o Source #
Combine a list of sources into a single source.
Sends individual items downstream without attempting to combine them.
>>>
runWaitChan $ sources [pure 1, pure 1] >>> sinkPrint
1 1
Sinks
sink :: Transport t => Churro t b Void Source #
Consume all items with no additional effects.
TODO: Decide if we should use some kind of nf
evaluation here to force items.
>>>
runWaitChan $ pure 1 >>> process print >>> sink
1
sinkIO :: Transport t => (o -> IO ()) -> Churro t o Void Source #
Consume a churro with an IO process.
>>>
runWaitChan $ pure 1 >>> sinkIO (\x -> print "hello" >> print (succ x))
"hello" 2
sinkPrint :: (Transport t, Show a) => Churro t a Void Source #
Consume and print each item. Used in many examples, but not much use outside debugging!
>>>
runWaitChan $ pure "hi" >>> sinkPrint
"hi"
Churros
process :: Transport t => (a -> IO b) -> Churro t a b Source #
Process each item with an IO action. Acts as a one-to-one process.
>>>
runWaitChan $ pure "hi" >>> process (\x -> print x >> return (reverse x)) >>> sinkPrint
"hi" "ih"
processDebug :: (Transport t, Show b) => String -> Churro t b b Source #
Print each item with an additional debugging label.
processN :: Transport t => (a -> IO [b]) -> Churro t a b Source #
Process each item with an IO action and potentially yield many items as a result. Acts as a one-to-many process.
>>>
runWaitChan $ pure 1 >>> processN (\x -> print (show x) >> return [x, succ x]) >>> sinkPrint
"1" 1 2
justs :: Transport t => Churro t (Maybe a) a Source #
Extract xs from (Just x)s. Similar to catMaybes
.
>>>
runWaitChan $ sourceList [Just 1, Nothing, Just 3] >>> justs >>> sinkPrint
1 3
lefts :: Transport t => Churro t (Either a b) a Source #
Extract ls from (Left l)s.
>>>
runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> lefts >>> sinkPrint
1 3
rights :: Transport t => Churro t (Either a b) b Source #
Extract rs from (Right r)s.
>>>
runWaitChan $ sourceList [Left 1, Right 2, Left 3] >>> rights >>> sinkPrint
2
takeC :: (Transport t, Integral n) => n -> Churro t a a Source #
Take and yield the first n items.
WARNING: This is intended to terminate upstream once the items have been consumed downstream, but there is a bug preventing this from working at present!
>>>
runWaitChan $ sourceList [1..100] >>> takeC 2 >>> sinkPrint
1 2
This implementation explicitly stops propagating when the Churro completes, although this could be handled by downstream consumer composition terminating the producer and just using replicateM.
dropC :: (Transport t, Integral n) => n -> Churro t a a Source #
Drop the first n items.
>>>
runWaitChan $ sourceList [1..4] >>> dropC 2 >>> sinkPrint
3 4
filterC :: Transport t => (a -> Bool) -> Churro t a a Source #
Filter items according to a predicate.
>>>
runWaitChan $ sourceList [1..5] >>> filterC (> 3) >>> sinkPrint
4 5
mapN :: Transport t => (a -> [b]) -> Churro t a b Source #
Run a pure function over items, producing multiple outputs.
>>>
runWaitChan $ pure 9 >>> mapN (\x -> [x,x*10]) >>> sinkPrint
9 90
delay :: Transport t => NominalDiffTime -> Churro t a a Source #
Delay items from being sent downstream.
Note: NominalDiffTime's Num instance interprets literals as seconds.
>>>
let sinkTimeCheck = process (const getCurrentTime) >>> withPrevious >>> arr (\(x,y) -> diffUTCTime y x > 0.01) >>> sinkPrint
>>>
runWaitChan $ sourceList [1..2] >>> sinkTimeCheck
False
>>>
runWaitChan $ sourceList [1..2] >>> delay 0.1 >>> sinkTimeCheck
True
delayMicro :: Transport t => Int -> Churro t a a Source #
Delay items in microseconds. Works the same way as delay
.
withPrevious :: Transport t => Churro t a (a, a) Source #
Passes consecutive pairs of items downstream.
>>>
runWaitChan $ sourceList [1,2,3] >>> withPrevious >>> sinkPrint
(1,2) (2,3)
processRetry :: Transport t => Natural -> (i -> IO o) -> Churro t i o Source #
Requeue an item if it fails. Swallows exceptions and gives up after retries.
Note: Process will always try once so if retries = 1 then a failing process will execute twice.
The item is requeued on the input side of the churro, so if other items have been passed in they will appear first!
Catches all SomeException
s. If you wish to narrow the exception type, consider
using the processRetry' variant composed with rights
.
Note: There is an edge case with Chan transport where a queued retry may not execute if a source completes and finalises before the item is requeued. A different transport type may allow a modified retry function that requeues differently.
>>>
:{
let prog = processRetry 1 flakeyThing
    flakeyThing x = do
      if x > 1
        then print "GT" >> return x
        else print "LTE" >> error ("oops! " <> show x)
in
  runWaitChan $ sourceList [1,2] >>> delay 0.1 >>> prog >>> sinkPrint
:}
"LTE"
"LTE"
"GT"
2
processRetry' :: (Exception e, Transport t) => Natural -> (i -> IO o) -> Churro t i (Either e o) Source #
Raw version of processRetry
. Polymorphic over exception type and forwards errors.