Safe Haskell | None |
---|---|
Language | Haskell2010 |
Transient provides high level concurrency allowing you to do concurrent processing without requiring any knowledge of threads or synchronization. From the programmer's perspective, the programming model is single threaded. Concurrent tasks are created and composed seamlessly resulting in highly modular and composable concurrent programs. Transient has diverse applications from simple concurrent applications to massively parallel and distributed map-reduce problems. If you are considering Apache Spark or Cloud Haskell then transient might be a simpler yet better solution for you (see transient-universe). Transient makes it easy to write composable event driven reactive UI applications. For example, Axiom is a transient based unified client and server side framework that provides a better programming model and composability compared to frameworks like ReactJS.
Overview
The TransientIO monad allows you to:
- Split a problem into concurrent task sets
- Compose concurrent task sets using non-determinism
- Collect and combine results of concurrent tasks
You can think of TransientIO as a concurrent list transformer monad with many other features added on top, e.g. backtracking, logging and recovery to move computations across machines for distributed processing.
Non-determinism
In its non-concurrent form, the TransientIO monad behaves exactly like a list transformer monad. It is like a list whose elements are generated using IO effects. It composes in the same way as a list monad. Let's see an example:

    import Control.Concurrent (threadDelay)
    import Control.Monad.IO.Class (liftIO)
    import System.Random (randomIO)
    import Transient.Base (keep, threads, waitEvents)

    main = keep $ threads 0 $ do
        x <- waitEvents (randomIO :: IO Int)
        liftIO $ threadDelay 1000000
        liftIO $ putStrLn $ show x

keep runs the TransientIO monad. The threads primitive limits the number of threads to force non-concurrent operation. The waitEvents primitive generates values (list elements) in a loop using the randomIO IO action. The above code behaves like a list monad, as if we are drawing elements from a list generated by waitEvents. The sequence of actions following waitEvents is executed for each element of the list. We see a random value printed on the screen every second. As you can see, this behavior is identical to a list transformer monad.
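The list-monad analogy can be checked with ordinary lists. The following sketch uses only the Prelude, not transient, and is meant as a mental model rather than a description of how TransientIO is implemented: the statements after a generator run once for every element it produces.

```haskell
-- Pure list-monad model: the body after each generator runs once per element.
pairs :: [(Int, Char)]
pairs = do
  x <- [1, 2, 3]   -- plays the role of a task generator such as waitEvents
  y <- "ab"        -- a second generator, run once for every x
  return (x, y)

main :: IO ()
main = mapM_ print pairs
```

In the transient version the elements come from an IO action instead of a literal list, but the shape of the do block is the same.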
Concurrency
The TransientIO monad is a concurrent list transformer, i.e. each element of the generated list can be processed concurrently. In the previous example, if we change the number of threads to 10 we can see concurrency in action:

    ...
    main = keep $ threads 10 $ do
        ...

Now each element of the list is processed concurrently in a separate thread, using up to 10 threads. Therefore we see 10 results printed every second instead of 1 in the previous version.
In the above examples the list elements are generated using a synchronous IO action. These elements can also be asynchronous events, for example an interactive user input. In transient, the elements of the list are known as tasks. The tasks terminology is general and intuitive in the context of transient as tasks can be triggered by asynchronous events and multiple of them can run simultaneously in an unordered fashion.
Composing Tasks
The type TransientIO a represents a task set with each task in the set returning a value of type a. A task set could be finite or infinite; multiple tasks could run simultaneously. The absence of a task, a void task set or failure is denoted by a special value empty in an Alternative composition, or the stop primitive in a monadic composition.
In the transient programming model the programmer thinks in terms of tasks
and composes tasks. Whether the tasks run synchronously or concurrently does
not matter; concurrency is hidden from the programmer for the most part. In
the previous example the code written for a single threaded list transformer
works concurrently as well.
We have already seen that the Monad instance provides a way to compose the tasks in a sequential, non-deterministic and concurrent manner. When a void task set is encountered, the monad stops processing any further computations as we have nothing to do. The following example does not generate any output after "stop here":

    main = keep $ threads 0 $ do
        x <- waitEvents (randomIO :: IO Int)
        liftIO $ threadDelay 1000000
        liftIO $ putStrLn $ "stop here"
        stop
        liftIO $ putStrLn $ show x

When a task creation primitive (e.g. waitEvents) creates a task concurrently in a new thread, it returns a void task set in the current thread, making it stop further processing. However, processing resumes from the same point onwards with the same state in the new task threads as and when they are created, as if the current thread along with its state has branched into multiple threads, one for each new task. In the following example you can see that the thread id changes after the waitEvents call:

    -- in addition to the earlier imports:
    -- import Control.Concurrent (myThreadId)

    main = keep $ threads 1 $ do
        mainThread <- liftIO myThreadId
        liftIO $ putStrLn $ "Main thread: " ++ show mainThread
        x <- waitEvents (randomIO :: IO Int)
        liftIO $ threadDelay 1000000
        evThread <- liftIO myThreadId
        liftIO $ putStrLn $ "Event thread: " ++ show evThread

Note that if we use threads 0 then the new task thread is the same as the main thread because waitEvents falls back to synchronous non-concurrent mode, and therefore returns a non-void task set.
In an Alternative composition, when a computation results in empty the next alternative is tried. When a task creation primitive creates a concurrent task, it returns empty, allowing tasks to run concurrently when composed with the <|> combinator. The following example combines two single concurrent tasks generated by async:

    main = keep $ do
        x <- event 1 <|> event 2
        liftIO $ putStrLn $ show x
      where
        event n = async (return n :: IO Int)

Note that availability of threads can impact the behavior of an application. An infinite task set generator (e.g. waitEvents or sample) running synchronously (due to lack of threads) can block all other computations in an Alternative composition. The following example does not trigger the async task unless we increase the number of threads to make waitEvents asynchronous:

    main = keep $ threads 0 $ do
        x <- waitEvents (randomIO :: IO Int) <|> async (return 0 :: IO Int)
        liftIO $ threadDelay 1000000
        liftIO $ putStrLn $ show x

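A plain-list model shows both points made about Alternative composition. This is an analogy built only on base, not transient code: <|> moves on to the next alternative when the first one is empty, and an infinite generator on the left, consumed synchronously, never lets the right-hand alternative contribute.

```haskell
import Control.Applicative (empty, (<|>))

-- An empty first alternative hands over to the second one.
fromSecond :: [Int]
fromSecond = empty <|> [2]

-- Two non-empty alternatives both contribute their elements.
both :: [Int]
both = [1] <|> [2]

-- An infinite left alternative starves the right one: 0 never shows up.
starved :: [Int]
starved = take 5 (repeat 1 <|> [0])

main :: IO ()
main = mapM_ print [fromSecond, both, starved]
```

The third case corresponds to waitEvents running under threads 0; giving it threads is what lets the other alternative run.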
Parallel Map Reduce
The following example uses choose to send the items in a list to parallel tasks for squaring and then folds the results of those tasks using collect.

    import Control.Monad.IO.Class (liftIO)
    import Data.List (sum)
    import Transient.Base (keep)
    import Transient.Indeterminism (choose, collect)

    main = keep $ do
        collect 100 squares >>= liftIO . putStrLn . show . sum
      where
        squares = do
            x <- choose [1..100]
            return (x * x)

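For comparison, the same total can be computed sequentially with plain lists. This base-only sketch makes no use of transient; assuming collect gathers all 100 squares, the parallel version should arrive at the same number.

```haskell
-- Sequential reference for the map-reduce example: sum of squares of 1..100.
expectedTotal :: Integer
expectedTotal = sum [x * x | x <- [1 .. 100]]

main :: IO ()
main = print expectedTotal  -- 338350
```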
State Isolation
State is inherited but never shared. A transient application is written as a composition of task sets. New concurrent tasks can be triggered from inside a task. A new task inherits the state of the monad at the point where it got started. However, the state of a task is always completely isolated from other tasks irrespective of whether it is started in a new thread or not. The state is referentially transparent, i.e. any change to the state creates a new copy of the state. Therefore a programmer does not have to worry about synchronization or unintended side effects.
The monad starts with an empty state. At any point you can add (setData), retrieve (getSData) or delete (delData) a data item to or from the current state. Creation of a task branches the computation, inheriting the previous state, and collapsing (e.g. collect) discards the state of the tasks being collapsed. If you want to use the state in the results you will have to pass it as part of the results of the tasks.
Reactive Applications
A popular model to handle asynchronous events in imperative languages is the callback model. The control flow of the program is driven by events and callbacks; callbacks are event handlers that are hooked into the event generation code and are invoked every time an event happens. This model makes the overall control flow hard to understand, resulting in "callback hell", because the logic is distributed across various isolated callback handlers, and many different event threads work on the same global state.
Transient provides a better programming model for reactive applications. In contrast to the callback model, transient transparently moves the relevant state to the respective event threads and composes the results to arrive at the new state. The programmer is not aware of the threads, there is no shared state to worry about, and the seamless sequential flow enables easy reasoning and composable application components. Axiom is a client and server side web UI and reactive application framework built using the transient programming model.
Further Reading
Synopsis
- data TransIO a
- type TransientIO = TransIO
- (**>) :: AdditionalOperators m => m a -> m b -> m b
- (<**) :: AdditionalOperators m => m a -> m b -> m a
- (<***) :: AdditionalOperators m => m a -> m b -> m a
- keep :: Typeable a => TransIO a -> IO (Maybe a)
- keep' :: Typeable a => TransIO a -> IO (Maybe a)
- stop :: Alternative m => m stopped
- exit :: Typeable a => a -> TransIO a
- option :: (Typeable b, Show b, Read b, Eq b) => b -> String -> TransIO b
- input :: (Typeable a, Read a, Show a) => (a -> Bool) -> String -> TransIO a
- input' :: (Typeable a, Read a, Show a) => Maybe a -> (a -> Bool) -> String -> TransIO a
- data StreamData a
- = SMore a
- | SLast a
- | SDone
- | SError SomeException
- parallel :: IO (StreamData b) -> TransIO (StreamData b)
- async :: IO a -> TransIO a
- waitEvents :: IO a -> TransIO a
- sample :: Eq a => IO a -> Int -> TransIO a
- spawn :: IO a -> TransIO a
- react :: Typeable eventdata => ((eventdata -> IO response) -> IO ()) -> IO response -> TransIO eventdata
- abduce :: TransIO ()
- setData :: (MonadState EventF m, Typeable a) => a -> m ()
- getSData :: Typeable a => TransIO a
- getData :: (MonadState EventF m, Typeable a) => m (Maybe a)
- delData :: (MonadState EventF m, Typeable a) => a -> m ()
- modifyData :: (MonadState EventF m, Typeable a) => (Maybe a -> Maybe a) -> m ()
- modifyData' :: (MonadState EventF m, Typeable a) => (a -> a) -> a -> m a
- try :: TransIO a -> TransIO a
- setState :: (MonadState EventF m, Typeable a) => a -> m ()
- getState :: Typeable a => TransIO a
- delState :: (MonadState EventF m, Typeable a) => a -> m ()
- getRState :: Typeable a => TransIO a
- setRState :: Typeable a => a -> TransIO ()
- modifyState :: (MonadState EventF m, Typeable a) => (Maybe a -> Maybe a) -> m ()
- threads :: Int -> TransIO a -> TransIO a
- addThreads :: Int -> TransIO ()
- freeThreads :: TransIO a -> TransIO a
- hookedThreads :: TransIO a -> TransIO a
- oneThread :: TransIO a -> TransIO a
- killChilds :: TransIO ()
- onException :: Exception e => (e -> TransIO ()) -> TransIO ()
- onException' :: Exception e => TransIO a -> (e -> TransIO a) -> TransIO a
- cutExceptions :: TransIO ()
- continue :: TransIO ()
- catcht :: Exception e => TransIO b -> (e -> TransIO b) -> TransIO b
- throwt :: Exception e => e -> TransIO a
- genId :: MonadState EventF m => m Int
The Monad
data TransIO a
Instances:
- Monad TransIO
- Functor TransIO
- Applicative TransIO
- MonadIO TransIO
- Alternative TransIO
- MonadPlus TransIO
- AdditionalOperators TransIO
- MonadState EventF TransIO
- (Num a, Eq a, Fractional a) => Fractional (TransIO a)
- (Num a, Eq a) => Num (TransIO a)
- Monoid a => Monoid (TransIO a)
type TransientIO = TransIO
Task Composition Operators
(**>) :: AdditionalOperators m => m a -> m b -> m b infixr 1
Run m a discarding its result before running m b.
(<**) :: AdditionalOperators m => m a -> m b -> m a infixr 1
Run m b discarding its result, after the whole task set m a is done.
(<***) :: AdditionalOperators m => m a -> m b -> m a infixr 1
Run m b discarding its result, once after each task in m a, and once again after the whole task set is done.
Running the monad
keep :: Typeable a => TransIO a -> IO (Maybe a)
Runs the transient computation in a child thread and keeps the main thread running until all the user threads exit or some thread invokes exit.
The main thread provides facilities for accepting keyboard input in a non-blocking but line-oriented manner. The program reads the standard input and feeds it to all the async input consumers (e.g. option and input). All async input consumers contend for each line entered on the standard input and try to read it atomically. When a consumer consumes the input others do not get to see it, otherwise it is left in the buffer for others to consume. If nobody consumes the input, it is discarded.
A / in the input line is treated as a newline.
When using asynchronous input, regular synchronous IO APIs like getLine cannot be used as they will contend for the standard input along with the asynchronous input thread. Instead you can use the asynchronous input APIs provided by transient.
A built-in interactive command handler also reads the stdin asynchronously. All available options waiting for input are displayed when the program is run. The following commands are available:
- ps: show threads
- log: inspect the log of a thread
- end, exit: terminate the program
An input not handled by the command handler can be handled by the program.
The program's command line is scanned for -p or --path command line options. The arguments to these options are injected into the async input channel as keyboard input to the program. Each line of input is separated by a /. For example:

    foo -p ps/end

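The effect of the / separator can be pictured with a small splitting function. The helper below is hypothetical (splitCommands is not part of transient, and this is not how the library parses its arguments); it only illustrates how an argument such as ps/end corresponds to two separate input lines.

```haskell
-- Hypothetical helper: split a -p argument into the lines it stands for.
splitCommands :: String -> [String]
splitCommands s = case break (== '/') s of
  (cmd, [])       -> [cmd]                       -- no separator left
  (cmd, _ : rest) -> cmd : splitCommands rest    -- drop the '/' and continue

main :: IO ()
main = mapM_ putStrLn (splitCommands "ps/end")
```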
keep' :: Typeable a => TransIO a -> IO (Maybe a)
Same as keep but does not read from the standard input, and therefore the async input APIs (option and input) cannot be used in the monad. However, keyboard input can still be passed via command line arguments as described in keep. Useful for debugging or for creating background tasks, as well as for embedding the Transient monad inside another computation. It returns either the value returned by exit, or Nothing when there are no more threads running.
stop :: Alternative m => m stopped
A synonym of empty that can be used in a monadic expression. It stops the computation, which allows the next computation in an Alternative (<|>) composition to run.
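Because stop is only constrained by Alternative, its behavior can be sketched in any Alternative functor. The example below uses Maybe from base as a stand-in for TransIO; halt is a hypothetical local name playing the role of stop, so nothing here depends on transient itself.

```haskell
import Control.Applicative (empty, (<|>))

-- Stand-in for stop: in any Alternative, empty plays this role.
halt :: Maybe a
halt = empty

-- A monadic branch that stops before producing its result.
firstBranch :: Maybe String
firstBranch = do
  _ <- halt              -- the computation stops here
  return "unreachable"

-- The stopped branch lets the next alternative run.
result :: Maybe String
result = firstBranch <|> return "fallback"

main :: IO ()
main = print result
```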
exit :: Typeable a => a -> TransIO a
Exit the main thread, and thus all the Transient threads (and the application if there is no more code).
Asynchronous console IO
option :: (Typeable b, Show b, Read b, Eq b) => b -> String -> TransIO b
Listens on stdin and triggers a new task every time the input data matches the first parameter. The value contained by the task is the matched value, i.e. the first argument itself. The second parameter is a label describing the option to the user. The label is displayed in the console when the option matches.
input :: (Typeable a, Read a, Show a) => (a -> Bool) -> String -> TransIO a
Waits on stdin and returns a value when a console input matches the predicate specified in the first argument. The second parameter is a string to be displayed on the console before waiting.
Task Creation
These primitives are used to create asynchronous and concurrent tasks from an IO action.
data StreamData a
StreamData represents a task in a task stream being generated.
Constructor | Description |
---|---|
SMore a | More tasks to come |
SLast a | This is the last task |
SDone | No more tasks, we are done |
SError SomeException | An error occurred |
Instances:
- Read a => Read (StreamData a)
- Show a => Show (StreamData a)
parallel :: IO (StreamData b) -> TransIO (StreamData b)
Run an IO action one or more times to generate a stream of tasks. The IO action returns a StreamData. When it returns an SMore or SLast a new task is triggered with the result value. If the return value is SMore, the action is run again to generate the next task, otherwise task creation stops.
Unless the maximum number of threads (set with threads) has been reached, the task is generated in a new thread and the current thread returns a void task.
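The rerun-until-last protocol described for parallel can be modelled with a small sequential loop. This sketch uses only base: Step is a local stand-in mirroring three of the StreamData constructors (SError is left out), and drain and countdown are hypothetical names. It models the protocol only, not the threading that parallel adds.

```haskell
import Data.IORef (newIORef, readIORef, writeIORef)

-- Local stand-in for StreamData (the error case is omitted).
data Step a = More a | Last a | Done

-- Run the action repeatedly: one value per More/Last, stop after Last or Done.
drain :: IO (Step a) -> IO [a]
drain action = do
  step <- action
  case step of
    More x -> (x :) <$> drain action   -- more to come: run the action again
    Last x -> return [x]               -- final value: stop
    Done   -> return []                -- nothing left: stop

-- Decide which step a counter value corresponds to.
classify :: Int -> Step Int
classify k
  | k > 1     = More k
  | k == 1    = Last k
  | otherwise = Done

-- A producer that counts down from n, ending with Last 1.
countdown :: Int -> IO (IO (Step Int))
countdown n = do
  ref <- newIORef n
  return $ do
    k <- readIORef ref
    writeIORef ref (k - 1)
    return (classify k)

main :: IO ()
main = do
  producer <- countdown 3
  drain producer >>= print
```

With parallel, each collected value would instead trigger a task that runs the rest of the computation.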
async :: IO a -> TransIO a
Run an IO computation asynchronously and generate a single task carrying the result of the computation when it completes. See parallel for notes on the return value.
waitEvents :: IO a -> TransIO a
A task stream generator that produces an infinite stream of tasks by running an IO computation in a loop. A task is triggered carrying the output of the computation. See parallel for notes on the return value.
sample :: Eq a => IO a -> Int -> TransIO a
A task stream generator that produces an infinite stream of tasks by running an IO computation periodically at the specified time interval. The task carries the result of the computation. A new task is generated only if the output of the computation is different from the previous one. See parallel for notes on the return value.
react :: Typeable eventdata => ((eventdata -> IO response) -> IO ()) -> IO response -> TransIO eventdata
Make a transient task generator from an asynchronous callback handler.
The first parameter is a callback. The second parameter is a value to be returned to the callback; if the callback expects no return value it can just be a return (). The callback expects a setter function taking the eventdata as an argument and returning a value to the callback; this function is supplied by react.
Callbacks from foreign code can be wrapped into such a handler and hooked into the transient monad using react. Every time the callback is called it generates a new task for the transient monad.
abduce :: TransIO ()
Runs the rest of the computation in a new thread. Returns empty to the current thread.
State management
setData :: (MonadState EventF m, Typeable a) => a -> m ()
setData stores a data item in the monad state which can be retrieved later using getData or getSData. Stored data items are keyed by their data type, and therefore only one item of a given type can be stored. A newtype wrapper can be used to distinguish two data items of the same type.

    import Control.Monad.IO.Class (liftIO)
    import Transient.Base
    import Data.Typeable

    data Person = Person
        { name :: String
        , age :: Int
        } deriving Typeable

    main = keep $ do
        setData $ Person "Alberto" 55
        Person name age <- getSData
        liftIO $ print (name, age)

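The keyed-by-type rule can be illustrated with a toy store built on Data.Dynamic from base. This is a model under stated assumptions, not transient's actual representation: Store, putItem and getItem are hypothetical names. It shows that a second value of the same type replaces the first, while newtype wrappers keep two Int-based items apart.

```haskell
import Data.Dynamic (Dynamic, fromDynamic, toDyn)
import Data.Maybe (listToMaybe, mapMaybe)
import Data.Typeable (TypeRep, Typeable, typeOf)

-- Toy store holding at most one item per type.
type Store = [(TypeRep, Dynamic)]

-- Insert an item, replacing any existing item of the same type.
putItem :: Typeable a => a -> Store -> Store
putItem x store = (typeOf x, toDyn x) : filter ((/= typeOf x) . fst) store

-- Retrieve the item whose type matches the type expected by the caller.
getItem :: Typeable a => Store -> Maybe a
getItem = listToMaybe . mapMaybe (fromDynamic . snd)

newtype Width  = Width Int  deriving (Eq, Show)
newtype Height = Height Int deriving (Eq, Show)

demo :: (Maybe Int, Maybe Width, Maybe Height)
demo = (getItem s, getItem s, getItem s)
  where
    s = putItem (Height 3)
      . putItem (Width 2)
      . putItem (7 :: Int)      -- replaces the earlier Int
      . putItem (1 :: Int)
      $ []

main :: IO ()
main = print demo
```

As with getSData, the item returned by getItem is selected purely by the type the caller asks for.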
getSData :: Typeable a => TransIO a
Retrieve a previously stored data item of the given data type from the monad state. The data type to retrieve is implicitly determined by the type expected where getSData is used. If the data item is not found, empty is returned, so the alternative computation, if any, is executed; otherwise the computation stops.
If you want to print an error message or a default value, you can use an Alternative composition. For example:

    getSData <|> error "no data of the type desired"
    getInt = getSData <|> return (0 :: Int)

delData :: (MonadState EventF m, Typeable a) => a -> m ()
Delete the data item of the given type from the monad state.
modifyData :: (MonadState EventF m, Typeable a) => (Maybe a -> Maybe a) -> m ()
Accepts a function that takes the current value of the stored data type and returns the modified value. If the function returns Nothing the value is deleted, otherwise it is updated.
modifyData' :: (MonadState EventF m, Typeable a) => (a -> a) -> a -> m a
Either modify the stored value using the first parameter, or insert the second parameter, depending on whether the data exists or not.

    runTransient $ do modifyData' (\h -> h ++ " world") "hello new" ; r <- getSData ; liftIO $ putStrLn r -- > "hello new"
    runTransient $ do setData "hello" ; modifyData' (\h -> h ++ " world") "hello new" ; r <- getSData ; liftIO $ putStrLn r -- > "hello world"

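The modify-or-insert rule can be written down as a pure function on Maybe. This is a base-only sketch with a hypothetical name (modifyOrInsert), where Nothing stands for "no item of this type stored"; it mirrors the two cases above without touching transient's state.

```haskell
-- Pure model: apply f to an existing value, or fall back to the default.
modifyOrInsert :: (a -> a) -> a -> Maybe a -> a
modifyOrInsert f def = maybe def f

main :: IO ()
main = do
  -- Nothing stored yet: the default is inserted.
  putStrLn (modifyOrInsert (++ " world") "hello new" Nothing)
  -- A value is already stored: it is modified.
  putStrLn (modifyOrInsert (++ " world") "hello new" (Just "hello"))
```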
try :: TransIO a -> TransIO a
Run an action; if it does not succeed, undo any state changes that it might have caused and allow alternative actions to run with the original state.
setRState :: Typeable a => a -> TransIO ()
A mutable state reference that can be updated (similar to an STRef in the ST monad). It is initialized the first time it is set.
modifyState :: (MonadState EventF m, Typeable a) => (Maybe a -> Maybe a) -> m ()
Same as modifyData.
Thread management
threads :: Int -> TransIO a -> TransIO a
Sets the maximum number of threads that can be created for the given task set. When set to 0, new tasks start synchronously in the current thread. New threads are created by parallel and by APIs that use parallel.
addThreads :: Int -> TransIO ()
Ensure that at least n threads are available for the current task set.
freeThreads :: TransIO a -> TransIO a
Disable tracking and therefore the ability to terminate the child threads. By default, child threads are terminated automatically when the parent thread dies, or they can be terminated using the kill primitives. Disabling tracking may improve performance a bit; however, all threads must be well-behaved and exit on their own to avoid a leak.
hookedThreads :: TransIO a -> TransIO a
Enable tracking and therefore the ability to terminate the child threads. This is the default but can be used to re-enable tracking if it was previously disabled with freeThreads.
oneThread :: TransIO a -> TransIO a
Terminate all the child threads in the given task set and continue execution in the current thread. Useful to reap the children when a task is done.
killChilds :: TransIO ()
Kill all the child threads of the current thread.
Exceptions
Exception handlers are implemented using the backtracking mechanism (see back). Several exception handlers can be installed using onException; handlers are run in reverse order when an exception is raised. The following example prints "3" and then "2".

    {-# LANGUAGE ScopedTypeVariables #-}
    import Transient.Base (keep, onException, cutExceptions)
    import Control.Monad.IO.Class (liftIO)
    import Control.Exception (ErrorCall)

    main = keep $ do
        onException $ \(e:: ErrorCall) -> liftIO $ putStrLn "1"
        cutExceptions
        onException $ \(e:: ErrorCall) -> liftIO $ putStrLn "2"
        onException $ \(e:: ErrorCall) -> liftIO $ putStrLn "3"
        liftIO $ error "Raised ErrorCall exception" >> return ()

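The order in which the handlers fire can be modelled as a stack. The sketch below is a pure, base-only model with hypothetical names (Op, handlerStack); it does not reproduce transient's backtracking, only the last-in, first-out order and the effect of cutting the handlers installed so far.

```haskell
-- Operations that affect the set of installed handlers.
data Op = Install String | Cut

-- Fold the operations into a handler stack, newest handler first.
handlerStack :: [Op] -> [String]
handlerStack = foldl step []
  where
    step stack (Install h) = h : stack   -- push the new handler on top
    step _     Cut         = []          -- discard everything installed so far

main :: IO ()
main = mapM_ putStrLn (handlerStack [Install "1", Cut, Install "2", Install "3"])
```

Running the handlers from the top of this stack gives "3" followed by "2", which matches the output stated for the example above.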
onException :: Exception e => (e -> TransIO ()) -> TransIO ()
Install an exception handler. Handlers are executed in reverse (i.e. last in, first out) order when such an exception occurs in the continuation. Note that multiple handlers can be installed for the same exception type.
The semantics are thus very different from those of onException in Control.Exception.
cutExceptions :: TransIO ()
Delete all the exception handlers registered till now.
continue :: TransIO ()
Use it inside an exception handler. It stops executing any further exception handlers and resumes normal execution from this point on.
catcht :: Exception e => TransIO b -> (e -> TransIO b) -> TransIO b
Catch an exception in a Transient block.
The semantics are the same as those of catch, but the computation and the exception handler can be multithreaded.
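Since catcht is described as having the semantics of catch, a plain Control.Exception example shows the shape of such a handler. This sketch uses only base and runs in IO rather than TransIO; safeDiv and handler are hypothetical names chosen for the illustration.

```haskell
import Control.Exception (ArithException, catch, evaluate)

-- Handler for arithmetic errors: report failure as Nothing.
handler :: ArithException -> IO (Maybe Int)
handler _ = return Nothing

-- Divide, turning a division-by-zero exception into Nothing.
safeDiv :: Int -> Int -> IO (Maybe Int)
safeDiv x y = (Just <$> evaluate (x `div` y)) `catch` handler

main :: IO ()
main = do
  safeDiv 10 2 >>= print
  safeDiv 1 0 >>= print
```

With catcht the guarded computation and the handler would both be TransIO actions of the same result type.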