thread-supervisor: A simplified implementation of Erlang/OTP like supervisor over thread

[ concurrency, library, mit ] [ Propose Tags ] [ Report a vulnerability ]

Downloads

Maintainer's Corner

Package maintainers

For package maintainers and hackage trustees

Candidates

  • No Candidates
Versions [RSS] 0.1.0.0, 0.1.0.1, 0.2.0.0
Change log ChangeLog.md
Dependencies base (>=4.7 && <5), clock, containers, data-default, unliftio [details]
License MIT
Copyright 2018-2020 Naoto Shimazaki
Author Naoto Shimazaki
Maintainer Naoto.Shimazaki@gmail.com
Category Concurrency
Home page https://github.com/nshimaza/thread-supervisor#readme
Bug tracker https://github.com/nshimaza/thread-supervisor/issues
Source repo head: git clone https://github.com/nshimaza/thread-supervisor
Uploaded by nshimaza at 2020-08-08T03:05:40Z
Distributions LTSHaskell:0.2.0.0, NixOS:0.2.0.0, Stackage:0.2.0.0
Downloads 1081 total (12 in the last 30 days)
Rating (no votes yet) [estimated by Bayesian average]
Your Rating
  • λ
  • λ
  • λ
Status Docs available [build log]
Last success reported on 2020-08-08 [all 1 reports]

Readme for thread-supervisor-0.2.0.0

[back to package description]

thread-supervisor

License: MIT Build Status Hackage Stackage Nightly Stackage LTS

A simplified implementation of Erlang/OTP like supervisor over thread.

Overview

This package provides Erlang/OTP like thread supervision. It provides automatic restart, escalation of intense crash, guaranteed cleanup of child threads on supervisor termination.

Motivation

Unlike Unix process, plain Haskell thread, created by forkIO, has no parent-child relation each other in its lifecycle management. This means termination of parent thread doesn't result its children also terminated. This is good design as a low level API because it gives user greatest flexibility. However, it also means managing entire lifecycle of thread is totally a responsibility of user.

Here one thing you need to be aware. Garbage collection doesn't work on living thread. When you lost reference to an object, garbage collector frees up the object for you. However, even though you lost the thread ID of your child thread, Haskell runtime doesn't consider the thread is orphaned. The child thread continues running.

This is prone to create thread leakage. You can accidentally lose thread ID of child thread by crash of parent thread. Now you no longer have way to kill orphaned child thread. This is thread leakage.

The low level forkIO API requires you keep track and manage entire thread lifecycle including accidental case like the above. Hand crafting it might be painful.

This package is intended to provide better wrapper API over plain forkIO. Not just providing parent-child thread lifecycle management, this package provides Erlang/OTP like API so that user can leverage well proven practices from Erlang/OTP.

If you need to keep your child running after parent terminated, this package is not for you.

Why not withAsync?

In short, withAsync addresses different problem than this package.

  • withAsync: Accessing multiple REST server concurrently then gather all responses with guarantee of cancellation of all the request on termination of calling thread.
  • thread-supervisor: Implementing server where unknown number of independent, concurrent, and indeterministic lifecycle requests will arrive.

A typical use case for this package is TCP server style use case. In such use case, you have to create unpredictable number of threads in order to serve to clients and those threads finish in random timings.

The withAsync coming with async package solves different problem than this package. It is good for taking actions asynchronously but eventually you need their return values. Or, even you aren't care of return values, you only need to take several finite number of actions concurrently.

Below is explanation why withAsync is not good for managing large number of threads.

withAsync is essentially a sugar over bracket pattern like this.

withAsync action inner = bracket (async action) uninterruptibleCancel inner

It guarantees execution of uninterruptibleCancel to the action on asynchronous exception occurrence at parent thread where withAsync itself is living. However it also guarantees the uninterruptibleCancel is executed on normal exit from inner too. Thus, the action can only live within the lifecycle of the withAsync call. If you want to keep your action alive, you have to keep inner continue running until your action finishes.

So, what if you kick async action go and make recursive call form inner back to your loop? It is a bad idea. Because withAsync is a bracket, recursive call from inner makes non-tail-recurse call. It consumes stack every time you make recurring.

In other words, the difference between withAsync and thread-supervisor is strategy of installing / un-installing cleanup handler. withAsync installs cleanup handler on stack so it uninstalls handler based on its lexical scope. thread-supervisor installs cleanup handler surrounding user supplied action so it uninstalls handlers at actual dynamic thread termination.

Quick Start

High level steps to use

  1. Create a MonitoredAction from your IO action
  2. Create a ChildSpec from the MonitoredAction
  3. Let a supervisor run the ChildSpec in a supervised thread

Detail will be different whether you create static child thread or dynamic child thread.

Create a static child

Static child is thread automatically spawned when supervisor starts. Following procedure makes your IO action a static child.

  1. Create a MonitoredAction from your IO action
  2. Create a ChildSpec from the MonitoredAction
  3. Give the ChildSpec to newSupervisor
  4. Run generated supervisor

Static children are automatically forked when supervisor started or one-for-all supervisor performed restarting action. When IO action inside of static child terminated, regardless normal completion or exception, supervisor checks if restart operation needed based on combination of restart type of terminated child and reason of termination. If supervisor decides restart is needed, it performs restarting operation based on its restart strategy, which can be one-for-one or one-for-all.

A supervisor can have any number of static children. Static children must be given when supervisor is created by newSupervisor.

Static child example

Following code creates a supervisor actor with two static children and run it in new thread.

runYourSupervisorWithStaticChildren = do
    Actor svQ svAction <- newActor . newSupervisor $ OneForAll def
        [ newChildSpec Permanent yourIOAction1
        , newChildSpec Permanent yourIOAction2
        ]
    async svAction

The idiom newActor . newSupervisor returns Actor svQ svAction where svQ is write-end of message queue for the supervisor actor, which we don't use here, and svAction is body IO action of the supervisor. When the svAction is actually evaluated, it automatically forks two threads. One is for yourIOAction1 and the other is for yourIOAction2. Because restart type of given static children are both Permanent, the supervisor always kicks restarting operation when one of yourIOAction1 or yourIOAction2 is terminated. When restarting operation is kicked, the supervisor kills remaining thread and restarts all children again because its restarting strategy is one-for-all.

When the supervisor is terminated, both yourIOAction1 and yourIOAction2 are automatically killed by the supervisor. To kill the supervisor, apply cancel to the async object returned by async svAction.

Create a dynamic child

Dynamic child is thread explicitly forked via newChild function. Following procedure runs your IO action as a dynamic child.

  1. Run a supervisor
  2. Create a ChildSpec from your IO action
  3. Request the supervisor to create a dynamic child based on the ChildSpec by calling newChild

Dynamic children are explicitly forked to each thread via newChild request to running supervisor. Supervisor never restarts dynamic child. It ignores restart type defined in ChildSpec of dynamic child.

Dynamic child example

Following code runs a supervisor in different thread then request it to run a dynamic child.

    -- Run supervisor in another thread
    Actor svQ svAction <- newActor $ newSimpleOneForOneSupervisor
    asyncSv <- async svAction
    -- Request to run your action under the supervisor
    let yourChildSpec = newChildSpec Temporary yourIOAction
    maybeChildThreadId <- newChild def svQ yourChildSpec

The idiom newActor $ newSimpleOneForOneSupervisor returns Actor svQ svAction where svQ is write-end of message queue for the supervisor actor and svAction is body IO action of the supervisor. When the svAction is actually evaluated, it listens svQ and wait for request to run dynamic child.

When newChild is called with svQ, it sends request to the supervisor to run a dynamic child with given ChildSpec.

When the supervisor is terminated, requested children are automatically killed by the supervisor if they are still running.

To kill the supervisor, apply cancel to asyncSv.

Building Blocks

This package consists of following building blocks.

  • Actor and Message queue
  • Monitored IO action and supervisable IO action
  • Behaviors (state machine, server, and supervisor)

Actor and message queue is lowest layer block of this package. Behaviors are built upon this block. It is exposed to user so that you can use it for implementing actor style concurrent program.

Monitored IO action is the heart of this package. It implements most sensitive part of dealing with asynchronous exception. Monitored IO action provides guaranteed notification on thread termination so that supervisor can provide guaranteed supervision on threads.

Behaviors - state machine, server, and supervisor - implement simplified Erlang/OTP behaviors so that user can leverage best practice of concurrent programming from Erlang/OTP.

Actor and Message queue

Actor is restartable IO action with inbound message queue. Actor is designed to allow other threads sending messages to an actor keep using the same write-end of the queue before and after restart of the actor. Actor consists of message queue and its handler. Inbox is a message queue designed for actor's message inbox. It is thread-safe, bounded or unbounded, and selectively readable queue.

To protect read-end of the queue, separate types are given to read-end and write-end. Message handler of actor can access to both end but only write-end is accessible from outside of message handler. To realize this, constructor of message queue are not exposed. The only way to create a new Inbox object is creating a new actor using newActor function.

newActor :: (Inbox message -> IO result) -> IO (Actor message result)

This package provides type synonym for message handler as below.

type ActorHandler message result = (Inbox message -> IO result)

newActor receives an user supplied message handler, creates a new Inbox value, then returns write-end of actor's message queue and IO action of the actor's body wrapped by Actor. Actor is defined as following.

data Actor message result = Actor
    { actorQueue  :: ActorQ message -- ^ Write end of message queue of 'Actor'
    , actorAction :: IO result      -- ^ IO action to execute 'Actor'
    }

The ActorQ message in the Actor is the write-end of created Inbox. While user supplied message handler receives Inbox, which is read-end of created queue, caller of newActor gets write-end only.

Message Queue

Inbox is specifically designed queue for implementing actor. All behaviors available in this package depend on it. It provides following capabilities.

  • Thread-safe read and write.
  • Blocking and non-blocking read operation.
  • Selective read operation.
  • Current queue length.
  • Bounded queue.

The type Inbox is intended to be used only for reading side as inbox of actor. Single Inbox object is only readable from single actor. In order to avoid reading from other actors, no constructors are exposed but instead you can get it only via newActor or newBoundedActor.

Read an oldest message from Inbox

To read a message at the head of message queue, apply receive to Inbox. If one or more message is available, receive returns oldest one. If no message is available, receive blocks until at least one message arrives. A skeleton of actor message handler will look like this.

myActorHandler :: Inbox YourMessageType -> IO ()
myActorHandler inbox = do
    newMessage <- receive inbox
    doSomethingWith newMessage
    myActorHandler inbox

Send a message to an actor

To send a message to an actor, call send with write-end of the actor's inbox and the message.

    send :: ActorQ message -> message -> IO ()

ActorQ is write-end of actor's message queue. ActorQ is actually just a wrapper of Inbox. Its role is hiding read-end API of Inbox. From outside of actor, only write-end is exposed via ActorQ. From inside of actor, both read-end and write-end are available. You can read from given inbox directly. You can write to given inbox with sendToMe.

Send a message from an actor to itself

When you need to send a message from an actor to the actor itself, call sendToMe.

    sendToMe :: Inbox message -> message -> IO ()

Following code demonstrates how entire actor handler will look like.

myActorHandler :: Inbox YourMessageType -> IO ()
myActorHandler inbox = do
    newMessage <- receive inbox
    doSomethingWith newMessage

    sendToMe inbox messageToMyself  -- Send a message to itself.

    myActorHandler inbox

Actor

Actor is IO action emulating Erlang's actor. It has a dedicated Inbox and processes incoming messages until reaching end state.

Actor is restartable without replacing message queue. When actor's IO action crashed and restarted, the new execution of the IO action continue referring the same message queue. Thus, threads sending messages to the actor can continue using the same write-end of the queue.

newActor and newBoundedActor create an actor with new Inbox. It is the only exposed way to create a new Inbox. This limitation is intended. It prevents any code other than message handler of the actor from reading the inbox.

From perspective of outside of actor, user supplies an IO action with type ActorHandler to newActor or newBoundedActor then user gets IO action of created actor and write-end of message queue of the actor, which is ActorQ type value.

From perspective of inside of actor, in other word, from perspective of user supplied message handler, it has a message queue both read and write side available.

Shared Inbox

You can run created actor multiple time simultaneously with different thread each. In such case, each actor instances share single Inbox. This would be useful to distribute task stream to multiple worker actor instances, however, keep in mind there is no way to control which message is routed to what actor.

Monitored IO action

This package provides facility for supervising IO actions. With types and functions described in this section, you can run IO action with its own thread and receive notification on its termination at another thread with reason of termination. Functions in this section provides guaranteed supervision of your thread.

It looks something similar to UnliftIO.bracket. What distinguishes from bracket is guaranteed work through entire lifetime of thread.

Use UnliftIO.bracket when you need guaranteed cleanup of resources acquired within the same thread. It works as you expect. However, installing callback for thread supervision using bracket (or UnliftIO.finally or even low level UnliftIO.catch) within a thread has NO guarantee. There is a little window where asynchronous exception is thrown after the thread is started but callback is not yet installed. We will discuss this later in this section.

Notification is delivered via user supplied callback. Helper functions described in this section install your callback to your IO action. Then the callback will be called on termination of the IO action.

Important: Callback is called in terminated thread

Callback is called in terminated thread. You have to use inter-thread communication in order to notify to another thread.

User supplied callback receives ExitReason and UnliftIO.Concurrent.ThreadId so that user can determine witch thread was terminated and why it was terminated. In order to receive those parameters, user supplied callback must have type signature Monitor, which is following.

ExitReason -> ThreadId -> IO ()

Function watch installs your callback to your plain IO action then returns monitored action.

Callback can be nested. Use nestWatch to install another callback to already monitored action.

Helper functions return IO action with signature MonitoredAction instead of plain IO (). From here to the end of this section it will be a little technical deep dive for describing why it has such signature.

The signature of MonitoredAction is this.

(IO () -> IO ()) -> IO ()

It requires an extra function argument. It is because MonitoredAction will be invoked with UnliftIO.Concurrent.forkIOWithUnmask.

In order to ensure callback on termination works in any timing, the callback must be installed under asynchronous exception masked. At the same time, in order to allow killing the tread from another thread, body of IO action must be executed under asynchronous exception unmasked. In order to satisfy both conditions, the IO action and callback must be called using UnliftIO.Concurrent.forkIOWithUnmask. Typically it looks like following.

mask_ $ forkIOWithUnmask $ \unmask -> unmask action `finally` callback

The extra function parameter in the signature of MonitoredAction is used for accepting the @unmask@ function which is passed by UnliftIO.Concurrent.forkIOWithUnmask. Functions defined in this section help installing callback and converting type to fit to UnliftIO.Concurrent.forkIOWithUnmask.

Child specification - supervisable process

ChildSpec is casting mold of child thread IO action which supervisor spawns and manages. It is passed to supervisor, then supervisor let it run with its own thread, monitor it, and restart it if needed. ChildSpec provides additional attributes to MonitoredAction for controlling restart on thread termination. That is Restart. Restart represents restart type concept came from Erlang/OTP. The value of Restart defines how restart operation by supervisor is triggered on termination of the thread. ChildSpec with Permanent restart type triggers restart operation regardless its reason of termination. It triggers restarting even by normal exit. Transient triggers restarting only when the thread is terminated by exception. Temporary never triggers restarting.

Refer to Erlang/OTP for more detail of restart type concept.

newMonitoredChildSpec creates a new ChildSpec from a MonitoredAction and a restart type value. newChildSpec is short cut function creating a ChildSpec from a plain IO action and a restart type value. addMonitor adds another monitor to existing ChildSpec.

Behaviors

This package provides state machine, server, and supervisor behavior from Erlang/OTP with slight modifications.

All behaviors available in this package are defined as ActorHandler so that they can be easily supervised by converting them to actor using newActor.

Server behavior is built upon state machine behavior. Supervisor is built on top of server behavior.

State Machine behavior

State machine behavior is most essential behavior in this package. It provides framework for creating IO action of finite state machine running on its own thread. State machine has single Inbox, its local state, and a user supplied message handler. State machine is created with initial state value, waits for incoming message, passes received message and current state to user supplied handler, updates state to returned value from user supplied handler, stops or continue to listen message queue based on what the handler returned.

To create a new state machine, prepare initial state of your state machine and define your message handler driving your state machine, apply newStateMachine to the initial state and handler. You will get a ActorHandler so you can get an actor of the state machine by applying newActor to it.

Actor queue action <-  newActor $ newStateMachine initialState handler

Or you can use short-cut helper.

Actor queue action <-  newStateMachineActor initialState handler

The newStateMachine returns write-end of message queue for the state machine and IO action to run. You can run the IO action by Control.Concurrent.forkIO or Control.Concurrent.async, or you can let supervisor run it.

User supplied message handler must have following type signature.

handler :: (state -> message -> IO (Either result state))

When a message is sent to state machine's queue, it is automatically received by state machine framework, then the handler is called with current state and the message. The handler must return either result or next state. When Left result is returned, the state machine stops and returned value of the IO action is IO result. When Right state is returned, the state machine updates current state with the returned state and wait for next incoming message.

Server behavior

Server behavior provides synchronous request-response style communication, a.k.a. ask pattern, with actor. Server behavior allows user to send a request to an actor then wait for response form the actor. This package provides a framework for implementing such actor.

Server behavior in this package is actually a set of helper functions and type synonym to help implementing ask pattern over actor. User need to follow some of rules described below to utilize those helpers.

Define ADT type for messages

First, user need to define an algebraic data type for message to the server in following form.

data myServerCommand
    = ReqWithoutResp1
    | ReqWithoutResp2 Arg1
    | ReqWithoutResp3 Arg2 Arg3
    | ReqWithResp1 (ServerCallback Result1)
    | ReqWithResp1 ArgX (ServerCallback Result2)
    | ReqWithResp2 ArgY ArgZ (ServerCallback Result3)

The rule is this:

  • Define an ADT containing all requests.
  • If a request doesn't return response, define a value type for the request as usual element of sum type.
  • If a request returns a response, put (ServerCallback ResultType) at the last argument of the constructor for the request where ResultType is type of returned value.

ServerCallback is type synonym of a function type as following.

type ServerCallback a = (a -> IO ())

So real definition of your myServerCommand is:

data MyServerCommand
    = ReqWithoutResp1
    | ReqWithoutResp2 Arg1
    | ReqWithoutResp3 Arg2 Arg3
    | ReqWithResp1 (Result1 -> IO ())
    | ReqWithResp2 ArgX (Result2 -> IO ())
    | ReqWithResp3 ArgY ArgZ (Result3 -> IO ())

Define message handler

Next, user need to define an actor handling the message. In this example, we will use state machine behavior so that we can focus on core message handling part. For simplicity, this example doesn't have internal state and it never finishes.

Define a state machine message handler handling myServerCommand.

myHandler :: () -> MyServerCommand -> IO (Either () ())
myHandler _  ReqWithoutResp1                  = doJob1 $> Right ()
myHandler _ (ReqWithoutResp2 arg1)            = doJob2 arg1 $> Right ()
myHandler _ (ReqWithoutResp3 arg2 arg3)       = doJob3 arg2 arg3 $> Right ()
myHandler _ (ReqWithResp1 cont1)              = (doJob4 >>= cont1) $> Right ()
myHandler _ (ReqWithResp2 argX cont2)         = (doJob5 argX >>= cont2) $> Right ()
myHandler _ (ReqWithResp3 argY argZ cont3)    = (doJob6 argY argZ >>= cont3) $> Right ()

The core idea here is implementing request handler in CPS style. If a request returns a response, the request message comes with callback function (a.k.a. continuation). You can send back response for the request by calling the callback.

Requesting to server

Function call, callAsync, and callIgnore are helper functions to implement request-response communication with server. They install callback to message, send the message, returns response to caller. They receive partially applied server message constructor, apply it to callback function, then send it to server. The installed callback handles response from the server. You can use call like following.

    maybeResult1 <- call def myServerActor ReqWithResp1
    maybeResult2 <- call def myServerActor $ ReqWithResp2 argX
    maybeResult3 <- call def myServerActor $ ReqWithResp3 argY argZ

When you send a request without response, use cast.

    cast myServerActor ReqWithoutResp1
    cast myServerActor $ ReqWithoutResp2 arg1
    cast myServerActor $ ReqWithoutResp3 arg2 arg3

When you send a request with response but ignore it, use callIgnore.

    callIgnore myServerActor ReqWithResp1
    callIgnore myServerActor $ ReqWithResp2 argX
    callIgnore myServerActor $ ReqWithResp3 argY argZ

Generally, ask pattern, or synchronous request-response communication is not recommended in actor model. It is because synchronous request blocks entire actor until it receives response or timeout. You can mitigate the situation by wrapping the synchronous call with async. Use callAsync for such purpose.

Supervisor behavior

WIP

Supervisor behavior provides Erlang/OTP like thread supervision with some simplification.

Design Considerations

Separate role of threads

When you design thread hierarchy with this package, you have to follow design rule of Erlang/OTP where only supervisor can have children threads.

In Erlang/OTP, there are two type of Erlang process.

  • Supervisor
  • Worker

Supervisor has children processes and supervise them. Worker does real task but never has child process.

Without this rule, you have to have both supervision functionality and real task processing functionality within single process. That leads more complex implementation of process.

With this rule, worker no longer have to take care of supervising children. But at the same time you cannot create child process directly from worker.

Key Difference from Erlang/OTP Supervisor

  • Mutable variables are shared
  • Dynamic children are always Temporary
  • No shutdown method to terminate child
  • No RestForOne strategy
  • Every actor has dedicated Haskell thread

Mutable variables are shared

While "share nothing" is a key concept of Erlang, there is no such guarantee in this package. Message passed from one Haskell thread to another thread is shared between both threads. This isn't a problem as long as message content is normal Haskell object. Normal Haskell object is immutable. Nobody mutates its value. So, in normal Haskell object, sharing is identical to copying.

However, when you pass mutable object like IORef, MVar, or TVar, do it with care. Those object can be mutated by other thread.

Dynamic children are always Temporary

Child thread created by newChild always created as Temporary child regardless which restart type is designated in its spec. Temporary children are never been restarted by supervisor. Permanent or Transient child must be a part of ChildSpec list given to supervisor spec.

No shutdown method to terminate child

When supervisor terminates its children, supervisor always throw asynchronous exception to children. There is no option like exit(Child, shutdown) found in Erlang/OTP.

You must implement appropriate resource cleanup on asynchronous exception. You can implement graceful shutdown by yourself and it is always a good practice. However it does not arrow you escape from dealing with asynchronous exception. This package decided not to enforce you to implement graceful shutdown but leaves it your choice.

No RestForOne strategy

Only OneForOne and OneForAll restart strategy is supported.

Every actor has dedicated Haskell thread

Unlike some of other actor implementations, each actor in this package has its own Haskell thread. It means every actor has dedicated stack for each. Thus calling blocking API in middle of message handling does NOT prevent other actor running.

Some actor implementation give thread and stack to an actor only when it handles incoming message. In such implementation, actor has no thread and stack when it is waiting for next message. This maximizes scalability. Even though there are billions of actors, you only need n threads and stacks while you have n core micro processor.

A downside of such implementation is it strictly disallows blocking operation in middle of message handling. In such implementation, calling a blocking API in an actor system running with single thread causes stall of entire actor system until the blocking API returns.

That doesn't happen in this package. Though you call any blocking API in middle of actor message handler, other Haskell threads continue running.

Giving dedicated thread to each actor requires giving dedicated stack frame to each actor too. It consumes more memory than the above design. However, in Haskell, it won't be a serious problem. These are the reason why.

  • In Haskell, size of stack frame starts from 1KB and grows as needed.
  • It can be moved by GC so no continuous address space is required at beginning.

It is one of the greatest characteristic of GHC's runtime. This package decided to leverage it.

Resource management

The word resource in this context means object kept in runtime but not garbage collected. For example, file handles, network sockets, and threads are resources. In Haskell, losing reference to those objects does NOT mean those objects will be closed or terminated. You have to explicitly close handles and sockets, terminate threads before you lose reference to them.

This becomes more complex under threaded GHC environment. Under GHC, thread can receive asynchronous exception in any timing. You have to cleanup resources when your thread received asynchronous exception as well as in case of normal exit and synchronous exception scenario.

This package does take care of threads managed by supervisor but you have to take care of any other resources.