module Control.Concurrent.Actor (
Address
, Handler(..)
, ActorM
, Actor
, RemoteException
, ActorExitNormal
, Flag(..)
, send
, (◁)
, (▷)
, self
, receive
, receiveWithTimeout
, spawn
, monitor
, link
, setFlag
, clearFlag
, toggleFlag
, testFlag
) where
import Control.Concurrent
( forkIO
, myThreadId
, ThreadId
)
import Control.Exception
( Exception(..)
, SomeException
, catches
, throwTo
, throwIO
, PatternMatchFail(..)
)
import qualified Control.Exception as E (Handler(..))
import Control.Concurrent.Chan
( Chan
, newChan
, readChan
, writeChan
)
import Control.Concurrent.MVar
( MVar
, newMVar
, modifyMVar_
, withMVar
)
import Control.Monad.Reader
( ReaderT
, runReaderT
, asks
, ask
, liftIO
)
import System.Timeout (timeout)
import Data.Dynamic
import Data.Set
( Set
, empty
, insert
, delete
, elems
)
import Data.Word (Word64)
import Data.Bits (
setBit
, clearBit
, complementBit
, testBit
)
data ActorExitNormal = ActorExitNormal deriving (Typeable, Show)
instance Exception ActorExitNormal
data RemoteException = RemoteException Address SomeException
deriving (Typeable, Show)
instance Exception RemoteException
type Flags = Word64
data Flag = TrapRemoteExceptions
deriving (Eq, Enum)
defaultFlags :: [Flag]
defaultFlags = []
setF :: Flag -> Flags -> Flags
setF = flip setBit . fromEnum
clearF :: Flag -> Flags -> Flags
clearF = flip clearBit . fromEnum
toggleF :: Flag -> Flags -> Flags
toggleF = flip complementBit . fromEnum
isSetF :: Flag -> Flags -> Bool
isSetF = flip testBit . fromEnum
data Context = Ctxt
{ lSet :: MVar (Set Address)
, chan :: Chan Message
, flags :: MVar Flags
} deriving (Typeable)
newtype Message = Msg { unMsg :: Dynamic }
deriving (Typeable)
instance Show Message where
show = show . unMsg
toMsg :: Typeable a => a -> Message
toMsg = Msg . toDyn
fromMsg :: Typeable a => Message -> Maybe a
fromMsg = fromDynamic . unMsg
data Address = Addr
{ thId :: ThreadId
, ctxt :: Context
} deriving (Typeable)
instance Show Address where
show (Addr ti _) = "Address(" ++ (show ti) ++ ")"
instance Eq Address where
addr1 == addr2 = (thId addr1) == (thId addr2)
instance Ord Address where
addr1 `compare` addr2 = (thId addr1) `compare` (thId addr2)
type ActorM = ReaderT Context IO
type Actor = ActorM ()
data Handler = forall m . (Typeable m)
=> Case (m -> ActorM ())
| Default (ActorM ())
self :: ActorM Address
self = do
c <- ask
i <- liftIO myThreadId
return $ Addr i c
receive :: [Handler] -> ActorM ()
receive hs = do
ch <- asks chan
msg <- liftIO . readChan $ ch
rec msg hs
receiveWithTimeout :: Int -> [Handler] -> ActorM () -> ActorM ()
receiveWithTimeout n hs act = do
ch <- asks chan
msg <- liftIO . timeout n . readChan $ ch
case msg of
Just m -> rec m hs
Nothing -> act
rec :: Message -> [Handler] -> ActorM ()
rec msg [] = liftIO . throwIO $ PatternMatchFail err where
err = "no handler for messages of type " ++ (show msg)
rec msg ((Case hdl):hs) = case fromMsg msg of
Just m -> hdl m
Nothing -> rec msg hs
rec msg ((Default act):_) = act
send :: Typeable m => Address -> m -> ActorM ()
send addr msg = do
let ch = chan . ctxt $ addr
liftIO . writeChan ch . toMsg $ msg
(◁) :: Typeable m => Address -> m -> ActorM ()
(◁) = send
(▷) :: Typeable m => m -> Address -> ActorM ()
(▷) = flip send
spawn' :: [Flag] -> Actor -> IO Address
spawn' fs act = do
ch <- liftIO newChan
ls <- newMVar empty
fl <- newMVar $ foldl (flip setF) 0x00 fs
let cx = Ctxt ls ch fl
let orig = runReaderT act cx >> throwIO ActorExitNormal
wrap = orig `catches` [E.Handler remoteExH, E.Handler someExH]
remoteExH :: RemoteException -> IO ()
remoteExH e@(RemoteException a _) = do
modifyMVar_ ls (return . delete a)
me <- myThreadId
let se = toException e
forward (RemoteException (Addr me cx) se)
someExH :: SomeException -> IO ()
someExH e = do
me <- myThreadId
forward (RemoteException (Addr me cx) e)
forward :: RemoteException -> IO ()
forward ex = do
lset <- withMVar ls return
mapM_ (fwdaux ex) $ elems lset
fwdaux :: RemoteException -> Address -> IO ()
fwdaux ex addr = do
let rfs = flags . ctxt $ addr
rch = chan . ctxt $ addr
trap <- withMVar rfs (return . isSetF TrapRemoteExceptions)
if trap
then
writeChan rch (toMsg ex)
else
throwTo (thId addr) ex
ti <- forkIO wrap
return $ Addr ti cx
spawn :: Actor -> IO Address
spawn = spawn' defaultFlags
monitor :: Address -> ActorM ()
monitor addr = do
me <- self
let ls = lSet . ctxt $ addr
liftIO $ modifyMVar_ ls (return . insert me)
link :: Address -> ActorM ()
link addr = do
monitor addr
ls <- asks lSet
liftIO $ modifyMVar_ ls (return . insert addr)
setFlag :: Flag -> ActorM ()
setFlag flag = do
fs <- asks flags
liftIO $ modifyMVar_ fs (return . setF flag)
clearFlag :: Flag -> ActorM ()
clearFlag flag = do
fs <- asks flags
liftIO $ modifyMVar_ fs (return . clearF flag)
toggleFlag :: Flag -> ActorM ()
toggleFlag flag = do
fs <- asks flags
liftIO $ modifyMVar_ fs (return . toggleF flag)
testFlag :: Flag -> ActorM Bool
testFlag flag = do
fs <- asks flags
liftIO $ withMVar fs (return . isSetF flag)