module TheatreDev.Terminal.Actor
  ( Actor,

    -- * Manipulation
    adaptToList,

    -- * Acquisition
    spawnStatelessGranular,
    spawnStatefulGranular,
    spawnStatefulBatched,

    -- * Control
    tell,
    kill,
    wait,
  )
where

import Control.Concurrent.Chan.Unagi qualified as E
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TMVar
import TheatreDev.ExtrasFor.List qualified as List
import TheatreDev.ExtrasFor.TBQueue
import TheatreDev.Prelude

-- |
-- Handle for controlling an actor that consumes messages of type @message@.
--
-- It hides the message channel, as well as the forking and killing of the
-- thread, behind three plain 'IO' actions.
--
-- The field order is significant: the constructor is also applied
-- positionally as @Actor tell kill wait@.
data Actor message = Actor
  { -- | Deliver a message to the actor.
    tell :: message -> IO (),
    -- | Ask the actor to stop.
    kill :: IO (),
    -- | Wait until the actor has died, either from an error or from being killed.
    wait :: IO ()
  }

-- |
-- Fan-out composition of two actors accepting the same message type.
--
-- Every control action of the result runs the left actor's action first
-- and the right actor's action second.
instance Semigroup (Actor message) where
  Actor tellFirst killFirst waitFirst <> Actor tellSecond killSecond waitSecond =
    Actor
      (\msg -> tellFirst msg *> tellSecond msg)
      (killFirst *> killSecond)
      (waitFirst *> waitSecond)

-- |
-- The neutral actor: it discards every message, and both killing it and
-- waiting for it return immediately.
instance Monoid (Actor message) where
  mempty =
    Actor discard noop noop
    where
      discard _ = pure ()
      noop = pure ()

-- |
-- Pre-process messages with a function before they reach the actor.
--
-- Only the sending side is affected; killing and waiting are passed
-- through unchanged.
instance Contravariant Actor where
  contramap adapt (Actor send stop await) =
    Actor (\msg -> send (adapt msg)) stop await

-- |
-- Split each message in two and route one part to each of two actors.
instance Divisible Actor where
  -- The actor that ignores everything.
  conquer =
    mempty

  -- Each incoming message is split by the supplied function; the first
  -- component goes to the left actor, then the second component goes to the
  -- right actor. Killing and waiting address the left actor first.
  divide split (Actor tellLeft killLeft waitLeft) (Actor tellRight killRight waitRight) =
    Actor route (killLeft *> killRight) (waitLeft *> waitRight)
    where
      route msg =
        -- The pair is matched with a (strict) case on purpose, so the split
        -- is evaluated before either part is sent.
        case split msg of
          (leftPart, rightPart) -> do
            tellLeft leftPart
            tellRight rightPart

-- |
-- Route each message to exactly one of two actors.
instance Decidable Actor where
  -- An actor for messages that cannot exist. Sending to it does nothing:
  -- the result of the supplied function is handed to 'const' and therefore
  -- never evaluated. Killing and waiting do nothing as well.
  lose impossible =
    Actor ignore (pure ()) (pure ())
    where
      ignore msg = const (pure ()) (absurd (impossible msg))

  -- Each incoming message is classified by the supplied function: 'Left'
  -- payloads go to the left actor, 'Right' payloads to the right one.
  -- Killing and waiting address both actors, the left one first.
  choose classify (Actor tellLeft killLeft waitLeft) (Actor tellRight killRight waitRight) =
    Actor route (killLeft *> killRight) (waitLeft *> waitRight)
    where
      route msg =
        case classify msg of
          Left leftMsg -> tellLeft leftMsg
          Right rightMsg -> tellRight rightMsg

-- |
-- Lift an actor so that it accepts whole lists of messages.
--
-- Telling the resulting actor a list tells the underlying actor each
-- element in list order. Killing and waiting are unchanged.
adaptToList :: Actor message -> Actor [message]
adaptToList (Actor tellOne stop await) =
  Actor (traverse_ tellOne) stop await

-- |
-- Fork a thread that interprets messages one at a time and
-- return a handle for controlling it.
--
-- Messages and the kill request travel through the same channel, so
-- killing the actor lets it process everything that was sent before the
-- kill request. Messages sent afterwards are never read.
--
-- If the interpreter throws, the clean-up action is run, the waiter is
-- released and the exception is then raised in the thread that called this
-- function.
--
-- 'wait' /takes/ the completion signal, which is only ever put once.
spawnStatelessGranular ::
  -- | Interpreter of a single message.
  (message -> IO ()) ->
  -- | Clean-up action, run when the actor stops (killed or failed).
  IO () ->
  -- | Action forking the handler thread and producing the handle to it.
  IO (Actor message)
spawnStatelessGranular interpretMessage cleanUp = do
  (inChan, outChan) <- E.newChan
  lock <- newEmptyMVar
  spawningThreadId <- myThreadId
  let -- Common epilogue: clean up first, then release whoever is waiting.
      shutDown = do
        cleanUp
        putMVar lock ()
      loop =
        {-# SCC "spawnStatelessGranular/loop" #-}
        do
          next <- E.readChan outChan
          case next of
            -- 'Nothing' is the kill marker.
            Nothing -> shutDown
            Just payload -> do
              outcome <- try @SomeException (interpretMessage payload)
              case outcome of
                Left failure -> do
                  shutDown
                  throwTo spawningThreadId failure
                Right () -> loop
  _ <- forkIO loop
  return
    Actor
      { tell = \msg -> E.writeChan inChan (Just msg),
        kill = E.writeChan inChan Nothing,
        wait = takeMVar lock
      }

-- |
-- Actor with memory, processing one message at a time.
--
-- A persistent state is threaded through the iterations: starting from
-- the initial state, every message produces the next state, and the
-- finalizer receives the state when the actor stops.
--
-- This is a thin layer over 'spawnStatefulBatched': each batch handed over
-- by it is folded through the per-message step in order.
--
-- Killing that actor will make it process all the messages in the queue first.
-- All the messages sent to it after killing won't be processed.
spawnStatefulGranular :: state -> (state -> message -> IO state) -> (state -> IO ()) -> IO (Actor message)
spawnStatefulGranular zero step finalizer =
  spawnStatefulBatched zero stepBatch finalizer
  where
    -- Run the single-message step over every message of the batch,
    -- left to right, carrying the state along.
    stepBatch state batch = foldM step state batch

-- |
-- Actor with memory, processing its messages in batches.
--
-- Forks a thread which repeatedly flushes the message queue and hands
-- every message that precedes a kill marker to the step function as one
-- non-empty batch, threading the state through the iterations.
--
-- * 'tell' enqueues a message while the actor is alive and is a no-op
--   afterwards. The queue is bounded (1000 entries).
--
-- * 'kill' enqueues a kill marker. Messages enqueued before it are still
--   processed; whatever follows it in the same flush is dropped.
--
-- * 'wait' blocks until the actor has stopped. If the step function threw,
--   that exception is rethrown to the caller. The outcome is only read,
--   never removed, so 'wait' may be called any number of times and from
--   several threads.
--
-- The finalizer receives the most recent state: the result of the last
-- successful step, or, if a step threw, the state that step started from.
-- Note that the outcome is published before the finalizer runs, so 'wait'
-- may return while the finalizer is still executing.
spawnStatefulBatched :: state -> (state -> NonEmpty message -> IO state) -> (state -> IO ()) -> IO (Actor message)
spawnStatefulBatched zero step finalizer =
  do
    queue <- newTBQueueIO 1000
    aliveVar <- newTVarIO True
    resVar <- newEmptyTMVarIO @(Maybe SomeException)
    let -- Mark the actor dead and publish its outcome in one transaction,
        -- then run the finalizer on the given state.
        terminate outcome finalState = do
          atomically $ do
            writeTVar aliveVar False
            putTMVar resVar outcome
          finalizer finalState
    _ <- forkIOWithUnmask $ \unmask ->
      let loop !state =
            -- The transaction decides what to do next and returns that
            -- decision as an IO action, which 'join' then executes.
            join $ atomically $ do
              flushing <- flushNonEmptyTBQueue queue
              let (messages, flushingTail) = List.splitWhileJust (toList flushing)
              case messages of
                -- The flush is non-empty, so no leading messages means
                -- that the tail is not empty, i.e. it starts with a kill marker.
                [] -> do
                  writeTVar aliveVar False
                  putTMVar resVar Nothing
                  return (finalizer state)
                messagesHead : messagesTail ->
                  return $ do
                    -- Only the user-supplied step runs unmasked.
                    result <- try @SomeException $ unmask $ step state (messagesHead :| messagesTail)
                    case result of
                      Right newState ->
                        case flushingTail of
                          [] -> loop newState
                          -- A kill marker followed the batch: stop, handing
                          -- the finalizer the state produced by this batch.
                          _ -> terminate Nothing newState
                      -- Report the failure to waiters instead of dropping it.
                      Left exception -> terminate (Just exception) state
       in loop zero
    return
      Actor
        { tell = \message -> atomically $ do
            alive <- readTVar aliveVar
            when alive
              $ writeTBQueue queue
              $ Just message,
          kill = atomically $ do
            alive <- readTVar aliveVar
            when alive
              $ writeTBQueue queue Nothing,
          wait = do
            -- Read without taking, so that later waiters see the outcome too.
            res <- atomically $ readTMVar resVar
            case res of
              Nothing -> return ()
              Just exception -> throwIO exception
        }