om-fork: Concurrency utilities.

[ library, mit, unclassified ]

Actor pattern and some limited structured concurrency tools


Versions [RSS] 0.7.1.6, 0.7.1.7, 0.7.1.8, 0.7.1.9, 0.7.1.10, 0.7.1.11
Dependencies aeson (>=2.0.3.0 && <2.3), base (>=4.15.1.0 && <4.20), exceptions (>=0.10.4 && <0.11), ki-unlifted (>=1.0.0.2 && <1.1), monad-logger (>=0.3.36 && <0.4), om-show (>=0.1.2.6 && <0.2), text (>=1.2.5.0 && <2.2), unliftio (>=0.2.22.0 && <0.3) [details]
License MIT
Copyright 2023 Owens Murray, LLC.
Author Rick Owens
Maintainer rick@owensmurray.com
Home page https://github.com/owensmurray/om-fork
Uploaded by rickowens at 2024-02-05T00:31:54Z
Distributions NixOS:0.7.1.10
Reverse Dependencies 2 direct, 0 indirect [details]
Downloads 292 total (16 in the last 30 days)
Status Docs uploaded by user
Build status unknown [no reports yet]

Readme for om-fork-0.7.1.10

om-fork

This package provides some concurrency abstractions.

Structured concurrency

This package provides some very limited tools for structured concurrency. These tools are "limited" in the sense that they target a very specific use case: making sure that if any one of a group of cooperating threads ends for any reason, then they all end. For a more complete treatment of structured concurrency, see the ki package.

Motivation

If you are using the actor model (see below) and one of your actors dies for whatever reason, you probably want to crash completely instead of ending up in some kind of "half-crashed" zombie state.

Example

{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}

module Main (main) where

import Control.Concurrent (threadDelay)
import Control.Exception (SomeException, try)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Control.Monad.Logger (runStdoutLoggingT)
import OM.Fork (race, runRace, wait)

main :: IO ()
main =
  void . try @SomeException $
    runStdoutLoggingT $ do
      runRace $ do
        race "loop1" $ loop1
        race "loop2" $ loop2
        race "loopThatCrashes" $ loopThatCrashes
        wait

loop1 :: (MonadIO m) => m void
loop1 = do
  liftIO $ do
    threadDelay 1_000_000
    putStrLn "loop1 cycle"
  loop1
  
loop2 :: (MonadIO m) => m void
loop2 = do
  liftIO $ do
    threadDelay 200_000
    putStrLn "loop2 cycle"
  loop2

loopThatCrashes :: (MonadIO m) => m void
loopThatCrashes = do
  liftIO $ do
    threadDelay 5_000_000
    putStrLn "loopThatCrashes about to crash"
  error "crash"
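
To make the "if any one ends, they all end" idea concrete, here is a minimal sketch that uses only `base`. It is not how om-fork or ki implement it: `raceAll` is a hypothetical helper invented for this illustration, and it ignores asynchronous exceptions delivered to the calling thread, which a real implementation would have to handle.

```haskell
module Main (main) where

import Control.Concurrent (forkFinally, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, tryPutMVar)
import Control.Exception (SomeException)
import Control.Monad (forever, void)

{- |
  Hypothetical helper (not part of om-fork): run every action in its
  own thread, block until the first thread ends for any reason, then
  kill the others. Returns the outcome of the thread that ended first.
-}
raceAll :: [IO ()] -> IO (Either SomeException ())
raceAll actions = do
  firstDone <- newEmptyMVar
  {- Each thread reports its outcome when it ends; only the first report is taken. -}
  threads <-
    mapM
      (\action -> forkFinally action (void . tryPutMVar firstDone))
      actions
  outcome <- takeMVar firstDone
  {- Killing a thread that has already finished is a no-op. -}
  mapM_ killThread threads
  pure outcome

main :: IO ()
main = do
  outcome <-
    raceAll
      [ forever (threadDelay 200000 >> putStrLn "fast loop cycle")
      , threadDelay 1000000 >> error "crash"
      ]
  case outcome of
    Left err -> putStrLn ("a thread crashed, all stopped: " <> show err)
    Right () -> putStrLn "a thread finished, all stopped"
```

The design point is that a thread ending normally is treated the same as a thread crashing: either one brings the whole group down, which is what prevents the "half-crashed" state described under Motivation.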

Actor model

This package provides an abstraction over the "actor model". In particular, the main thing of value is a way to get heterogeneously typed blocking responses from the actor, using call.

Motivation

The actor model isn't suitable for every concurrency problem, but maybe you've been programming in Erlang and you have a problem that fits naturally into the actor model. A full motivation of the actor model is beyond the scope of this package.

Anyway, you probably want a way to interact with your "actor" in a type safe way. This package provides convenient tools and patterns to do so.

Example

{-# LANGUAGE LambdaCase #-}

module Main (main) where

import Control.Concurrent (Chan, forkIO, newChan, readChan)
import Control.Monad (void)
import OM.Fork (Responder, call, cast, respond)


{- | Messages that can be sent to the actor. -}
data MyMsg
  = ReverseEcho String (Responder String)
    {- ^ A blocking message. Responds with a String -}
  | Print String
    {- ^ A non-blocking message. -}
  | GetState (Responder Int)
    {- ^ Another blocking message. Responds with an Int -}


{- | The "state" is just a count of how many messages we've seen so far. -}
actorLoop :: Int -> Chan MyMsg -> IO void
actorLoop state chan = do
  readChan chan >>= \case
      ReverseEcho str responder ->
        void $ respond responder (reverse str)
      Print str ->
        putStrLn ("Printed in background by actor thread: " <> str)
      GetState responder ->
        void $ respond responder state
  actorLoop (succ state) chan


main :: IO ()
main = do
  {- `Chan a` is an instance of `Actor` -}
  chan <- newChan
  void . forkIO $ actorLoop 0 chan

  {- Notice that the result of this `call` is a String. -}
  putStrLn =<< call chan (ReverseEcho "hello world")
  cast chan (Print "foo")

  {- Notice that the result of this `call` is an Int. -}
  actorState <- call chan GetState
  print (actorState * 2)
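
For intuition about how a single message type can yield differently typed blocking responses, the following sketch builds a similar pattern from `base` alone. It is an assumption about the general technique, not om-fork's actual implementation: `Reply`, `reply`, and `ask` are hypothetical stand-ins for `Responder`, `respond`, and `call`. Each blocking message carries a typed, one-shot reply slot, so the response type is fixed by the constructor the caller chooses.

```haskell
module Main (main) where

import Control.Concurrent (Chan, forkIO, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (void)

{- | Hypothetical stand-in for `Responder`: a typed, one-shot reply slot. -}
newtype Reply a = Reply (MVar a)

{- | Hypothetical stand-in for `respond`: fill the reply slot. -}
reply :: Reply a -> a -> IO ()
reply (Reply var) = putMVar var

{- |
  Hypothetical stand-in for `call`: create a reply slot, send the
  message that carries it, then block until the actor fills it.
-}
ask :: Chan msg -> (Reply a -> msg) -> IO a
ask chan mkMsg = do
  var <- newEmptyMVar
  writeChan chan (mkMsg (Reply var))
  takeMVar var

data Msg
  = Reverse String (Reply String)
  | Count (Reply Int)

{- | The state is the number of messages handled so far. -}
server :: Int -> Chan Msg -> IO a
server seen chan = do
  msg <- readChan chan
  case msg of
    Reverse str r -> reply r (reverse str)
    Count r -> reply r seen
  server (seen + 1) chan

demo :: IO (String, Int)
demo = do
  chan <- newChan
  void (forkIO (server 0 chan))
  reversed <- ask chan (Reverse "hello world") -- a String
  seen <- ask chan Count -- an Int
  pure (reversed, seen)

main :: IO ()
main = demo >>= print
```

Because the partially applied constructor has type `Reply a -> Msg`, the compiler infers the result type of each `ask` from the message itself, with no casts or runtime tags on the caller's side.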