module TheatreDev.Terminal.Actor
( Actor,
adaptToList,
spawnStatelessGranular,
spawnStatefulGranular,
spawnStatefulBatched,
tell,
kill,
wait,
)
where
import Control.Concurrent.Chan.Unagi qualified as E
import Control.Concurrent.MVar (readMVar)
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TMVar
import Control.Exception (finally)
import TheatreDev.ExtrasFor.List qualified as List
import TheatreDev.ExtrasFor.TBQueue
import TheatreDev.Prelude
-- | A handle to a message consumer, represented as a record of three
-- 'IO' actions. The record selectors double as the public API:
-- @'tell' actor msg@, @'kill' actor@ and @'wait' actor@.
data Actor message = Actor
  { -- | Hand a message over to the actor.
    tell :: message -> IO (),
    -- | Ask the actor to shut down.
    kill :: IO (),
    -- | Block until the actor has shut down.
    wait :: IO ()
  }
-- | Combines two actors into one that addresses both: every operation is
-- performed on the left actor first and then on the right one.
instance Semigroup (Actor message) where
  Actor lTell lKill lWait <> Actor rTell rKill rWait =
    Actor
      (\msg -> lTell msg >> rTell msg)
      (lKill >> rKill)
      (lWait >> rWait)
-- | The identity actor: it ignores every message, and its 'kill' and
-- 'wait' actions return immediately.
instance Monoid (Actor message) where
  mempty =
    Actor (const (return ())) (return ()) (return ())
-- | Adapts the type of accepted messages by converting each incoming
-- message with the given function before handing it to the underlying
-- actor. 'kill' and 'wait' are passed through untouched.
instance Contravariant Actor where
  contramap fn (Actor tellInner killInner waitInner) =
    Actor (tellInner . fn) killInner waitInner
-- | 'divide' splits every message in two and feeds the first part to the
-- left actor and then the second part to the right actor. 'kill' and
-- 'wait' address the left actor first and then the right one.
-- 'conquer' is the actor that ignores everything ('mempty').
instance Divisible Actor where
  conquer =
    mempty
  divide divisor (Actor lTell lKill lWait) (Actor rTell rKill rWait) =
    Actor tellBoth (lKill >> rKill) (lWait >> rWait)
    where
      tellBoth msg =
        case divisor msg of
          (lMsg, rMsg) -> lTell lMsg >> rTell rMsg
-- | 'choose' routes each message to exactly one of the two actors,
-- depending on which side of the 'Either' the classifier produces.
-- 'kill' and 'wait' still address both actors, left first.
instance Decidable Actor where
  -- The message type is uninhabited according to the given proof, so the
  -- resulting actor does nothing. The composition with 'const' keeps the
  -- handler lazy: the message is discarded without being evaluated.
  lose fn =
    Actor (const (return ()) . absurd . fn) (return ()) (return ())
  choose choice (Actor lTell lKill lWait) (Actor rTell rKill rWait) =
    Actor
      (either lTell rTell . choice)
      (lKill >> rKill)
      (lWait >> rWait)
-- | Turn an actor of single messages into an actor of message lists.
--
-- Telling a list delivers its elements to the underlying actor one by one,
-- in list order. Telling an empty list does nothing. 'kill' and 'wait' are
-- those of the underlying actor.
adaptToList :: Actor message -> Actor [message]
adaptToList (Actor tellOne killInner waitInner) =
  Actor (traverse_ tellOne) killInner waitInner
-- | Spawn an actor that interprets messages one at a time on its own
-- thread.
--
-- Arguments:
--
-- * The interpreter run for every message.
-- * The clean-up action, run once when the actor stops.
--
-- Behaviour of the resulting actor:
--
-- * 'tell' enqueues the message on an unbounded channel.
-- * 'kill' enqueues a stop marker, so messages told before it are still
--   interpreted first.
-- * 'wait' blocks until the actor has stopped. It may be called any
--   number of times and from any number of threads.
--
-- If the interpreter throws, the actor stops: the clean-up is run, waiters
-- are released, and the exception is rethrown asynchronously in the thread
-- that called 'spawnStatelessGranular'.
spawnStatelessGranular ::
  (message -> IO ()) ->
  IO () ->
  IO (Actor message)
spawnStatelessGranular interpretMessage cleanUp =
  do
    (inChan, outChan) <- E.newChan
    lock <- newEmptyMVar
    spawningThreadId <- myThreadId
    let -- Waiters must be released even when the clean-up itself throws,
        -- otherwise 'wait' would block forever.
        shutDown = cleanUp `finally` putMVar lock ()
        loop =
          {-# SCC "spawnStatelessGranular/loop" #-}
          do
            message <- E.readChan outChan
            case message of
              Just payload ->
                do
                  res <- try @SomeException $ interpretMessage payload
                  case res of
                    Right () -> loop
                    -- Report the failure to the spawning thread regardless
                    -- of how the shutdown went.
                    Left exc -> shutDown `finally` throwTo spawningThreadId exc
              Nothing -> shutDown
    _ <- forkIO loop
    return
      ( Actor
          (E.writeChan inChan . Just)
          (E.writeChan inChan Nothing)
          -- 'readMVar' leaves the value in place, so repeated and
          -- concurrent waits all return once the actor has stopped.
          (readMVar lock)
      )
-- | Spawn an actor that threads a state through its messages, handling
-- them one at a time.
--
-- Arguments:
--
-- * The initial state.
-- * The step: given the current state and a message, produce the next
--   state.
-- * The finalizer, applied to the state when the actor stops.
--
-- Implemented on top of 'spawnStatefulBatched' by folding the step over
-- every batch in order.
spawnStatefulGranular :: state -> (state -> message -> IO state) -> (state -> IO ()) -> IO (Actor message)
spawnStatefulGranular zero step finalizer =
  spawnStatefulBatched zero (foldM step) finalizer
-- | Spawn an actor that threads a state through batches of messages on its
-- own thread.
--
-- Arguments:
--
-- * The initial state.
-- * The step: given the current state and a non-empty batch of messages,
--   produce the next state.
-- * The finalizer, applied to the last successfully produced state when
--   the actor stops.
--
-- Behaviour of the resulting actor:
--
-- * 'tell' enqueues the message on a bounded queue (capacity 1000), so it
--   blocks while the queue is full. Once the actor has stopped, messages
--   are silently dropped.
-- * 'kill' enqueues a stop marker. Messages enqueued before it are still
--   processed; it is a no-op once the actor has stopped.
-- * 'wait' blocks until the actor has stopped and rethrows the exception
--   of the step, if that is what stopped it. It may be called repeatedly.
--
-- Note that the outcome is published before the finalizer is run, so
-- 'wait' may return while the finalizer is still executing.
spawnStatefulBatched :: state -> (state -> NonEmpty message -> IO state) -> (state -> IO ()) -> IO (Actor message)
spawnStatefulBatched zero step finalizer =
  do
    queue <- newTBQueueIO 1000
    aliveVar <- newTVarIO True
    resVar <- newEmptyTMVarIO @(Maybe SomeException)
    let -- Stop accepting messages and publish the outcome for 'wait'.
        markDead outcome = do
          writeTVar aliveVar False
          putTMVar resVar outcome
    _ <- forkIOWithUnmask $ \unmask ->
      let loop !state =
            -- The transaction decides what to do next and returns it as an
            -- 'IO' action, which 'join' then executes.
            join $ atomically $ do
              -- Presumably blocks until the queue is non-empty and drains
              -- it; the helper is defined elsewhere -- TODO confirm.
              flushing <- flushNonEmptyTBQueue queue
              -- Judging by the name and type: the leading run of 'Just'
              -- payloads and whatever follows it -- TODO confirm.
              let (messages, flushingTail) = List.splitWhileJust (toList flushing)
              case messages of
                -- No leading messages: the batch starts with the stop
                -- marker.
                [] -> do
                  markDead Nothing
                  return (finalizer state)
                messagesHead : messagesTail ->
                  return $ do
                    result <- try @SomeException $ unmask $ step state (messagesHead :| messagesTail)
                    case result of
                      Right newState ->
                        case flushingTail of
                          [] -> loop newState
                          -- A stop marker followed the messages: finalize
                          -- with the state the step has just produced.
                          _ -> do
                            atomically (markDead Nothing)
                            finalizer newState
                      Left exception -> do
                        -- Hand the failure over to 'wait' instead of
                        -- discarding it.
                        atomically (markDead (Just exception))
                        finalizer state
       in loop zero
    return
      Actor
        { tell = \message ->
            atomically $ do
              alive <- readTVar aliveVar
              when alive $ writeTBQueue queue (Just message),
          kill =
            atomically $ do
              alive <- readTVar aliveVar
              when alive $ writeTBQueue queue Nothing,
          wait = do
            -- 'readTMVar' leaves the outcome in place, so repeated and
            -- concurrent waits all observe it.
            res <- atomically $ readTMVar resVar
            case res of
              Nothing -> return ()
              Just exception -> throwIO exception
        }