broadcast-chan-0.2.1.2: Closable, fair, single-wakeup channel type that avoids 0 reader space leaks.
Copyright(C) 2014-2021 Merijn Verstraaten
LicenseBSD-style (see the file LICENSE)
MaintainerMerijn Verstraaten <merijn@inconsistent.nl>
Stabilityexperimental
Portabilityhaha
Safe HaskellSafe
LanguageHaskell2010

BroadcastChan.Extra

Description

Functions in this module are *NOT* intended to be used by regular users of the library. Rather, they are intended for implementing parallel processing libraries on top of broadcast-chan, such as broadcast-chan-conduit.

This module, while not for end users, is considered part of the public API, so users can rely on PVP bounds to avoid breakage due to changes to this module.

Synopsis

Documentation

data Action Source #

Action to take when an exception occurs while processing an element.

Constructors

Drop

Drop the current element and continue processing.

Retry

Retry by appending the current element to the queue of remaining elements.

Terminate

Stop all processing and reraise the exception.

Instances

Instances details
Eq Action Source # 
Instance details

Defined in BroadcastChan.Extra

Methods

(==) :: Action -> Action -> Bool #

(/=) :: Action -> Action -> Bool #

Show Action Source # 
Instance details

Defined in BroadcastChan.Extra

data BracketOnError m r Source #

Allocation, cleanup, and work actions for parallel processing. These should be passed to an appropriate bracketOnError function.

Constructors

Bracket 

Fields

  • allocate :: IO [Weak ThreadId]

    Allocation action that spawn threads and sets up handlers.

  • cleanup :: [Weak ThreadId] -> IO ()

    Cleanup action that handles exceptional termination

  • action :: m r

    Action that performs actual processing and waits for processing to finish and threads to terminate.

data Handler m a Source #

Exception handler for parallel processing.

Constructors

Simple Action

Always take the specified Action.

Handle (a -> SomeException -> m Action)

Allow inspection of the element, exception, and execution of monadic actions before deciding the Action to take.

data ThreadBracket Source #

Datatype for specifying additional setup/cleanup around forking threads. Used by runParallelWith and runParallelWith_ to fix resource management in broadcast-chan-conduit.

If the allocation action can fail/abort with an exception it MUST take care not to leak resources in these cases. In other words, IFF setupFork succeeds then this library will ensure the corresponding cleanup runs.

Since: 0.2.1

Constructors

ThreadBracket 

Fields

  • setupFork :: IO ()

    Setup action to run before spawning a new thread.

  • cleanupFork :: IO ()

    Normal cleanup action upon thread termination.

  • cleanupForkError :: IO ()

    Exceptional cleanup action in case thread terminates due to an exception.

mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a Source #

Convenience function for changing the monad the exception handler runs in.

runParallel Source #

Arguments

:: forall a b m n r. (MonadIO m, MonadIO n) 
=> Either (b -> n r) (r -> b -> n r)

Output yielder

-> Handler IO a

Parallel processing exception handler

-> Int

Number of threads to use

-> (a -> IO b)

Function to run in parallel

-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)

"Stream" processing function

-> n (BracketOnError n r) 

Sets up parallel processing.

The workhorses of this function are the output yielder and "stream" processing functions.

The output yielder is responsible for handling the produced b values, which if can either yield downstream (Left) when used with something like conduit or pipes, or fold into a single results (Right) when used to run IO in parallel.

The stream processing function gets two arguments:

a -> m ()
Should be used to buffer a number of elements equal to the number of threads.
a -> m b
Which should be used to process the remainder of the element stream via, for example, mapM.

See BroadcastChan or broadcast-chan-conduit for examples.

The returned BracketOnError has a allocate action that takes care of setting up forkIO threads and exception handlers. The cleanup action ensures all threads are terminate in case of an exception. Finally, action performs the actual parallel processing of elements.

runParallelWith Source #

Arguments

:: forall a b m n r. (MonadIO m, MonadIO n) 
=> ThreadBracket

Bracketing action used to manage resources across thread spawns

-> Either (b -> n r) (r -> b -> n r)

Output yielder

-> Handler IO a

Parallel processing exception handler

-> Int

Number of threads to use

-> (a -> IO b)

Function to run in parallel

-> ((a -> m ()) -> (a -> m (Maybe b)) -> n r)

"Stream" processing function

-> n (BracketOnError n r) 

Like runParallel, but accepts a setup and cleanup action that will be run before spawning a new thread and upon thread exit respectively.

The main use case is to properly manage the resource reference counts of ResourceT.

If the setup throws an IO exception or otherwise aborts, it MUST ensure any allocated resource are freed. If it completes without an exception, the cleanup is guaranteed to run (assuming proper use of bracketing with the returned BracketOnError).

Since: 0.2.1

runParallel_ Source #

Arguments

:: (MonadIO m, MonadIO n) 
=> Handler IO a

Parallel processing exception handler

-> Int

Number of threads to use

-> (a -> IO ())

Function to run in parallel

-> ((a -> m ()) -> n r)

"Stream" processing function

-> n (BracketOnError n r) 

Sets up parallel processing for functions where we ignore the result.

The stream processing argument is the workhorse of this function. It gets a (rate-limited) function a -> m () that queues a values for processing. This function should be applied to all a elements that should be processed. This would be either a partially applied forM_ for parallel processing, or something like conduit's mapM_ to construct a "sink" for a values. See BroadcastChan or broadcast-chan-conduit for examples.

The returned BracketOnError has a allocate action that takes care of setting up forkIO threads and exception handlers. Th cleanup action ensures all threads are terminate in case of an exception. Finally, action performs the actual parallel processing of elements.

runParallelWith_ Source #

Arguments

:: (MonadIO m, MonadIO n) 
=> ThreadBracket

Bracketing action used to manage resources across thread spawns

-> Handler IO a

Parallel processing exception handler

-> Int

Number of threads to use

-> (a -> IO ())

Function to run in parallel

-> ((a -> m ()) -> n r)

"Stream" processing function

-> n (BracketOnError n r) 

Like runParallel_, but accepts a setup and cleanup action that will be run before spawning a new thread and upon thread exit respectively.

The main use case is to properly manage the resource reference counts of ResourceT.

If the setup throws an IO exception or otherwise aborts, it MUST ensure any allocated resource are freed. If it completes without an exception, the cleanup is guaranteed to run (assuming proper use of bracketing with the returned BracketOnError).

Since: 0.2.1