jet-stream-1.0.0.0: Yet another streaming library.
Safe Haskell: None
Language: Haskell2010

Jet

Description

A streaming library built around the Jet type, which behaves as a kind of "effectful list".

For example, here's a way to print the first ten lines of a file to stdout:

>>> action = J.jet @Line (File "foo.txt") & J.limit 10 & J.sink stdout

The code uses the jet function to create a Jet of Line values (read using the default system encoding). jet is a method of the JetSource helper typeclass, while sink is a method of the complementary JetSink typeclass.

Note also the use of (&), which is simply a flipped ($). I've found it useful for writing forward-chained pipelines.

If instead of printing to stdout we wanted to store the lines in a list:

>>> action = J.jet @Line (File "foo.txt") & J.limit 10 & J.toList

Imagine we wanted to print the combined lines of two files, skipping the first 10 lines of each:

>>> :{
action =
 do file <- J.each [File "foo.txt", File "bar.txt"]
    J.jet @Line file & J.drop 10
 & J.sink stdout
:}

Here we are making use of the Monad instance of Jet, which resembles that of conventional lists. We are mixing monadic do-blocks and conventional function application. Also we use each, a function which creates a Jet out of any Foldable container.

Jets are Monoids too, so we could have written:

>>> action = [File "foo.txt", File "bar.txt"] & foldMap (J.drop 10 . J.jet @Line) & J.sink stdout

Here's an interesting use of sink. Imagine we have a big utf8-encoded file and we want to split it into a number of files of no more than 100000 bytes each, with the extra condition that we don't want to split any line between two files. We could do it like this:

>>> :{
action =
   let buckets = BoundedSize 100000 . File . ("result.txt." ++) . show <$> [1..]
    in J.jet (File "12999.txt.utf-8")
       & J.decodeUtf8
       & J.lines
     <&> (\line -> J.lineToUtf8 line <> J.textToUtf8 J.newline)
       & J.sink buckets
:}

In this example we aren't using the default system encoding. Instead, we read raw bytes, explicitly decode them with decodeUtf8, and split the text into lines. Then we create a ByteBundle for each Line (plus its newline) to signify that it shouldn't be broken, and end by writing to a sequence of BoundedSize Files.


The Jet type

data Jet a

A Jet is a sequence of values produced through IO effects.

It allows consuming the elements as they are produced and doesn't force them to be present in memory all at the same time, unlike functions like replicateM from base.

Instances

Monad Jet

Similar to the instance for pure lists, which does search.

>>> :{
do string <- J.each ["ab","cd"]
   J.each string
& J.toList
:}
"abcd"

Defined in Jet.Internal

Methods

(>>=) :: Jet a -> (a -> Jet b) -> Jet b

(>>) :: Jet a -> Jet b -> Jet b

return :: a -> Jet a

Functor Jet

Maps over the yielded elements. (<&>) can be used to put the function last.

>>> J.each "aa" <&> succ & J.toList
"bb"

Defined in Jet.Internal

Methods

fmap :: (a -> b) -> Jet a -> Jet b

(<$) :: a -> Jet b -> Jet a

MonadFail Jet

A failed pattern-match in a do-block produces mzero.

>>> :{
do Just c <- J.each [Nothing, Just 'a', Nothing, Just 'b']
   pure c
& J.toList
:}
"ab"

Defined in Jet.Internal

Methods

fail :: String -> Jet a

Applicative Jet

Similar to the instance for pure lists, which generates combinations.

>>> (,) <$> J.each "ab" <*> J.each "cd" & J.toList
[('a','c'),('a','d'),('b','c'),('b','d')]

Defined in Jet.Internal

Methods

pure :: a -> Jet a

(<*>) :: Jet (a -> b) -> Jet a -> Jet b

liftA2 :: (a -> b -> c) -> Jet a -> Jet b -> Jet c

(*>) :: Jet a -> Jet b -> Jet b

(<*) :: Jet a -> Jet b -> Jet a

Alternative Jet

Same as Monoid.

Defined in Jet.Internal

Methods

empty :: Jet a

(<|>) :: Jet a -> Jet a -> Jet a

some :: Jet a -> Jet [a]

many :: Jet a -> Jet [a]

MonadPlus Jet

Same as Monoid.

Defined in Jet.Internal

Methods

mzero :: Jet a

mplus :: Jet a -> Jet a -> Jet a

MonadIO Jet

>>> liftIO (putStrLn "foo") <> liftIO (putStrLn "bar") & J.toList
foo
bar
[(),()]

Defined in Jet.Internal

Methods

liftIO :: IO a -> Jet a

Semigroup (Jet a)

Jet concatenation.

>>> J.each "ab" <> J.each "cd" & J.toList
"abcd"

Defined in Jet.Internal

Methods

(<>) :: Jet a -> Jet a -> Jet a

sconcat :: NonEmpty (Jet a) -> Jet a

stimes :: Integral b => b -> Jet a -> Jet a

Monoid (Jet a)

mempty is the empty Jet.

>>> mempty <> J.each "ab" <> mempty & J.toList
"ab"

Defined in Jet.Internal

Methods

mempty :: Jet a

mappend :: Jet a -> Jet a -> Jet a

mconcat :: [Jet a] -> Jet a

run :: forall a s. Jet a -> (s -> Bool) -> (s -> a -> IO s) -> s -> IO s

Go through the elements produced by a Jet, while threading a state s and possibly performing some effect.

The caller is the one who chooses the type of the state s, and must pass an initial value for it. The state is kept in weak-head normal form.

The caller must also provide a predicate on the state that informs the Jet when to stop producing values: whenever the predicate returns True.
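The three arguments of run are easiest to see in a small model. The sketch below assumes a Jet is represented directly by its run function, which may not match how Jet.Internal actually defines it; each, repeatIO and toList imitate the library functions of the same name, and takeToList is a hypothetical consumer that stops the Jet through the predicate.

```haskell
{-# LANGUAGE RankNTypes #-}

import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical model (an assumption, not Jet.Internal): a Jet is its own run function.
newtype Jet a = Jet
  { runJet :: forall s. (s -> Bool) -> (s -> a -> IO s) -> s -> IO s }

-- Yields the elements of a list, asking the stop predicate before each one.
each :: [a] -> Jet a
each xs0 = Jet (\stop step ->
  let go [] s = pure s
      go (x : xs) s
        | stop s = pure s
        | otherwise = do
            s' <- step s x
            s' `seq` go xs s' -- keep the state in weak-head normal form
  in go xs0)

-- Runs the action once per element, but only while the consumer wants more.
repeatIO :: IO a -> Jet a
repeatIO action = Jet (\stop step ->
  let go s
        | stop s = pure s
        | otherwise = do
            x <- action
            s' <- step s x
            s' `seq` go s'
  in go)

-- Like consume: the predicate never asks the Jet to stop.
toList :: Jet a -> IO [a]
toList j = reverse <$> runJet j (const False) (\acc x -> pure (x : acc)) []

-- Hypothetical consumer that stops the Jet after n elements.
takeToList :: Int -> Jet a -> IO [a]
takeToList n j = do
  (_, acc) <- runJet j (\(k, _) -> k >= n) (\(k, xs) x -> pure (k + 1, x : xs)) (0, [])
  pure (reverse acc)
```

Because the predicate is checked before each element is produced, a consumer that stops after n elements triggers the producer's effect exactly n times: in the model, takeToList 2 (repeatIO act) runs act twice.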

consume :: forall a s. Jet a -> (s -> a -> IO s) -> s -> IO s

Like run, but always goes through all elements produced by the Jet.

Equivalent to run (const False).

drain :: Sink a

Go through the Jet only for the IO effects, discarding all yielded elements.

Building Jets

each :: forall a f. Foldable f => f a -> Jet a

Build a Jet from any Foldable container.

>>> J.each [True,False] & J.toList
[True,False]

repeat :: a -> Jet a

>>> J.repeat True & J.take 2 & J.toList
[True,True]

repeatIO :: IO a -> Jet a

>>> J.repeatIO (putStrLn "hi" *> pure True) & J.take 2 & J.toList
hi
hi
[True,True]

replicate :: Int -> a -> Jet a

>>> J.replicate 2 True & J.toList
[True,True]

replicateIO :: Int -> IO a -> Jet a

>>> J.replicateIO 2 (putStrLn "hi" *> pure True) & J.toList
hi
hi
[True,True]

Don't confuse this with Control.Monad.replicateM :: Int -> Jet a -> Jet [a], which has combinatorial behavior.

iterate :: (a -> a) -> a -> Jet a

>>> J.iterate succ (1 :: Int) & J.take 2 & J.toList
[1,2]

iterateIO :: (a -> IO a) -> a -> Jet a

>>> J.iterateIO (\x -> putStrLn "hi" *> pure (succ x)) (1 :: Int) & J.take 2 & J.toList
hi
[1,2]

unfold :: (b -> Maybe (a, b)) -> b -> Jet a

>>> J.unfold (\case [] -> Nothing ; c : cs -> Just (c,cs)) "abc" & J.toList
"abc"

unfoldIO :: (b -> IO (Maybe (a, b))) -> b -> Jet a

>>> :{
J.unfoldIO (\x -> do putStrLn "hi" 
                     pure $ case x of 
                        [] -> Nothing 
                        c : cs -> Just (c,cs)) 
           "abc" 
& J.toList
:}                             
hi
hi
hi
hi
"abc"

untilEOF :: (handle -> IO Bool) -> (handle -> IO a) -> handle -> Jet a

>>> j = J.untilEOF System.IO.hIsEOF System.IO.hGetLine :: Handle -> Jet String

untilNothing :: IO (Maybe a) -> Jet a

>>> :{
do ref <- newIORef "abc"
   let pop = atomicModifyIORef ref (\case [] -> ([], Nothing)
                                          x : xs -> (xs, Just x)) 
   J.untilNothing pop & J.toList                                       
:}
"abc"

List-like functions

In these functions, the Jet is working as a kind of "effectful list". The effects which produce the elements, and the effects with which we transform and consume the elements, are always IO effects.

Don't confuse these functions with similarly named functions from Traversable or Monad, for which Jet doesn't work as the "container", but as the Applicative/Monadic effect itself.

toList :: Jet a -> IO [a]

Convert to a regular list. This breaks streaming.

>>> J.each "abc" & J.toList
"abc"

Alternatively, we can use fold in combination with list from the foldl library:

>>> L.purely (J.fold (J.each "abc")) L.list
"abc"

which is more verbose, but more composable.

length :: Jet a -> IO Int

Returns the number of elements yielded by the Jet, exhausting it in the process.

>>> J.each "abc" & J.length
3

Alternatively, we can use fold in combination with length from the foldl library:

>>> L.purely (J.fold (J.each "abc")) L.length
3

which is more verbose, but more composable.

traverse :: (a -> IO b) -> Jet a -> Jet b

Apply an effectful transformation to each element in a Jet.

>>> :{
J.each "abc" 
& J.traverse (\c -> let c' = succ c in putStrLn ([c] ++ " -> " ++ [c']) *> pure c')
& J.toList
:}
a -> b
b -> c
c -> d
"bcd"

traverse_ :: (a -> IO b) -> Sink a

for :: Jet a -> (a -> IO b) -> Jet b

for_ :: Jet a -> (a -> IO b) -> IO ()

filter :: (a -> Bool) -> Jet a -> Jet a

>>> J.each "abc" & J.filter (=='a') & J.toList
"a"

filterIO :: (a -> IO Bool) -> Jet a -> Jet a

take :: Int -> Jet a -> Jet a

>>> J.each "abc" & J.take 2 & J.toList
"ab"

limit :: Int -> Jet a -> Jet a

Synonym for take.

takeWhile :: (a -> Bool) -> Jet a -> Jet a

>>> J.each [1..] & J.takeWhile (<5) & J.toList
[1,2,3,4]

takeWhileIO :: (a -> IO Bool) -> Jet a -> Jet a

drop :: Int -> Jet a -> Jet a

>>> J.each "abc" & J.drop 2 & J.toList
"c"

dropWhile :: (a -> Bool) -> Jet a -> Jet a

>>> J.each [1..5] & J.dropWhile (<3) & J.toList
[3,4,5]

dropWhileIO :: (a -> IO Bool) -> Jet a -> Jet a

mapAccum :: (a -> b -> (a, c)) -> a -> Jet b -> Jet c

Behaves like a combination of fmap and foldl; it applies a function to each element of a structure passing an accumulating parameter from left to right.

The resulting Jet has the same number of elements as the original one.

Unlike mapAccumL, it doesn't make the final state available.

>>> J.each [1,2,3,4] & J.mapAccum (\a b -> (a + b,a)) 0 & J.toList
[0,1,3,6]
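For pure lists, the same behavior can be obtained from mapAccumL in Data.List by discarding the final state. A small sketch, where mapAccumList and exclusiveSums are hypothetical names used only for illustration:

```haskell
import Data.List (mapAccumL)

-- Pure-list analogue of J.mapAccum: thread the accumulator left to right,
-- keep the per-element outputs, and drop the final accumulator.
mapAccumList :: (a -> b -> (a, c)) -> a -> [b] -> [c]
mapAccumList f initial = snd . mapAccumL f initial

-- Running sums that exclude the current element (same function as the example above).
exclusiveSums :: [Int] -> [Int]
exclusiveSums = mapAccumList (\acc x -> (acc + x, acc)) 0
```

The final state that mapAccum throws away is the first component of mapAccumL's result (10 for the input [1,2,3,4]).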

mapAccumIO :: (a -> b -> IO (a, c)) -> a -> Jet b -> Jet c

intersperse :: a -> Jet a -> Jet a

>>> J.each "abc" & J.intersperse '-' & J.toList
"a-b-c"

Zips

It's not possible to zip two Jets together. But Jets can be zipped with pure lists, or with lists of IO actions.

zip :: Foldable f => f a -> Jet b -> Jet (a, b)

>>> J.each "abc" & J.zip [1..] & J.toList
[(1,'a'),(2,'b'),(3,'c')]
>>> J.each [1..] & J.zip "abc" & J.toList
[('a',1),('b',2),('c',3)]

zipWith :: Foldable f => (a -> b -> c) -> f a -> Jet b -> Jet c

zipIO :: Foldable f => f (IO a) -> Jet b -> Jet (a, b)

zipWithIO :: Foldable f => (a -> b -> IO c) -> f (IO a) -> Jet b -> Jet c

Zips a list of IO actions with a Jet, where the combining function can also have effects.

If the list of actions is exhausted, the Jet stops:

>>> J.each [1..] <&> show & J.zipWithIO (\c1 c2 -> putStrLn (c1 ++ c2)) [pure "a", pure "b"] & J.toList
a1
b2
[(),()]

Control operations

Some Jets must allocate resources to do their work: for example, a Jet that opens a text file and yields its lines. These resources must be promptly released when the Jet itself finishes or when the consumer stops it (for example, by using limit on the Jet). They must also be released in the face of exceptions.

Here are various control operations like those from Control.Exception, but lifted to work on Jets.

When put in a do-block, these operations "protect" every statement in the do-block below the operation itself.

withFile :: FilePath -> IOMode -> Jet Handle

Opens a file and makes the Handle available to all following statements in the do-block.

Notice that it's often simpler to use the JetSource (for reading) and JetSink (for writing) instances of File.

bracket
  :: forall a b. IO a   -- allocator
  -> (a -> IO b)        -- finalizer
  -> Jet a

>>> :{
do r <- J.bracket (putStrLn "allocating" *> pure "foo") (\r -> putStrLn $ "deallocating " ++ r)
   liftIO $ putStrLn $ "using resource " ++ r
& J.drain
:}
allocating
using resource foo
deallocating foo

bracket_
  :: forall a b. IO a   -- allocator
  -> IO b               -- finalizer
  -> Jet ()

bracketOnError
  :: forall a b. IO a   -- allocator
  -> (a -> IO b)        -- finalizer
  -> Jet a

finally :: IO a -> Jet ()

Notice how the finalizer runs even when we limit the Jet:

>>> :{
do J.finally (putStrLn "hi") -- protects statements below
   liftIO (putStrLn "hey")
   J.each "abc" 
& J.limit 2 
& J.toList
:}
hey
hi
"ab"

But if the protected Jet is not consumed at all, the finalizer might not run.

>>> :{
do J.finally (putStrLn "hi") -- protects statements below 
   liftIO (putStrLn "hey") 
   J.each "abc" 
& J.limit 0 
& J.toList
:}
""

onException :: IO a -> Jet ()

Building your own

These are for advanced usage.

Sometimes we want to lift some existing resource-handling operation not already covered, one that works with plain IO values. These functions help with that.

They have a linear type to statically forbid "funny" operations like \x -> x *> x that disrupt proper threading of the consumer state.

control :: forall resource. (forall x. (resource -> IO x) -> IO x) -> Jet resource

Lift a control operation (like bracket) for which the callback uses the allocated resource.
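The argument of control has the shape forall x. (resource -> IO x) -> IO x, and the argument of control_ has the shape forall x. IO x -> IO x. Partially applied functions from base already fit them: System.IO.withFile path mode and Control.Exception.bracket acquire release fit the first, and (`finally` cleanup) fits the second. The self-contained sketch below checks this with hypothetical runThrough and runThrough_ functions that only accept operations of those shapes; they stand in for control and control_ and do not build Jets.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Exception (bracket, finally)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Appends a message to an in-memory log.
logMsg :: IORef [String] -> String -> IO ()
logMsg ref msg = modifyIORef ref (++ [msg])

-- bracket with allocator and finalizer supplied: a control operation that
-- hands a resource (here just a String) to the callback.
withResource :: IORef [String] -> (String -> IO x) -> IO x
withResource ref =
  bracket (logMsg ref "allocating" >> pure "foo")
          (\r -> logMsg ref ("deallocating " ++ r))

-- finally with its finalizer supplied: a control operation without a resource.
withFinalizer :: IORef [String] -> IO x -> IO x
withFinalizer ref action = action `finally` logMsg ref "finalizing"

-- Hypothetical stand-in for control: accepts only the shape
-- forall x. (resource -> IO x) -> IO x and runs the callback through it once.
runThrough :: (forall x. (resource -> IO x) -> IO x) -> (resource -> IO y) -> IO y
runThrough op callback = op callback

-- Hypothetical stand-in for control_, with the shape forall x. IO x -> IO x.
runThrough_ :: (forall x. IO x -> IO x) -> IO y -> IO y
runThrough_ op action = op action
```

The rank-2 argument types are what forces the operation to be polymorphic in x, so it cannot inspect or fabricate the callback's result.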

unsafeCoerceControl :: forall resource. (forall x. (resource -> IO x) -> IO x) -> forall x. (resource -> IO x) -> IO x

"Morally", all control operations compatible with this library should execute the callback only once, which means that they should have a linear type. But because linear types are not widespread, such operations are usually given a less precise, non-linear type. If you know what you are doing, use this function to give them a linear type.

control_ :: (forall x. IO x -> IO x) -> Jet ()

Lift a control operation (like finally) for which the callback doesn't use the allocated resource.

unsafeCoerceControl_ :: (forall x. IO x -> IO x) -> forall x. IO x -> IO x

Like unsafeCoerceControl, for when the callback doesn't use the allocated resource.

Folding Jets

These functions can be used directly, but they're also useful for interfacing with the Applicative folds from the foldl library, with the help of functions like Control.Foldl.purely and Control.Foldl.impurely.

Applicative folds are useful because they let you run multiple "analyses" of a Jet while going through it only once.

fold :: Jet a -> (s -> a -> s) -> s -> (s -> r) -> IO r

>>> L.purely (J.fold (J.each "abc")) ((,) <$> L.list <*> L.length)
("abc",3)
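The argument order of fold (step function, initial state, extraction function) is exactly what Control.Foldl.purely hands over from a Fold, which is why the two compose. To show how combining folds keeps it to a single pass, here is a self-contained re-creation of the relevant parts of the foldl library, so it runs without that package. foldList is a hypothetical pure-list counterpart of J.fold with the same argument order, and list and len stand in for L.list and L.length.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RankNTypes #-}

import Data.List (foldl')

-- Mirrors Control.Foldl.Fold: step function, initial state, extraction.
data Fold a b = forall s. Fold (s -> a -> s) s (s -> b)

instance Functor (Fold a) where
  fmap f (Fold step begin done) = Fold step begin (f . done)

-- Combining two folds pairs their states, so the input is traversed once.
instance Applicative (Fold a) where
  pure b = Fold (\() _ -> ()) () (const b)
  Fold stepL beginL doneL <*> Fold stepR beginR doneR =
    Fold
      (\(l, r) a -> (stepL l a, stepR r a))
      (beginL, beginR)
      (\(l, r) -> doneL l (doneR r))

-- Mirrors Control.Foldl.purely: hands the fold's pieces to a fold runner.
purely :: (forall s. (s -> a -> s) -> s -> (s -> b) -> r) -> Fold a b -> r
purely run (Fold step begin done) = run step begin done

-- Hypothetical pure-list counterpart of J.fold, with the same argument order.
foldList :: [a] -> (s -> a -> s) -> s -> (s -> r) -> r
foldList xs step begin done = done (foldl' step begin xs)

-- Collects the elements in order, using a difference list as the state.
list :: Fold a [a]
list = Fold (\k x -> k . (x :)) id ($ [])

-- Counts the elements.
len :: Fold a Int
len = Fold (\n _ -> n + 1) 0 id
```

With these definitions, purely (foldList "abc") ((,) <$> list <*> len) mirrors the L.purely example above and evaluates to ("abc",3).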

foldIO :: Jet a -> (s -> a -> IO s) -> IO s -> (s -> IO r) -> IO r

>>> L.impurely (J.foldIO (J.each "abc")) (L.FoldM (\() c -> putStrLn [c]) (pure ()) pure *> L.generalize L.length)
a
b
c
3

Byte utils

bytes :: ChunkSize -> Handle -> Jet ByteString

data ChunkSize

Instances

Show ChunkSize

Defined in Jet.Internal

Methods

showsPrec :: Int -> ChunkSize -> ShowS

show :: ChunkSize -> String

showList :: [ChunkSize] -> ShowS

data ByteBundle

A sequence of bytes that we might want to keep together.

Instances

Show ByteBundle

Defined in Jet.Internal

Methods

showsPrec :: Int -> ByteBundle -> ShowS

show :: ByteBundle -> String

showList :: [ByteBundle] -> ShowS

Semigroup ByteBundle

Defined in Jet.Internal

Methods

(<>) :: ByteBundle -> ByteBundle -> ByteBundle

sconcat :: NonEmpty ByteBundle -> ByteBundle

stimes :: Integral b => b -> ByteBundle -> ByteBundle

Monoid ByteBundle

Defined in Jet.Internal

JetSink ByteBundle Handle

Defined in Jet.Internal

Methods

sink :: Handle -> Sink ByteBundle

JetSink ByteBundle [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is guaranteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Defined in Jet.Internal

bundle :: Foldable f => f ByteString -> ByteBundle

Constructs a ByteBundle out of the bytes of some Foldable container.

bundleLength :: ByteBundle -> Int

Length in bytes.

bundleBytes :: ByteBundle -> Jet ByteString

Text and line utils

decodeUtf8 :: Jet ByteString -> Jet Text

encodeUtf8 :: Jet Text -> Jet ByteString

data Line where

A line of text.

While it is guaranteed that the Lines coming out of the lines function do not contain newlines, that invariant is not otherwise enforced.

Bundled Patterns

pattern Line :: Text -> Line

Unidirectional pattern that allows converting a Line into a Text during pattern-matching.

Instances

Eq Line

Defined in Jet.Internal

Methods

(==) :: Line -> Line -> Bool

(/=) :: Line -> Line -> Bool

Ord Line

Defined in Jet.Internal

Methods

compare :: Line -> Line -> Ordering

(<) :: Line -> Line -> Bool

(<=) :: Line -> Line -> Bool

(>) :: Line -> Line -> Bool

(>=) :: Line -> Line -> Bool

max :: Line -> Line -> Line

min :: Line -> Line -> Line

Show Line

Defined in Jet.Internal

Methods

showsPrec :: Int -> Line -> ShowS

show :: Line -> String

showList :: [Line] -> ShowS

IsString Line

Defined in Jet.Internal

Methods

fromString :: String -> Line

Semigroup Line

Defined in Jet.Internal

Methods

(<>) :: Line -> Line -> Line

sconcat :: NonEmpty Line -> Line

stimes :: Integral b => b -> Line -> Line

Monoid Line

Defined in Jet.Internal

Methods

mempty :: Line

mappend :: Line -> Line -> Line

mconcat :: [Line] -> Line

JetSink Line Handle

Uses the default system locale. Adds newlines.

Defined in Jet.Internal

Methods

sink :: Handle -> Sink Line

JetSource Line Handle

Uses the default system locale.

Defined in Jet.Internal

Methods

jet :: Handle -> Jet Line

lines :: Jet Text -> Jet Line

unlines :: Jet Line -> Jet Text

newline :: Text

Data.Text.singleton '\n'

lineToText :: Line -> Text

Converts a Line back to text, without adding the newline.

lineToUtf8 :: Line -> ByteBundle

Converts a Line to a UTF-8-encoded ByteBundle, without adding the newline.

textToLine :: Text -> Line

stringToLine :: String -> Line

lineContains :: Text -> Line -> Bool

lineBeginsWith :: Text -> Line -> Bool

prefixLine :: Text -> Line -> Line

Adds the Text to the beginning of the Line.

Concurrency

traverseConcurrently :: (PoolConf -> PoolConf) -> (a -> IO b) -> Jet a -> Jet b

Process the values yielded by the upstream Jet in a concurrent way, and return the results in the form of another Jet as they are produced.

NB: this function might scramble the order of the returned values. Right now there isn't a function for unscrambling them.

>>> :{
 J.each [(3,'a'), (2,'b'), (1,'c')]
 & J.traverseConcurrently (J.numberOfWorkers 10) (\(d,c) -> threadDelay (d*100000) *> pure c)
 & J.toList
:}
"cba"

What happens if we limit the resulting Jet and we reach that limit, or if we otherwise stop consuming the Jet before it gets exhausted? In those cases, all pending IO b tasks are cancelled.

>>> :{
 J.each [(9999,'a'), (2,'b'), (1,'c')]
 & J.traverseConcurrently (J.numberOfWorkers 10) (\(d,c) -> threadDelay (d*100000) *> pure c)
 & J.take 2
 & J.toList
:}
"cb"

data PoolConf

Configuration record for the worker pool.

Instances

Show PoolConf

Defined in Jet.Internal

Methods

showsPrec :: Int -> PoolConf -> ShowS

show :: PoolConf -> String

showList :: [PoolConf] -> ShowS

defaults :: a -> a

An alias for id. Useful with functions like traverseConcurrently and throughProcess, for which it means "use the default configuration".

inputQueueSize :: Int -> PoolConf -> PoolConf

Size of the waiting queue into the worker pool. The default is 1.

numberOfWorkers :: Int -> PoolConf -> PoolConf

The size of the worker pool. The default is 1.

outputQueueSize :: Int -> PoolConf -> PoolConf

Size of the queue holding results out of the worker pool before they are yielded downstream. The default is 1.

Process invocation

throughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet ByteString -> Jet ByteString

Feeds the upstream Jet to an external process' stdin and returns the process' stdout as another Jet. The feeding and reading of the standard streams is done concurrently in order to avoid deadlocks.

What happens if we limit the resulting Jet and we reach that limit, or if we otherwise stop consuming the Jet before it gets exhausted? In those cases, the external process is promptly terminated.

linesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line

Like throughProcess, but feeding and reading Lines using the default system encoding.

>>> :{
J.each ["aaa","bbb","ccc"]
<&> J.stringToLine
& J.linesThroughProcess J.defaults (shell "cat")
& J.toList
:}
["aaa","bbb","ccc"]

An example of not reading all the lines from a long-lived process that gets cancelled:

>>> :{
mempty
& J.linesThroughProcess J.defaults (shell "{ printf \"aaa\\nbbb\\nccc\\n\" ; sleep infinity ; }")
& J.limit 2
& J.toList
:}
["aaa","bbb"]

utf8LinesThroughProcess :: (ProcConf -> ProcConf) -> CreateProcess -> Jet Line -> Jet Line

Like throughProcess, but feeding and reading Lines encoded in UTF-8.

type ProcConf = ProcConf_ ByteString ByteString

Configuration record with some extra options in addition to those in CreateProcess.

bufferStdin :: Bool -> ProcConf -> ProcConf

Should we buffer the process' stdin? This should usually be True for interactive scenarios.

By default, False.

readFromStderr :: (Handle -> IO ()) -> ProcConf -> ProcConf

Sets the function that reads a single line of output from the process stderr. It's called repeatedly until stderr is exhausted. The reads are done concurrently with the reads from stdout.

By default, lines of text are read using the system's default encoding.

This is a good place to throw an exception if we don't like what comes out of stderr.

handleExitCode :: (ExitCode -> IO ()) -> ProcConf -> ProcConf

Sets the function that handles the final ExitCode of the process.

The default behavior is to throw the ExitCode as an exception if it's not a success.

Conversion helpers

class JetSource a source where

Helper multi-parameter typeclass for creating Jet values out of a variety of common sources.

Because there's no functional dependency, sometimes we need to use TypeApplications to give the compiler a hint about the type of elements we want to produce. For example, here we want Lines and not, say, ByteStrings:

>>> action = J.jet @Line (File "foo.txt") & J.sink J.stdout
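A self-contained sketch of the same situation using plain lists instead of Jets. Source, Input and Ln are hypothetical stand-ins for JetSource, File and Line: with two instances for the same source type and no functional dependency, the element type must come from the surrounding context or from a type application.

```haskell
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeApplications #-}

-- Hypothetical stand-in for File or Handle.
newtype Input = Input String

-- Hypothetical stand-in for Line.
newtype Ln = Ln String
  deriving (Eq, Show)

-- Same shape as JetSource: element type first, source second, no fundep.
class Source a src where
  produce :: src -> [a]

-- Reading an Input as lines...
instance Source Ln Input where
  produce (Input s) = map Ln (lines s)

-- ...or as individual characters.
instance Source Char Input where
  produce (Input s) = s

-- length doesn't fix the element type, so without @Ln or @Char it would be ambiguous.
countLines :: Input -> Int
countLines input = length (produce @Ln input)

countChars :: Input -> Int
countChars input = length (produce @Char input)
```

For Input "a\nb", countLines gives 2 and countChars gives 3, because the newline itself is one of the characters.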

Methods

jet :: source -> Jet a

Instances

JetSource a Handle => JetSource a File

Defined in Jet.Internal

Methods

jet :: File -> Jet a

JetSource ByteString Handle

Defined in Jet.Internal

Methods

jet :: Handle -> Jet ByteString

JetSource Line Handle

Uses the default system locale.

Defined in Jet.Internal

Methods

jet :: Handle -> Jet Line

class JetSink a target where

Helper multi-parameter typeclass for creating Jet-consuming functions out of a variety of common destinations.

>>> J.each ["aaa","bbb","ccc"] <&> J.stringToLine & J.sink J.stdout
aaa
bbb
ccc

Methods

sink :: target -> Sink a

Instances

JetSink a Handle => JetSink a File

Defined in Jet.Internal

Methods

sink :: File -> Sink a

JetSink Text Handle

Uses the default system locale.

Defined in Jet.Internal

Methods

sink :: Handle -> Sink Text

JetSink ByteString Handle

Defined in Jet.Internal

Methods

sink :: Handle -> Sink ByteString

JetSink Line Handle

Uses the default system locale. Adds newlines.

Defined in Jet.Internal

Methods

sink :: Handle -> Sink Line

JetSink ByteBundle Handle

Defined in Jet.Internal

Methods

sink :: Handle -> Sink ByteBundle

JetSink ByteString [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Defined in Jet.Internal

Methods

sink :: [BoundedSize File] -> Sink ByteString

JetSink ByteBundle [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is guaranteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Defined in Jet.Internal

type Sink a = Jet a -> IO ()

A function that consumes a Jet totally or partially, without returning a result.

newtype File

FilePaths are plain strings. This newtype provides a small measure of safety over them.

Constructors

File 

Fields

Instances

Show File

Defined in Jet.Internal

Methods

showsPrec :: Int -> File -> ShowS

show :: File -> String

showList :: [File] -> ShowS

JetSink a Handle => JetSink a File

Defined in Jet.Internal

Methods

sink :: File -> Sink a

JetSource a Handle => JetSource a File

Defined in Jet.Internal

Methods

jet :: File -> Jet a

JetSink ByteString [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Defined in Jet.Internal

Methods

sink :: [BoundedSize File] -> Sink ByteString

JetSink ByteBundle [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is guaranteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Defined in Jet.Internal

data BoundedSize x

The maximum size in bytes of some destination into which we write the bytes produced by a Jet.

Constructors

BoundedSize Int x 

Instances

JetSink ByteString [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Defined in Jet.Internal

Methods

sink :: [BoundedSize File] -> Sink ByteString

JetSink ByteBundle [BoundedSize File]

Distributes incoming bytes through a sequence of files. Once a file is full, we start writing the next one.

Each ByteBundle value is guaranteed to be written to a single file. If a file turns out to be too small for even a single ByteBundle value, a BucketOverflow exception is thrown.

Defined in Jet.Internal

Read x => Read (BoundedSize x)

Defined in Jet.Internal

Methods

readsPrec :: Int -> ReadS (BoundedSize x)

readList :: ReadS [BoundedSize x]

readPrec :: ReadPrec (BoundedSize x)

readListPrec :: ReadPrec [BoundedSize x]

Show x => Show (BoundedSize x)

Defined in Jet.Internal

Methods

showsPrec :: Int -> BoundedSize x -> ShowS

show :: BoundedSize x -> String

showList :: [BoundedSize x] -> ShowS

data BucketOverflow

Exception thrown when we try to write too much data in a size-bounded destination.

Constructors

BucketOverflow 

Instances

Show BucketOverflow

Defined in Jet.Internal

Methods

showsPrec :: Int -> BucketOverflow -> ShowS

show :: BucketOverflow -> String

showList :: [BucketOverflow] -> ShowS

Exception BucketOverflow

Defined in Jet.Internal

Methods

toException :: BucketOverflow -> SomeException

fromException :: SomeException -> Maybe BucketOverflow

displayException :: BucketOverflow -> String

Some complicated stuff

I didn't manage to make this stuff simpler.

recast :: forall a b c. Splitter a b -> Combiners b c -> Jet a -> Jet c

This is a complex, unwieldy, yet versatile function. It can be used to define grouping operations, but also for decoding and other purposes.

Groups are delimited in the input Jet using the Splitter, and the contents of those groups are then combined using Combiners. The result of each combiner is yielded by the returned Jet.

If the list of combiners is finite and becomes exhausted, we stop splitting and the returned Jet stops.

type Splitter a b = MealyIO a (SplitStepResult b)

Delimits groups in the values yielded by a Jet, and can also transform those values.

data MealyIO a b where

A Mealy machine with an existentially hidden state.

Very much like a FoldM IO from the foldl library, but it emits an output at each step, not only at the end.

Constructors

MealyIO
  :: (s -> a -> IO (b, s))   -- The step function which threads the state.
  -> (s -> IO b)             -- The final output, produced from the final state.
  -> IO s                    -- An action that produces the initial state.
  -> MealyIO a b

Instances

Functor (MealyIO a)

Defined in Jet.Internal

Methods

fmap :: (a0 -> b) -> MealyIO a a0 -> MealyIO a b

(<$) :: a0 -> MealyIO a b -> MealyIO a a0
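A sketch of how such a machine can be driven over a plain list, producing one output per input plus a last output from the final state. The constructor mirrors the documented fields; runMealy and runningSum are hypothetical helpers, not part of the library.

```haskell
{-# LANGUAGE GADTs #-}

-- Same constructor shape as MealyIO: step, final output, initial state.
data MealyIO a b where
  MealyIO :: (s -> a -> IO (b, s)) -> (s -> IO b) -> IO s -> MealyIO a b

-- Feeds each input to the machine, emitting an output at every step and a
-- last output produced from the final state.
runMealy :: MealyIO a b -> [a] -> IO [b]
runMealy (MealyIO step done begin) inputs = begin >>= go inputs
  where
    go [] s = pure <$> done s
    go (x : xs) s = do
      (out, s') <- step s x
      (out :) <$> go xs s'

-- Emits the running total after each input, and the total again at the end.
runningSum :: MealyIO Int Int
runningSum = MealyIO (\s x -> pure (s + x, s + x)) pure (pure 0)
```

Running runningSum over [1,2,3] yields [1,3,6,6]: three step outputs followed by the final output. This per-step emission is what distinguishes it from a FoldM, which only produces a result at the end.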

data SplitStepResult b

For each value coming from upstream, what has the Splitter learned?

  • Perhaps we should continue some group we have already started in a previous step.
  • Perhaps we have found entire groups that we should emit in one go, groups we know are already complete.
  • Perhaps we should start a new group that will continue in the next steps.

Constructors

SplitStepResult 

Fields

  • continuationOfPreviouslyStartedGroup :: [b]

    The continued group will be "closed" if in the current step we emit an entire group or we begin a new group.

    INVARIANT: we should only continue a group if we have already opened a "new one" with one or more elements in an earlier step.

  • entireGroups :: [[b]]

    It's ok if the groups we find are empty.

  • startOfNewGroup :: [b]

    INVARIANT: when we are in the final step, we should not yield elements for the beginning of a new one.
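One way to read these rules is as a procedure that reassembles groups from a sequence of step results. The pure sketch below is our interpretation of the field descriptions, not the library's implementation, and assemble is a hypothetical function. It assumes a group is "begun" only when startOfNewGroup is non-empty, and it silently drops continuations that arrive while no group is open (which the first invariant rules out anyway).

```haskell
-- Same fields as SplitStepResult.
data SplitStepResult b = SplitStepResult
  { continuationOfPreviouslyStartedGroup :: [b]
  , entireGroups :: [[b]]
  , startOfNewGroup :: [b]
  }
  deriving (Show)

-- Reassembles the groups described by a sequence of step results.
assemble :: [SplitStepResult b] -> [[b]]
assemble = go Nothing
  where
    -- 'open' is the group started in an earlier step, if any.
    go open [] = maybe [] (\g -> [g]) open
    go open (SplitStepResult cont entire new : rest) =
      let open' = fmap (++ cont) open
          -- Emitting entire groups or beginning a new one closes the open group.
          closes = not (null entire) || not (null new)
       in if closes
            then maybe [] (\g -> [g]) open' ++ entire ++ go (startIfAny new) rest
            else go open' rest
    startIfAny new = if null new then Nothing else Just new
```

For example, the steps (start "ab"), (continue "c", start "d"), (continue "e", entire group "x") reassemble into the groups "abc", "de" and "x".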

Instances

Functor SplitStepResult

Defined in Jet.Internal

Methods

fmap :: (a -> b) -> SplitStepResult a -> SplitStepResult b

(<$) :: a -> SplitStepResult b -> SplitStepResult a

Show b => Show (SplitStepResult b)

Defined in Jet.Internal

Methods

showsPrec :: Int -> SplitStepResult b -> ShowS

show :: SplitStepResult b -> String

showList :: [SplitStepResult b] -> ShowS

Semigroup (SplitStepResult b)

Defined in Jet.Internal

Monoid (SplitStepResult b)

Defined in Jet.Internal

bytesOverBuckets :: [Int] -> Splitter ByteString ByteString

Splits a stream of bytes into groups bounded by maximum byte sizes. When one group "fills up", the next one is started.

When the list of bucket sizes is exhausted, all incoming bytes are put into the same unbounded group.

Useful in combination with recast.

byteBundlesOverBuckets :: [Int] -> Splitter ByteBundle ByteString Source #

Splits a stream of ByteBundles into groups bounded by maximum byte sizes. Bytes belonging to the same ByteBundle are always put in the same group. When one group "fills up", the next one is started.

When the list of bucket sizes is exhausted, all further incoming bytes are put into the same unbounded group.

Useful in combination with recast.

THROWS:

  • BucketOverflow exception if the size bound of a group turns out to be too small to hold even a single ByteBundle value.
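A pure model helps show how indivisible bundles interact with the bucket bounds. In this hypothetical sketch each ByteString stands for one ByteBundle, and a Left value stands in for the BucketOverflow exception; bundlesOverBucketsModel is not a library function, and moving on to the next bucket as soon as a bundle does not fit in the remaining room is an assumption about the "fills up" rule.

```haskell
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC

-- Hypothetical model: each ByteString is one indivisible bundle, and
-- Left stands for the BucketOverflow exception the library throws.
bundlesOverBucketsModel :: [Int] -> [B.ByteString] -> Either String [[B.ByteString]]
bundlesOverBucketsModel _ [] = Right []
bundlesOverBucketsModel [] bundles = Right [bundles] -- buckets exhausted: one unbounded group
bundlesOverBucketsModel (size : sizes) bundles@(first : _)
  | B.length first > size = Left "BucketOverflow" -- not even one bundle fits
  | otherwise =
      let (grp, rest) = fill size bundles
       in (grp :) <$> bundlesOverBucketsModel sizes rest
  where
    -- Take whole bundles while they still fit in the remaining room.
    fill room (b : bs)
      | B.length b <= room =
          let (taken, left) = fill (room - B.length b) bs
           in (b : taken, left)
    fill _ bs = ([], bs)

example :: Either String [[B.ByteString]]
example = bundlesOverBucketsModel [5, 5] (map BC.pack ["ab", "cd", "efg", "h", "ij"])
```

In example, "efg" does not fit after "ab" and "cd", so it opens the second bucket instead of being split; "ij" then overflows into the final unbounded group. A bucket of size 2 facing a 3-byte bundle produces the overflow case.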

data Combiners a b

A Combiners value knows how to process a sequence of groups, while keeping an (existentially hidden) state for each group.

Very much like a FoldM IO from the foldl library, but "restartable" with a list of starting states.

For converting one into the other, this function should do the trick:

\(L.FoldM step allocator coda) -> combiners step coda (Prelude.repeat allocator)

Instances

Functor (Combiners a)

Defined in Jet.Internal

Methods

fmap :: (a0 -> b) -> Combiners a a0 -> Combiners a b

(<$) :: a0 -> Combiners a b -> Combiners a a0

combiners

Arguments

:: forall s a b r. (s -> a -> IO s)

Step function that threads the state s.

-> (s -> IO b)

Coda invoked when a group closes.

-> [IO s]

Actions that produce the initial states s for processing each group.

-> Combiners a b 

Constructor for Combiners values.
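As a mental model of the three arguments, the following self-contained sketch runs the same kind of step, coda and initial-state actions over groups that are already materialized as lists. runGroups, sumGroups and listGroups are hypothetical helpers; a real Combiners value receives its groups incrementally from a Splitter rather than as a list of lists.

```haskell
import Control.Monad (foldM)

-- Hypothetical model: run one fold per group, each from its own initial state.
-- zipWith stops at whichever of `starts` and `groups` runs out first.
runGroups :: (s -> a -> IO s) -> (s -> IO b) -> [IO s] -> [[a]] -> IO [b]
runGroups step coda starts groups = sequence (zipWith runOne starts groups)
  where
    runOne start grp = do
      s0 <- start -- fresh initial state for this group
      s <- foldM step s0 grp -- thread the state through the group's elements
      coda s -- the group closes: produce its result

-- Sum each group, restarting from 0 every time.
sumGroups :: [[Int]] -> IO [Int]
sumGroups = runGroups (\acc x -> pure (acc + x)) pure (repeat (pure 0))

-- Collect each group into a list, similar in spirit to combineIntoLists.
listGroups :: [[a]] -> IO [[a]]
listGroups = runGroups (\acc x -> pure (x : acc)) (pure . reverse) (repeat (pure []))
```

Passing repeat of a single action gives every group the same starting state, which is the same reason the FoldM conversion shown for Combiners uses Prelude.repeat allocator.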

withCombiners

Arguments

:: forall h s a b r. (h -> s -> a -> IO s)

Step function that accesses the resource h and threads the state s.

-> (h -> s -> IO b)

Coda invoked when a group closes.

-> (h -> IO ())

Finalizer to run after each coda, and also in the case of an exception.

-> [(IO h, h -> IO s)]

Actions that allocate a sequence of resources h and produce initial states s for processing each group.

-> (Combiners a b -> IO r)

The Combiners value should be consumed linearly.

-> IO r 

Combiners thread a state s while processing each group. Sometimes, in addition to that, we want to allocate a resource h when we start processing a group, and deallocate it after we finish processing the group or if an exception is thrown. The typical example is allocating a Handle for writing the elements of the group as they arrive.
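The allocate/step/coda/finalize discipline can be sketched with Control.Exception.bracket, one bracket per group. This is a hypothetical model, not the library's code: runGroupsWithResource and demo are illustrative names, the "resource" in the demo is just a number, and bracket stands in for whatever mechanism jet-stream actually uses.

```haskell
import Control.Exception (bracket)
import Control.Monad (foldM)
import Data.IORef (modifyIORef, newIORef, readIORef)

-- Hypothetical model of the per-group resource discipline of withCombiners.
runGroupsWithResource ::
  (h -> s -> a -> IO s) -> -- step, with access to the resource
  (h -> s -> IO b) -> -- coda, when the group closes
  (h -> IO ()) -> -- finalizer, also run if an exception is thrown
  [(IO h, h -> IO s)] -> -- allocate a resource, then build the initial state
  [[a]] ->
  IO [b]
runGroupsWithResource step coda finalize allocators groups =
  sequence (zipWith runOne allocators groups)
  where
    -- bracket runs the finalizer after the coda, or when an exception escapes.
    runOne (allocate, begin) grp =
      bracket allocate finalize $ \h -> do
        s0 <- begin h
        s <- foldM (step h) s0 grp
        coda h s

-- Demo: resources are numbers, and open/close events are recorded in a log.
demo :: IO ([(Int, Int)], [String])
demo = do
  logRef <- newIORef []
  let allocator n =
        ( modifyIORef logRef (("open " ++ show n) :) >> pure n
        , \_ -> pure 0
        )
      closeIt n = modifyIORef logRef (("close " ++ show n) :)
  results <-
    runGroupsWithResource
      (\_ acc x -> pure (acc + x))
      (\h acc -> pure (h, acc))
      closeIt
      (map allocator [1 ..])
      [[1, 2], [3, 4, 5]]
  events <- reverse <$> readIORef logRef
  pure (results, events)
```

Running demo, each group's resource is opened, used, and closed before the next group's resource is opened, which is the ordering that lets something like a file Handle per group stay safely scoped.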

withCombiners_

Arguments

:: forall h a r. (h -> a -> IO ())

Step function that accesses the resource h.

-> (h -> IO ())

Finalizer to run after closing each group, and also in the case of an exception.

-> [IO h]

Actions that allocate a sequence of resources h.

-> (Combiners a () -> IO r)

The Combiners value should be consumed linearly.

-> IO r 

A simpler version of withCombiners that doesn't thread a state; it merely allocates and deallocates the resource h.

combineIntoLists :: Combiners a [a]

Puts the elements of each group into a list that is kept in memory. This breaks streaming within the group.

Useful with recast.

Re-exports

I've found that the & (reverse application) and <&> (reverse fmap) operators feel quite natural for building pipelines.

(&) :: a -> (a -> b) -> b

(<&>) :: Functor f => f a -> (a -> b) -> f b
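For readers new to these operators, here is the same pipeline style applied to plain lists instead of Jets. evenTimesTen is a hypothetical name; both operators come from base.

```haskell
import Data.Function ((&))
import Data.Functor ((<&>))

-- Both operators are infixl 1, so a chain reads left to right like a pipeline:
-- (xs & filter even) <&> (* 10)
evenTimesTen :: [Int] -> [Int]
evenTimesTen xs = xs & filter even <&> (* 10)

-- The same computation written with ordinary application and fmap.
evenTimesTen' :: [Int] -> [Int]
evenTimesTen' xs = fmap (* 10) (filter even xs)
```

Because (&) and (<&>) share the same fixity, they can be mixed in one chain without parentheses, which is what makes forward-chained Jet pipelines read top to bottom.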

The standard streams, useful with functions like sink.

stdin :: Handle

stdout :: Handle

stderr :: Handle

Thrown when decoding UTF-8.

data UnicodeException

Instances

Eq UnicodeException

Defined in Data.Text.Encoding.Error

Show UnicodeException

Defined in Data.Text.Encoding.Error

Methods

showsPrec :: Int -> UnicodeException -> ShowS

show :: UnicodeException -> String

showList :: [UnicodeException] -> ShowS

Exception UnicodeException

Defined in Data.Text.Encoding.Error

Methods

toException :: UnicodeException -> SomeException

fromException :: SomeException -> Maybe UnicodeException

displayException :: UnicodeException -> String

NFData UnicodeException

Defined in Data.Text.Encoding.Error

Methods

rnf :: UnicodeException -> ()
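A quick way to see UnicodeException in action, using the text and bytestring packages directly rather than Jet's own decodeUtf8: Data.Text.Encoding.decodeUtf8' returns the exception in a Left, while Data.Text.Encoding.decodeUtf8 throws it, so it can be caught with try once the result is forced. The helper names below are hypothetical.

```haskell
import Control.Exception (evaluate, try)
import qualified Data.ByteString as B
import Data.Either (isLeft)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import Data.Text.Encoding.Error (UnicodeException)

-- "fo" followed by 0xFF, a byte that never occurs in well-formed UTF-8.
invalidUtf8 :: B.ByteString
invalidUtf8 = B.pack [0x66, 0x6f, 0xff]

-- The error is returned as a value.
decodeReturning :: B.ByteString -> Either UnicodeException Text
decodeReturning = TE.decodeUtf8'

-- The error is thrown; evaluate forces the strict Text so that try can catch it.
decodeCatching :: B.ByteString -> IO (Either UnicodeException Text)
decodeCatching bytes = try (evaluate (TE.decodeUtf8 bytes))

-- Decode, falling back to a placeholder on malformed input.
decodeOrPlaceholder :: B.ByteString -> Text
decodeOrPlaceholder = either (const (T.pack "<invalid utf-8>")) id . decodeReturning
```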

Functions that create process specs for use with throughProcess. For more control, import the whole of System.Process.

proc :: FilePath -> [String] -> CreateProcess

shell :: String -> CreateProcess
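These functions only build a description of a process; nothing is launched until the spec is actually run, for example by throughProcess. sortSpec and pipelineSpec are hypothetical names, and the cwd override shows the record update syntax that CreateProcess supports.

```haskell
import System.Process (CmdSpec (..), CreateProcess (..), proc, shell)

-- An executable plus its arguments, run directly without a shell.
-- The parentheses are needed: record update binds tighter than application.
sortSpec :: CreateProcess
sortSpec = (proc "sort" ["-u"]) {cwd = Just "/tmp"}

-- A command line that will be interpreted by the system shell.
pipelineSpec :: CreateProcess
pipelineSpec = shell "grep -v '^#' | sort"
```

Prefer proc when the arguments come from untrusted input, since shell hands the whole string to the shell for parsing.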