core-program-0.7.0.0: Opinionated Haskell Interoperability
Safe Haskell: Safe-Inferred
Language: Haskell2010

Core.Program.Workers

Description

Utility functions for building programs which consume work off of a queue.

Frequently you need to receive items from an external system and perform work on them. One way to structure such a program is to feed the items into a queue and then consume them one at a time. That, of course, is slow, especially when the worker itself has to carry out computationally intensive tasks or interact with external systems. So we want multiple workers running, but only to an extent limited by the number of cores available, the number of external connections allowed, or some other constraint.

This library allows you to add items to a queue, then launch worker threads to consume those items at up to a specified maximum amount of concurrency.
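The shape of the pattern can be sketched without this library at all, using only base and stm. This is an illustrative emulation, not the library's own implementation: items wrapped in Just are fed into a TQueue, end-of-input is marked (here with one Nothing per worker thread, an assumed detail), and a fixed number of threads drain the queue. The name sumViaWorkers is hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Concurrent.STM.TQueue (newTQueueIO, readTQueue, writeTQueue)
import Control.Monad (forM_, replicateM, replicateM_)

-- sumViaWorkers n xs: drain a queue of items with n worker threads,
-- accumulating a running total. Illustrative only; not this library's API.
sumViaWorkers :: Int -> [Int] -> IO Int
sumViaWorkers n xs = do
    queue <- newTQueueIO
    total <- newTVarIO 0

    -- the equivalent of writeQueue: feed the items in
    forM_ xs $ \x ->
        atomically (writeTQueue queue (Just x))

    -- the equivalent of finishQueue: one end-of-input marker per worker
    -- (an assumption made for this sketch)
    replicateM_ n $
        atomically (writeTQueue queue Nothing)

    -- the equivalent of runWorkers_: n threads drain the queue concurrently
    dones <- replicateM n $ do
        done <- newEmptyMVar
        _ <- forkIO $ do
            let loop = do
                    next <- atomically (readTQueue queue)
                    case next of
                        Nothing -> putMVar done ()
                        Just x -> do
                            atomically (modifyTVar' total (+ x))
                            loop
            loop
        pure done
    mapM_ takeMVar dones

    readTVarIO total

main :: IO ()
main = do
    total <- sumViaWorkers 4 [1 .. 100]
    print total   -- 5050
```

The functions below package this pattern up inside the Program monad, so you don't have to manage the termination protocol or the worker threads yourself.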

Synopsis

Work Queue

newQueue :: Program τ (Queue α) Source #

Initialize a new queue.

Since: 0.6.9

writeQueue :: Queue α -> α -> Program τ () Source #

Add an item to the queue.

Since: 0.6.9

writeQueue' :: Foldable ω => Queue α -> ω α -> Program τ () Source #

Add a list of items to the queue.

Since: 0.6.9

finishQueue :: Queue α -> Program τ () Source #

Indicate that you are finished adding items to the queue, thereby allowing the worker threads consuming from it to complete and return.

Remember that you can call this at any time, even before you have launched the worker threads with runWorkers_.

Since: 0.6.9

Worker Threads

runWorkers_ :: Int -> (α -> Program τ ()) -> Queue α -> Program τ () Source #

Run a pool of worker threads which consume items off the work queue.

Once you have an action that enqueues items with writeQueue you can then launch the worker threads:

    runWorkers_ 16 worker queue

consuming up to 16 items concurrently in this example.

It is assumed that the workers have a way of communicating their results onwards, either because they are side-effecting in the real world themselves, or because you have passed in some MVar or TQueue to collect the results.

Since: 0.6.9

mapWorkers :: Int -> (α -> Program τ β) -> [α] -> Program τ [β] Source #

Map a pool of workers over a list concurrently.

Simply forking one Haskell thread for every item in a list is a surprisingly reasonable choice in many circumstances, given how good Haskell's concurrency machinery is; in this library that can be achieved by applying forM over a list of items with forkThread. But if you need tighter control over the amount of concurrency, as is often the case when doing something computationally heavy or making requests of an external service with known limitations, then you are better off using this convenience function.

(this was originally modelled on async's mapConcurrently. That implementation has the drawback that the number of threads created is set by the size of the structure being traversed. Here we set the amount of concurrency explicitly.)

Be aware that the order of items in the output list is non-deterministic: it depends on the order in which the action function completes for each item, not on the order of items in the input.
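The technique can be illustrated in plain base and stm. The following poolMap is a hypothetical stand-in for mapWorkers, not the library's code: a fixed number of threads pull inputs from one queue and push results onto another, so, like mapWorkers, results come back in completion order rather than input order.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (flushTQueue, newTQueueIO, readTQueue, writeTQueue)
import Control.Monad (forM_, replicateM, replicateM_)
import Data.List (sort)

-- poolMap n f xs: apply f to every element of xs using at most n threads.
-- An illustrative sketch in the style of mapWorkers, not the library's code.
poolMap :: Int -> (a -> IO b) -> [a] -> IO [b]
poolMap n f xs = do
    input <- newTQueueIO
    output <- newTQueueIO

    -- enqueue all the inputs, then one end-of-input marker per worker
    forM_ xs $ \x -> atomically (writeTQueue input (Just x))
    replicateM_ n $ atomically (writeTQueue input Nothing)

    -- n workers each loop until they read Nothing
    dones <- replicateM n $ do
        done <- newEmptyMVar
        _ <- forkIO $ do
            let loop = do
                    next <- atomically (readTQueue input)
                    case next of
                        Nothing -> putMVar done ()
                        Just x -> do
                            y <- f x
                            atomically (writeTQueue output y)
                            loop
            loop
        pure done
    mapM_ takeMVar dones

    -- gather the results; these are in completion order, not input order
    atomically (flushTQueue output)

main :: IO ()
main = do
    ys <- poolMap 4 (\x -> pure (x * x)) [1 .. 10 :: Int]
    print (sort ys)   -- [1,4,9,16,25,36,49,64,81,100]
```

Note the sort in main: because completion order is non-deterministic, any code consuming the results must either tolerate arbitrary order or re-establish one itself.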

Since: 0.6.9

Internals

data Queue α Source #

A queue which has an end, someday.

(this is a thin wrapper over the stm TQueue type)

Since: 0.6.9

unQueue :: Queue α -> TQueue (Maybe α) Source #

Access the underlying queue. We make use of the STM TQueue type, so you'll want the following imports:

import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (TQueue, writeTQueue)

Having accessed the underlying queue you can write items, wrapped in Just, to it directly:

    liftIO $ do
        atomically $ do
            writeTQueue queue (Just item)

A Nothing written to the underlying queue will signal the worker threads that the end of input has been reached and they can safely return.
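The Maybe-terminated protocol can be demonstrated with stm directly (this uses the TQueue type itself rather than this library's Queue wrapper; the drain helper is illustrative, not part of the API):

```haskell
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (TQueue, newTQueueIO, readTQueue, writeTQueue)

-- drain: read items until the Nothing end-of-input marker is reached.
-- An illustrative helper, not part of this library.
drain :: TQueue (Maybe a) -> IO [a]
drain queue = do
    possible <- atomically (readTQueue queue)
    case possible of
        Nothing -> pure []
        Just item -> (item :) <$> drain queue

main :: IO ()
main = do
    queue <- newTQueueIO
    mapM_ (atomically . writeTQueue queue . Just) ["one", "two", "three"]
    atomically (writeTQueue queue Nothing)   -- signal end of input
    items <- drain queue
    print items   -- ["one","two","three"]
```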

Since: 0.6.9

getMachineSize :: Program τ Int Source #

Report back the number of processor cores that are available as Haskell "capabilities" (this was set when you launched the program with execute). This can best be used to set the number of concurrent worker threads when running runWorkers_ or mapWorkers.
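In plain base the corresponding query is getNumCapabilities, which this function presumably wraps (an assumption; the source describes the same "capabilities" concept):

```haskell
import Control.Concurrent (getNumCapabilities)

main :: IO ()
main = do
    -- the number of Haskell capabilities, typically set with the +RTS -N flag
    n <- getNumCapabilities
    print n
```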

Since: 0.6.9