{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# OPTIONS_HADDOCK prune #-}

{- |
Utility functions for running 'Program' actions concurrently.

Haskell uses green threads: small lines of work that are scheduled down onto
actual execution contexts (set by default by this library to be one per core).
Haskell threads are incredibly lightweight, and you are encouraged to use them
freely. Haskell provides a rich ecosystem of tools to do work concurrently and
to communicate safely between threads.

This module provides wrappers around some of these primitives so you can use
them easily from the 'Program' monad.

Note that when you fire off a new thread the top-level application state is
/shared/; it's the same @τ@ inherited from the parent 'Program'.
-}
module Core.Program.Threads (
    -- * Concurrency
    forkThread,
    waitThread,
    waitThread_,

    -- * Helper functions
    concurrentThreads,
    concurrentThreads_,
    raceThreads,
    raceThreads_,

    -- * Internals
    Thread,
    unThread,
) where

import Control.Concurrent.Async (Async)
import qualified Control.Concurrent.Async as Async (
    async,
    concurrently,
    concurrently_,
    link,
    race,
    race_,
    wait,
 )
import Control.Concurrent.MVar (
    newMVar,
    readMVar,
 )
import Control.Monad (
    void,
 )
import Control.Monad.Reader.Class (MonadReader (ask))
import Core.Program.Context
import Core.System.Base

{- |
A thread for concurrent computation. A value of this type is the handle
returned by 'forkThread' and accepted by 'waitThread' and 'waitThread_'.

(this wraps __async__\'s 'Async'; use 'unThread' to get at the underlying
value)
-}
newtype Thread α = Thread (Async α)

-- Unwrap a Thread to expose the Async value it was constructed from.
unThread :: Thread α -> Async α
unThread (Thread a) = a

{- |
Fork a thread. The child thread will run in the same @Context@ as the calling
@Program@, including sharing the user-defined application state value.

(this wraps __async__\'s 'Control.Concurrent.Async.async' which in turn wraps
__base__'s 'Control.Concurrent.forkIO')
-}
forkThread :: Program τ α -> Program τ (Thread α)
forkThread program = do
    context <- ask
    let i = startTimeFrom context

    liftIO $ do
        -- Seed a fresh MVar with the parent's current start time so the
        -- child's Context carries its own MVar rather than the parent's.
        start <- readMVar i
        i' <- newMVar start

        let context' = context{startTimeFrom = i'}

        -- Run the sub-program in a new thread using the adjusted Context.
        a <- Async.async $ do
            subProgram context' program

        -- Link the child to the calling thread; per the async package's
        -- documentation an exception in the child is then re-thrown here.
        Async.link a
        return (Thread a)

{- |
Wait for the completion of a thread, returning the result. This is a blocking
operation.

(this wraps __async__\'s 'Control.Concurrent.Async.wait')
-}
waitThread :: Thread α -> Program τ α
-- Unwrap the Thread and lift the blocking wait on its Async into Program.
waitThread (Thread a) = liftIO $ Async.wait a

{- |
Wait for the completion of a thread, discarding its result. This is
particularly useful at the end of a do-block if you're waiting on a worker
thread to finish but don't need its return value, if any; otherwise you have
to explicitly deal with the unused return value:

@
    _ <- 'waitThread' t1
    'return' ()
@

which is a bit tedious. Instead, you can just use this convenience function:

@
    'waitThread_' t1
@

The trailing underscore in the name of this function follows the same
convention as found in "Control.Monad", which has 'Control.Monad.mapM_' which
does the same as 'Control.Monad.mapM' but which likewise discards the return
value.
-}
waitThread_ :: Thread α -> Program τ ()
-- Same as waitThread, with the result replaced by ().
waitThread_ = void . waitThread

{- |
Fork two threads and wait for both to finish. The return value is the pair of
each action's return types.

This is the same as calling 'forkThread' and 'waitThread' twice, except that
if either sub-program fails with an exception the other program which is still
running will be cancelled and the original exception is then re-thrown.

@
    (a,b) <- 'concurrentThreads' one two

    -- continue, doing something with both results.
@

For a variant that ignores the return values and just waits for both see
'concurrentThreads_' below.

(this wraps __async__\'s 'Control.Concurrent.Async.concurrently')
-}
concurrentThreads :: Program τ α -> Program τ β -> Program τ (α, β)
concurrentThreads one two = do
    -- Both sub-programs are run against the caller's Context unchanged.
    context <- ask
    liftIO $ do
        Async.concurrently
            (subProgram context one)
            (subProgram context two)

{- |
Fork two threads and wait for both to finish.

This is the same as calling 'forkThread' and 'waitThread_' twice, except that
if either sub-program fails with an exception the other program which is still
running will be cancelled and the original exception is then re-thrown.

(this wraps __async__\'s 'Control.Concurrent.Async.concurrently_')
-}
concurrentThreads_ :: Program τ α -> Program τ β -> Program τ ()
concurrentThreads_ one two = do
    -- Both sub-programs are run against the caller's Context unchanged.
    context <- ask
    liftIO $ do
        Async.concurrently_
            (subProgram context one)
            (subProgram context two)

{- |
Fork two threads and race them against each other. This blocks until one or
the other of the threads finishes. The return value will be 'Left' @α@ if the
first program (@one@) completes first, and 'Right' @β@ if it is the second
program (@two@) which finishes first. The sub program which is still running
will be cancelled with an exception.

@
    result <- 'raceThreads' one two
    case result of
        Left a -> do
            -- one finished first
        Right b -> do
            -- two finished first
@

For a variant that ignores the return value and just races the threads see
'raceThreads_' below.

(this wraps __async__\'s 'Control.Concurrent.Async.race')
-}
raceThreads :: Program τ α -> Program τ β -> Program τ (Either α β)
raceThreads one two = do
    -- Both sub-programs are run against the caller's Context unchanged.
    context <- ask
    liftIO $ do
        Async.race
            (subProgram context one)
            (subProgram context two)

{- |
Fork two threads and race them against each other. When one action completes
the other will be cancelled with an exception. This is useful for enforcing
timeouts:

@
    'raceThreads_'
        ('sleepThread' 300)
        (do
            -- We expect this to complete within 5 minutes.
            performAction
        )
@

(this wraps __async__\'s 'Control.Concurrent.Async.race_')
-}
raceThreads_ :: Program τ α -> Program τ β -> Program τ ()
raceThreads_ one two = do
    -- Both sub-programs are run against the caller's Context unchanged.
    context <- ask
    liftIO $ do
        Async.race_
            (subProgram context one)
            (subProgram context two)