-- |
-- A small actor layer built on GHC threads.
--
-- An actor is a thread paired with a 'Chan' mailbox of dynamically typed
-- messages. Actors are started with 'spawn' or 'runActor', exchange
-- messages with 'send' and 'receive', and can be tied together with
-- 'monitor' and 'link' so that a failure in one is forwarded to the others.
module Control.Concurrent.Actor (
Address
, Handler(..)
, ActorM
, Actor
, ActorException
, ActorExit
, Flag(..)
, send
, self
, receive
, receiveWithTimeout
, spawn
, runActor
, monitor
, link
, kill
, status
, setFlag
, clearFlag
, toggleFlag
, testFlag
) where
import Control.Concurrent
( forkIO
, killThread
, myThreadId
, ThreadId
)
import Control.Exception
( Exception(..)
, SomeException
, catches
, throwTo
, throwIO
, PatternMatchFail(..)
)
import qualified Control.Exception as E (Handler(..))
import Control.Concurrent.Chan
( Chan
, newChan
, readChan
, writeChan
)
import Control.Concurrent.MVar
( MVar
, newMVar
, modifyMVar_
, withMVar
, readMVar
)
import GHC.Conc (ThreadStatus, threadStatus)
import Control.Monad.Reader
( ReaderT
, runReaderT
, asks
, ask
, liftIO
)
import System.Timeout (timeout)
import Data.Dynamic
import Data.Set
( Set
, empty
, insert
, delete
, elems
)
import Data.Word (Word64)
import Data.Bits (
setBit
, clearBit
, complementBit
, testBit
)
-- | Exception type with a single nullary constructor.
--
-- NOTE(review): nothing in this module throws or catches 'ActorExit';
-- presumably it is meant as an exit signal for actors -- confirm the
-- intended use.
data ActorExit = ActorExit deriving (Typeable, Show)
instance Exception ActorExit
-- | Failure report passed between monitoring and linked actors.
--
-- Carries the 'Address' of the actor that is forwarding the failure and
-- the exception that caused it. It is either thrown at the target actor's
-- thread or, when the target has 'TrapActorExceptions' set, written to the
-- target's mailbox as an ordinary message.
data ActorException = ActorException Address SomeException
  deriving (Typeable, Show)
instance Exception ActorException
-- | Bit set holding an actor's flags; bit @fromEnum f@ stands for flag @f@.
type Flags = Word64

-- | Per-actor behaviour switches.
data Flag
  = TrapActorExceptions
    -- ^ When set, an 'ActorException' forwarded to the actor is written to
    -- its mailbox instead of being thrown at its thread.
  deriving (Eq, Enum)

-- | Flags a freshly created actor starts with (none).
defaultFlags :: [Flag]
defaultFlags = []

-- | Switch a flag on in a flag word.
setF :: Flag -> Flags -> Flags
setF flag bits = setBit bits (fromEnum flag)

-- | Switch a flag off in a flag word.
clearF :: Flag -> Flags -> Flags
clearF flag bits = clearBit bits (fromEnum flag)

-- | Invert a flag in a flag word.
toggleF :: Flag -> Flags -> Flags
toggleF flag bits = complementBit bits (fromEnum flag)

-- | Report whether a flag is on in a flag word.
isSetF :: Flag -> Flags -> Bool
isSetF flag bits = testBit bits (fromEnum flag)
-- | State belonging to one actor. It is reachable both from the actor
-- itself (through the 'ReaderT' environment) and from anyone holding the
-- actor's 'Address'.
data Context = Context
  { ctxMonitors :: MVar (Set Address)
    -- ^ Actors that are notified when this actor fails: 'monitor' and
    -- 'link' insert into this set.
  , ctxChan :: Chan Message
    -- ^ The actor's mailbox.
  , ctxFlags :: MVar Flags
    -- ^ The actor's current flag word.
  } deriving (Typeable)
-- | A mailbox entry: a value of any 'Typeable' type wrapped in a 'Dynamic'.
newtype Message = Msg { unMsg :: Dynamic }
  deriving (Typeable)

-- | Shown exactly like the wrapped 'Dynamic'.
instance Show Message where
  show (Msg payload) = show payload

-- | Wrap a value as a 'Message'.
toMsg :: Typeable a => a -> Message
toMsg value = Msg (toDyn value)

-- | Recover the payload of a 'Message' at the requested type, or 'Nothing'
-- when the payload has a different type.
fromMsg :: Typeable a => Message -> Maybe a
fromMsg (Msg payload) = fromDynamic payload
-- | Handle to an actor: the thread it runs on plus its 'Context'.
data Address = Address
  { addrThread :: ThreadId
  , addrContext :: Context
  } deriving (Typeable)

-- | Only the thread id is displayed.
instance Show Address where
  show (Address tid _) = concat ["Address(", show tid, ")"]

-- | Two addresses are equal when they name the same thread.
instance Eq Address where
  Address t1 _ == Address t2 _ = t1 == t2

-- | Addresses are ordered by thread id.
instance Ord Address where
  compare (Address t1 _) (Address t2 _) = compare t1 t2
-- | Monad in which actor code runs: 'IO' with read access to the actor's
-- own 'Context'.
type ActorM = ReaderT Context IO
-- | An actor body: an 'ActorM' computation with no result.
type Actor = ActorM ()
-- | One clause of a 'receive' or 'receiveWithTimeout' handler list.
-- Clauses are tried in list order.
data Handler = forall m . (Typeable m)
  -- Chosen when the message payload has type @m@; the payload is passed
  -- to the function.
  => Case (m -> ActorM ())
  -- Chosen unconditionally; clauses listed after it are never tried.
  | Default (ActorM ())
-- | The 'Address' of the calling actor: the current thread id paired with
-- the 'Context' from the reader environment.
self :: ActorM Address
self = do
  ctx <- ask
  liftIO (fmap (\tid -> Address tid ctx) myThreadId)
-- | Block until a message arrives in the calling actor's mailbox, then
-- dispatch it to the first matching handler.
--
-- The message is removed from the mailbox before dispatch, so a message
-- that no handler accepts is lost and 'PatternMatchFail' is thrown.
receive :: [Handler] -> ActorM ()
receive hs =
  asks ctxChan >>= liftIO . readChan >>= \msg -> rec msg hs
-- | Like 'receive', but give up waiting after @n@ and run @act@ instead.
--
-- @n@ is handed unchanged to 'timeout', so it is measured in that
-- function's unit (microseconds).
receiveWithTimeout :: Int -> [Handler] -> ActorM () -> ActorM ()
receiveWithTimeout n hs act = do
  mailbox <- asks ctxChan
  outcome <- liftIO (timeout n (readChan mailbox))
  -- Nothing means the wait timed out before a message arrived.
  maybe act (\m -> rec m hs) outcome
-- | Dispatch a message to the first handler that accepts it.
--
-- A 'Case' accepts the message when its payload has the type the handler
-- expects; a 'Default' accepts anything. When the list is exhausted a
-- 'PatternMatchFail' is thrown.
rec :: Message -> [Handler] -> ActorM ()
rec msg handlers = case handlers of
  [] ->
    liftIO (throwIO (PatternMatchFail ("no handler for messages of type " ++ show msg)))
  Default act : _ ->
    act
  Case hdl : rest ->
    -- On a type mismatch, fall through to the remaining handlers.
    maybe (rec msg rest) hdl (fromMsg msg)
-- | Append a message to the mailbox of the actor at the given address.
send :: Typeable m => Address -> m -> ActorM ()
send addr msg = liftIO (writeChan mailbox (toMsg msg))
  where
    mailbox = ctxChan (addrContext addr)
-- | Build a fresh 'Context' for an actor and wrap the actor body with the
-- failure-propagation handlers.
--
-- Returns the wrapped 'IO' action (not yet running) together with the
-- context. The wrapped action resolves its own address with 'myThreadId'
-- at the time a handler fires.
--
-- Both handlers re-throw after notifying, so the exception still ends the
-- actor.
mkActor :: Actor -> [Flag] -> IO (IO (), Context)
mkActor actor flagList = do
  mailbox <- newChan
  links <- newMVar empty
  flagVar <- newMVar (foldr setF 0x00 flagList)
  let context = Context links mailbox flagVar

      -- Address of the thread currently running this actor.
      myAddress :: IO Address
      myAddress = fmap (\tid -> Address tid context) myThreadId

      -- Hand a failure report to one actor: as a mailbox message if it
      -- traps actor exceptions, otherwise as an exception thrown at its
      -- thread.
      deliver :: ActorException -> Address -> IO ()
      deliver report target = do
        let targetCtx = addrContext target
        trapping <- withMVar (ctxFlags targetCtx) (return . isSetF TrapActorExceptions)
        if trapping
          then writeChan (ctxChan targetCtx) (toMsg report)
          else throwTo (addrThread target) report

      -- Hand a failure report to every actor currently in our set.
      broadcast :: ActorException -> IO ()
      broadcast report = readMVar links >>= mapM_ (deliver report) . elems

      -- An actor in our set failed: drop it from the set, pass the
      -- underlying cause on under our own address, then re-throw.
      onLinkedFailure :: ActorException -> IO ()
      onLinkedFailure report@(ActorException origin cause) = do
        modifyMVar_ links (return . delete origin)
        me <- myAddress
        broadcast (ActorException me cause)
        throwIO report

      -- Any other exception: report it to every actor in our set, then
      -- re-throw.
      onOwnFailure :: SomeException -> IO ()
      onOwnFailure cause = do
        me <- myAddress
        broadcast (ActorException me cause)
        throwIO cause

      -- Handler order matters: 'ActorException' must be tried before the
      -- catch-all 'SomeException' handler.
      guarded = runReaderT actor context `catches`
        [ E.Handler onLinkedFailure
        , E.Handler onOwnFailure
        ]
  return (guarded, context)
-- | Start an actor on a new thread, with the default flags, and return its
-- 'Address'.
spawn :: Actor -> IO Address
spawn actor = do
  (run, ctx) <- mkActor actor defaultFlags
  fmap (\tid -> Address tid ctx) (forkIO run)
-- | Run an actor, with the default flags, on the calling thread. Returns
-- when the actor body finishes.
runActor :: Actor -> IO ()
runActor actor = mkActor actor defaultFlags >>= fst
-- | Register the calling actor in the target's monitor set, so that the
-- caller is notified when the target fails.
monitor :: Address -> ActorM ()
monitor addr =
  self >>= \watcher ->
    liftIO (modifyMVar_ (ctxMonitors (addrContext addr)) (return . insert watcher))
-- | Tie the calling actor and the target together in both directions: the
-- caller is added to the target's monitor set (via 'monitor') and the
-- target is added to the caller's own set.
link :: Address -> ActorM ()
link addr = do
  monitor addr
  ownSet <- asks ctxMonitors
  liftIO . modifyMVar_ ownSet $ \peers -> return (insert addr peers)
-- | Stop the actor at the given address by calling 'killThread' on its
-- thread.
kill :: Address -> ActorM ()
kill addr = liftIO (killThread (addrThread addr))
-- | Query the runtime's 'ThreadStatus' for the thread behind an address.
status :: Address -> ActorM ThreadStatus
status addr = liftIO (threadStatus (addrThread addr))
-- | Apply a pure update to the calling actor's flag word.
--
-- The new word is forced before it is stored, so repeated updates do not
-- pile up a chain of unevaluated thunks inside the 'MVar'.
modifyFlags :: (Flags -> Flags) -> ActorM ()
modifyFlags update = do
  fs <- asks ctxFlags
  liftIO $ modifyMVar_ fs (\bits -> return $! update bits)

-- | Turn the given flag on for the calling actor.
setFlag :: Flag -> ActorM ()
setFlag = modifyFlags . setF

-- | Turn the given flag off for the calling actor.
clearFlag :: Flag -> ActorM ()
clearFlag = modifyFlags . clearF

-- | Invert the given flag for the calling actor.
toggleFlag :: Flag -> ActorM ()
toggleFlag = modifyFlags . toggleF
-- | Report whether the given flag is currently set for the calling actor.
testFlag :: Flag -> ActorM Bool
testFlag flag =
  asks ctxFlags >>= \fs ->
    liftIO (withMVar fs (\bits -> return (isSetF flag bits)))