module TheatreDev.Actor
  ( Actor,

    -- * Acquisition
    spawnStatefulIndividual,
    spawnStatefulBatched,
    spawnStatelessIndividual,
    spawnStatelessBatched,

    -- * Control
    tell,
    kill,
    wait,

    -- * Composition
    firstAvailableOneOf,
    byKeyHashOneOf,
    allOf,
  )
where

import TheatreDev.Prelude
import TheatreDev.StmStructures.Runner (Runner)
import TheatreDev.StmStructures.Runner qualified as Runner
import TheatreDev.Tell (Tell)
import TheatreDev.Tell qualified as Tell
import TheatreDev.Wait qualified as Wait

-- |
-- Controls of an actor, which processes the messages of type @message@.
-- The processing runs on a dedicated green thread.
--
-- Provides abstraction over the message channel, thread-forking and killing.
--
-- Monoid instance is not provided for the same reason it is not provided for numbers.
-- This type supports both sum and product composition. See 'allOf', 'firstAvailableOneOf' and 'byKeyHashOneOf'.
data Actor message = Actor
  { -- | Send a message to the actor.
    forall message. Actor message -> message -> STM ()
tell :: message -> STM (),
    -- | Kill the actor.
    forall message. Actor message -> STM ()
kill :: STM (),
    -- | Wait for the actor to die due to error or being killed.
    forall message. Actor message -> STM (Maybe SomeException)
wait :: STM (Maybe SomeException),
    -- | IDs of the constituent actors.
    -- Useful for debugging.
    forall message. Actor message -> [UUID]
ids :: [UUID]
  }

instance Contravariant Actor where
  contramap :: forall a' a. (a' -> a) -> Actor a -> Actor a'
contramap a' -> a
fn (Actor a -> STM ()
tell STM ()
kill STM (Maybe SomeException)
wait [UUID]
ids) =
    forall message.
(message -> STM ())
-> STM () -> STM (Maybe SomeException) -> [UUID] -> Actor message
Actor (a -> STM ()
tell forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. a' -> a
fn) STM ()
kill STM (Maybe SomeException)
wait [UUID]
ids

instance Divisible Actor where
  conquer :: forall a. Actor a
conquer =
    forall message.
(message -> STM ())
-> STM () -> STM (Maybe SomeException) -> [UUID] -> Actor message
Actor (forall a b. a -> b -> a
const (forall (m :: * -> *) a. Monad m => a -> m a
return ())) (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing) []
  divide :: forall a b c. (a -> (b, c)) -> Actor b -> Actor c -> Actor a
divide a -> (b, c)
divisor (Actor b -> STM ()
lTell STM ()
lKill STM (Maybe SomeException)
lWait [UUID]
lIds) (Actor c -> STM ()
rTell STM ()
rKill STM (Maybe SomeException)
rWait [UUID]
rIds) =
    Actor
      { $sel:tell:Actor :: a -> STM ()
tell = \a
msg -> case a -> (b, c)
divisor a
msg of (b
lMsg, c
rMsg) -> b -> STM ()
lTell b
lMsg forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> c -> STM ()
rTell c
rMsg,
        $sel:kill:Actor :: STM ()
kill = STM ()
lKill forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM ()
rKill,
        $sel:wait:Actor :: STM (Maybe SomeException)
wait = STM (Maybe SomeException)
-> STM (Maybe SomeException) -> STM (Maybe SomeException)
Wait.both STM (Maybe SomeException)
lWait STM (Maybe SomeException)
rWait,
        $sel:ids:Actor :: [UUID]
ids = [UUID]
lIds forall a. Semigroup a => a -> a -> a
<> [UUID]
rIds
      }

instance Decidable Actor where
  lose :: forall a. (a -> Void) -> Actor a
lose a -> Void
fn =
    forall message.
(message -> STM ())
-> STM () -> STM (Maybe SomeException) -> [UUID] -> Actor message
Actor (forall a b. a -> b -> a
const (forall (m :: * -> *) a. Monad m => a -> m a
return ()) forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. Void -> a
absurd forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. a -> Void
fn) (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing) []
  choose :: forall a b c. (a -> Either b c) -> Actor b -> Actor c -> Actor a
choose a -> Either b c
choice (Actor b -> STM ()
lTell STM ()
lKill STM (Maybe SomeException)
lWait [UUID]
lIds) (Actor c -> STM ()
rTell STM ()
rKill STM (Maybe SomeException)
rWait [UUID]
rIds) =
    Actor
      { $sel:tell:Actor :: a -> STM ()
tell = forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either b -> STM ()
lTell c -> STM ()
rTell forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. a -> Either b c
choice,
        $sel:kill:Actor :: STM ()
kill = STM ()
lKill forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> STM ()
rKill,
        $sel:wait:Actor :: STM (Maybe SomeException)
wait = STM (Maybe SomeException)
-> STM (Maybe SomeException) -> STM (Maybe SomeException)
Wait.both STM (Maybe SomeException)
lWait STM (Maybe SomeException)
rWait,
        $sel:ids:Actor :: [UUID]
ids = [UUID]
lIds forall a. Semigroup a => a -> a -> a
<> [UUID]
rIds
      }

-- * Composition

-- | Distribute the message stream across actors.
-- The message gets delivered to the first available one.
--
-- E.g., using this combinator in combination with 'replicateM'
-- you can construct pools:
--
-- > spawnPool :: Int -> IO (Actor message) -> IO (Actor message)
-- > spawnPool size spawn =
-- >   firstAvailableOneOf <$> replicateM size spawn
--
-- You can consider this being an interface to the Sum monoid.
firstAvailableOneOf :: [Actor message] -> Actor message
firstAvailableOneOf :: forall message. [Actor message] -> Actor message
firstAvailableOneOf = forall message.
([Tell message] -> Tell message)
-> [Actor message] -> Actor message
tellComposition forall a. [Tell a] -> Tell a
Tell.one

-- |
-- Dispatch the message across actors based on a key hash.
--
-- This lets you ensure of a property that messages with
-- the same key will arrive to the same actor,
-- letting you maintain a local associated state in the actors.
--
-- The implementation applies a modulo equal to the amount
-- of actors to the hash and thus determines the index
-- of the actor to dispatch the message to.
-- This is inspired by how partitioning is done in Kafka.
byKeyHashOneOf ::
  -- | Function extracting the key from the message and hashing it.
  (message -> Int) ->
  -- | Pool of actors.
  [Actor message] ->
  Actor message
byKeyHashOneOf :: forall message.
(message -> Int) -> [Actor message] -> Actor message
byKeyHashOneOf = forall message.
([Tell message] -> Tell message)
-> [Actor message] -> Actor message
tellComposition forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. forall a. (a -> Int) -> [Tell a] -> Tell a
Tell.byKeyHashOneOf

-- | Distribute the message stream to all provided actors.
--
-- You can consider this being an interface to the Product monoid.
allOf :: [Actor message] -> Actor message
allOf :: forall message. [Actor message] -> Actor message
allOf = forall message.
([Tell message] -> Tell message)
-> [Actor message] -> Actor message
tellComposition forall a. [Tell a] -> Tell a
Tell.all

-- ** Helpers

tellComposition :: ([Tell message] -> Tell message) -> [Actor message] -> Actor message
tellComposition :: forall message.
([Tell message] -> Tell message)
-> [Actor message] -> Actor message
tellComposition [Tell message] -> Tell message
tellReducer [Actor message]
actors =
  Actor
    { $sel:tell:Actor :: Tell message
tell = [Tell message] -> Tell message
tellReducer (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (.tell) [Actor message]
actors),
      $sel:kill:Actor :: STM ()
kill = forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (.kill) [Actor message]
actors,
      $sel:wait:Actor :: STM (Maybe SomeException)
wait = [STM (Maybe SomeException)] -> STM (Maybe SomeException)
Wait.all (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (.wait) [Actor message]
actors),
      $sel:ids:Actor :: [UUID]
ids = forall (t :: * -> *) m a.
(Foldable t, Monoid m) =>
(a -> m) -> t a -> m
foldMap (.ids) [Actor message]
actors
    }

fromRunner :: Runner a -> Actor a
fromRunner :: forall a. Runner a -> Actor a
fromRunner Runner a
runner =
  Actor
    { $sel:tell:Actor :: a -> STM ()
tell = forall a. Runner a -> a -> STM ()
Runner.tell Runner a
runner,
      $sel:kill:Actor :: STM ()
kill = forall a. Runner a -> STM ()
Runner.kill Runner a
runner,
      $sel:wait:Actor :: STM (Maybe SomeException)
wait = forall a. Runner a -> STM (Maybe SomeException)
Runner.wait Runner a
runner,
      $sel:ids:Actor :: [UUID]
ids = [forall a. Runner a -> UUID
Runner.getId Runner a
runner]
    }

-- * Acquisition

-- | Spawn an actor which processes messages in isolated executions.
spawnStatelessIndividual ::
  -- | Clean up when killed or exception is thrown.
  IO () ->
  -- | Interpret a message.
  (message -> IO ()) ->
  -- | Fork a thread to run the handler loop on and produce a handle to control it.
  IO (Actor message)
spawnStatelessIndividual :: forall message. IO () -> (message -> IO ()) -> IO (Actor message)
spawnStatelessIndividual IO ()
cleaner message -> IO ()
interpreter =
  -- TODO: Optimize by reimplementing directly.
  forall state message.
state
-> (state -> IO ())
-> (state -> message -> IO state)
-> IO (Actor message)
spawnStatefulIndividual () (forall a b. a -> b -> a
const IO ()
cleaner) (forall a b. a -> b -> a
const message -> IO ()
interpreter)

-- | Spawn an actor which processes all available messages in one execution.
spawnStatelessBatched ::
  -- | Clean up when killed or exception is thrown.
  IO () ->
  -- | Interpret a batch of messages.
  (NonEmpty message -> IO ()) ->
  -- | Fork a thread to run the handler loop on and produce a handle to control it.
  IO (Actor message)
spawnStatelessBatched :: forall message.
IO () -> (NonEmpty message -> IO ()) -> IO (Actor message)
spawnStatelessBatched IO ()
cleaner NonEmpty message -> IO ()
interpreter =
  -- TODO: Optimize by reimplementing directly.
  forall state message.
state
-> (state -> IO ())
-> (state -> NonEmpty message -> IO state)
-> IO (Actor message)
spawnStatefulBatched () (forall a b. a -> b -> a
const IO ()
cleaner) (forall a b. a -> b -> a
const NonEmpty message -> IO ()
interpreter)

-- | Spawn an actor which processes messages in isolated executions
-- and threads state.
spawnStatefulIndividual ::
  -- | Initial state.
  state ->
  -- | Clean up when killed or exception is thrown.
  (state -> IO ()) ->
  -- | Process a message and update state.
  (state -> message -> IO state) ->
  -- | Fork a thread to run the handler loop on and produce a handle to control it.
  IO (Actor message)
spawnStatefulIndividual :: forall state message.
state
-> (state -> IO ())
-> (state -> message -> IO state)
-> IO (Actor message)
spawnStatefulIndividual state
zero state -> IO ()
finalizer state -> message -> IO state
step =
  forall state message.
state
-> (state -> IO ())
-> (state -> NonEmpty message -> IO state)
-> IO (Actor message)
spawnStatefulBatched state
zero state -> IO ()
finalizer forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM state -> message -> IO state
step

-- | Spawn an actor which processes all available messages in one execution
-- and threads state.
spawnStatefulBatched ::
  -- | Initial state.
  state ->
  -- | Clean up when killed or exception is thrown.
  (state -> IO ()) ->
  -- | Process a batch of messages and update state.
  (state -> NonEmpty message -> IO state) ->
  -- | Fork a thread to run the handler loop on and produce a handle to control it.
  IO (Actor message)
spawnStatefulBatched :: forall state message.
state
-> (state -> IO ())
-> (state -> NonEmpty message -> IO state)
-> IO (Actor message)
spawnStatefulBatched state
zero state -> IO ()
finalizer state -> NonEmpty message -> IO state
step =
  do
    Runner message
runner <- forall a. IO (Runner a)
Runner.start
    ((forall a. IO a -> IO a) -> IO ()) -> IO ThreadId
forkIOWithUnmask forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
unmask ->
      let loop :: state -> IO ()
loop !state
state =
            do
              Maybe (NonEmpty message)
messages <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. Runner a -> STM (Maybe (NonEmpty a))
Runner.receiveMultiple Runner message
runner
              case Maybe (NonEmpty message)
messages of
                Just NonEmpty message
nonEmptyMessages ->
                  do
                    Either SomeException state
result <- forall e a. Exception e => IO a -> IO (Either e a)
try forall a b. (a -> b) -> a -> b
$ forall a. IO a -> IO a
unmask forall a b. (a -> b) -> a -> b
$ state -> NonEmpty message -> IO state
step state
state NonEmpty message
nonEmptyMessages
                    case Either SomeException state
result of
                      Right state
newState ->
                        state -> IO ()
loop state
newState
                      Left SomeException
exception ->
                        forall a b. IO a -> IO b -> IO a
finally (state -> IO ()
finalizer state
state)
                          forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically
                          forall a b. (a -> b) -> a -> b
$ forall a. Runner a -> SomeException -> STM ()
Runner.releaseWithException Runner message
runner SomeException
exception
                -- Empty batch means that the runner is finished.
                Maybe (NonEmpty message)
Nothing ->
                  forall a b. IO a -> IO b -> IO a
finally (state -> IO ()
finalizer state
state)
                    forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically
                    forall a b. (a -> b) -> a -> b
$ forall a. Runner a -> STM ()
Runner.releaseNormally Runner message
runner
       in state -> IO ()
loop state
zero
    return $ forall a. Runner a -> Actor a
fromRunner Runner message
runner

-- * Control

-- | Add a message to the end of the queue of the
-- messages to be processed by the provided actor.
tell :: Actor message -> message -> IO ()
tell :: forall message. Actor message -> message -> IO ()
tell Actor message
actor =
  forall a. STM a -> IO a
atomically forall {k} (cat :: k -> k -> *) (b :: k) (c :: k) (a :: k).
Category cat =>
cat b c -> cat a b -> cat a c
. Actor message
actor.tell

-- | Command the actor to stop registering new messages,
-- process all the registered ones and execute the clean up action.
--
-- This action executes immediately.
-- If you want to block waiting for the actor to actually die,
-- after 'kill' you can run 'wait'.
kill :: Actor message -> IO ()
kill :: forall message. Actor message -> IO ()
kill Actor message
actor =
  forall a. STM a -> IO a
atomically Actor message
actor.kill

-- | Block waiting for the actor to die either due to getting killed
-- or due to its interpreter action throwing an exception.
-- The exception will get rethrown here.
wait :: Actor message -> IO ()
wait :: forall message. Actor message -> IO ()
wait Actor message
actor =
  forall a. STM a -> IO a
atomically Actor message
actor.wait forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) forall e a. Exception e => e -> IO a
throwIO