{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE GADTs #-}
module Control.Concurrent.Actor
( ActionT
, Actor(..)
, actFinally
, act
, receiveSTM
, receive
, hoistActionT
) where
import Control.Concurrent
import Control.Monad.IO.Class
import Control.Monad.Trans
import Control.Monad.Reader
import Control.Monad.State.Class
import Control.Monad.Reader.Class
import Control.Monad.Writer.Class
import Control.Monad.RWS.Class
import Control.Monad.Error.Class
import Control.Monad.Cont.Class
import Control.Concurrent.STM
import Control.Exception
import Data.Functor.Contravariant
import Data.Queue
-- | An action performed by an actor that receives messages of type
-- @message@: a computation in the base monad @m@ that is given the
-- actor's 'ActorContext' when it is run.
newtype ActionT message m a = ActionT
  { runActionT :: ActorContext message -> m a
    -- ^ Run the action against a concrete actor context.
  }
-- The instances below are all borrowed from 'ReaderT' over the actor
-- context, which wraps the same function type as 'ActionT'
-- (@ActorContext message -> m a@).

deriving via (ReaderT (ActorContext message) m)
  instance Functor m => Functor (ActionT message m)

deriving via (ReaderT (ActorContext message) m)
  instance Applicative m => Applicative (ActionT message m)

deriving via (ReaderT (ActorContext message) m)
  instance Monad m => Monad (ActionT message m)

deriving via (ReaderT (ActorContext message) m)
  instance MonadIO m => MonadIO (ActionT message m)

deriving via (ReaderT (ActorContext message))
  instance MonadTrans (ActionT message)

deriving via (ReaderT (ActorContext message) m)
  instance MonadError e m => MonadError e (ActionT message m)

deriving via (ReaderT (ActorContext message) m)
  instance MonadWriter w m => MonadWriter w (ActionT message m)

deriving via (ReaderT (ActorContext message) m)
  instance MonadState s m => MonadState s (ActionT message m)

deriving via (ReaderT (ActorContext message) m)
  instance MonadCont m => MonadCont (ActionT message m)
-- | Reader operations are forwarded to the base monad @m@; the actor
-- context carried by 'ActionT' is passed through unchanged and is not the
-- environment seen by 'ask' or 'local'.
instance MonadReader r m => MonadReader r (ActionT message m) where
  -- Ignore the actor context and ask the base monad.
  ask = ActionT $ \_ -> ask

  -- Apply 'local' to the base computation obtained for each context.
  local f action = ActionT (local f . runActionT action)
-- | No methods are defined here: the instance only combines the reader,
-- writer and state instances that 'ActionT' already has.
instance
  (MonadWriter w m, MonadReader r m, MonadState s m) =>
  MonadRWS r w s (ActionT message m)
-- | The state handed to a running 'ActionT'.
--
-- The result type @a@ of the handler is existentially quantified, so it
-- does not appear in the context's own type.
data ActorContext message = forall a. ActorContext
  { onError :: TVar (Either SomeException a -> IO ())
    -- ^ Mutable cell holding a handler that takes either an exception or
    -- a result of type @a@.
  , messageQueue :: Queue message
    -- ^ Queue of messages for the actor.
  }
-- | A handle to an actor that accepts messages of type @message@.
data Actor message = Actor
  { addAfterEffect :: (Maybe SomeException -> IO ()) -> STM ()
    -- ^ Register an 'IO' callback that takes an optional exception.
  , threadId :: ThreadId
    -- ^ The thread associated with the actor.
  , send :: message -> STM ()
    -- ^ Deliver a message to the actor, as an 'STM' action.
  }
-- | Adapt the message type of an actor: messages are converted with the
-- given function before being passed to the original 'send'. The other
-- fields are kept as they are.
instance Contravariant Actor where
  contramap adapt actor = actor { send = send actor . adapt }
-- | Fork a thread that runs an actor's action and return a handle to it.
--
-- A fresh message queue is created for the actor and made available to the
-- action through its context. When the action finishes, whether by
-- returning or by throwing, the handler currently stored for the actor is
-- called with the outcome: first @errorHandler@, then every after-effect
-- registered through 'addAfterEffect', in registration order.
--
-- After-effects receive @Just@ the exception when the action threw and
-- @Nothing@ when it returned normally. Each one is attached with
-- 'finally', so it still runs when @errorHandler@ or an earlier
-- after-effect throws.
--
-- NOTE(review): the stored handler is read once, when the action finishes,
-- so an after-effect registered after that point is never run.
actFinally :: (Either SomeException a -> IO ()) -> ActionT message IO a -> IO (Actor message)
actFinally errorHandler (ActionT body) = do
  handlerVar <- newTVarIO errorHandler
  queue <- atomically newQueue
  tid <-
    forkFinally
      (body (ActorContext handlerVar queue))
      (\outcome -> readTVarIO handlerVar >>= ($ outcome))
  pure $
    Actor
      { addAfterEffect = \afterEffect ->
          -- Strict update so the cell always holds an evaluated function
          -- rather than a growing chain of suspended updates.
          modifyTVar' handlerVar $ \handler outcome ->
            handler outcome
              `finally` afterEffect (either Just (const Nothing) outcome)
      , threadId = tid
      , send = enqueue queue
      }
-- | Start an actor with 'actFinally', using an initial handler that
-- discards the action's outcome.
act :: ActionT message IO a -> IO (Actor message)
act action = actFinally ignoreOutcome action
  where
    ignoreOutcome _ = pure ()
-- | Take one message from the actor's queue and continue with the action
-- the handler builds from it.
--
-- The dequeue is its own STM transaction; the handler's action then runs
-- in the same actor context.
-- NOTE(review): presumably waits while the queue is empty; that depends on
-- the behaviour of 'dequeue', which is defined outside this module.
receive :: MonadIO m => (message -> ActionT message m a) -> ActionT message m a
receive handler = ActionT $ \ctx ->
  liftIO (atomically (dequeue (messageQueue ctx)))
    >>= \msg -> runActionT (handler msg) ctx
-- | Take one message from the actor's queue and process it with an 'STM'
-- handler.
--
-- The dequeue and the handler run in a single transaction, so if the
-- handler retries, the message is not consumed.
receiveSTM :: MonadIO m => (message -> STM a) -> ActionT message m a
receiveSTM handler = ActionT $ \ctx ->
  liftIO . atomically $ handler =<< dequeue (messageQueue ctx)
-- | Change the base monad of an action by applying the given
-- transformation to the computation produced for each actor context.
hoistActionT :: (forall a. m a -> n a) -> ActionT message m a -> ActionT message n a
hoistActionT nat (ActionT run) = ActionT $ \ctx -> nat (run ctx)