-- |
-- Module      :  Control.Concurrent.Actor
-- Copyright   :  (c) 2011 Alex Constandache
-- License     :  BSD3
-- Maintainer  :  alexander.the.average@gmail.com
-- Stability   :  alpha
-- Portability :  GHC only (requires throwTo)
--
-- This module implements Erlang-style actors 
-- (what Erlang calls processes). It does not implement 
-- network distribution (yet?). Here is an example:
--
-- @
--act1 :: Actor
--act1 = do
--    me <- self
--    liftIO $ print "act1 started"
--    forever $ receive
--      [ Case $ \((n, a) :: (Int, Address)) ->
--            if n > 10000
--                then do
--                    liftIO . throwIO $ NonTermination 
--                else do
--                    liftIO . putStrLn $ "act1 got " ++ (show n) ++ " from " ++ (show a)
--                    send a (n+1, me)
--      , Case $ \(e :: RemoteException) -> 
--            liftIO . print $ "act1 received a remote exception"
--      , Default $ liftIO . print $ "act1: received a malformed message"
--      ]
--    
--act2 :: Address -> Actor
--act2 addr = do
--    monitor addr
--    -- setFlag TrapRemoteExceptions
--    me <- self
--    send addr (0 :: Int, me)
--    forever $ receive 
--      [ Case $ \((n, a) :: (Int, Address)) -> do
--                    liftIO . putStrLn $ "act2 got " ++ (show n) ++ " from " ++ (show a)
--                    send a (n+1, me)
--      , Case $ \(e :: RemoteException) -> 
--            liftIO . print $ "act2 received a remote exception: " ++ (show e)
--      ]
--
--act3 :: Address -> Actor
--act3 addr = do
--    monitor addr
--    setFlag TrapRemoteExceptions
--    forever $ receive
--      [ Case $ \(e :: RemoteException) -> 
--            liftIO . print $ "act3 received a remote exception: " ++ (show e)
--      ]
--
--main = do
--    addr1 <- spawn act1
--    addr2 <- spawn (act2 addr1)
--    spawn (act3 addr2)
--    threadDelay 20000000
-- @
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ExistentialQuantification #-}
module Control.Concurrent.Actor (
  -- * Types
    Address
  , Handler(..)
  , ActorM
  , Actor
  , RemoteException
  , ActorExitNormal
  , Flag(..)
  -- * Actor actions
  , send
  , ()
  , ()
  , self
  , receive
  , receiveWithTimeout
  , spawn
  , monitor
  , link
  , setFlag
  , clearFlag
  , toggleFlag
  , testFlag
  ) where

import Control.Concurrent 
  ( forkIO
  , myThreadId
  , ThreadId
  )
import Control.Exception 
  ( Exception(..)
  , SomeException
  , catches
  , throwTo
  , throwIO
  , PatternMatchFail(..)
  )
import qualified Control.Exception as E (Handler(..))
import Control.Concurrent.Chan 
  ( Chan
  , newChan
  , readChan
  , writeChan
  )
import Control.Concurrent.MVar 
  ( MVar
  , newMVar
  , modifyMVar_
  , withMVar
  )
import Control.Monad.Reader 
  ( ReaderT
  , runReaderT
  , asks
  , ask
  , liftIO
  )
import System.Timeout (timeout)
import Data.Dynamic
import Data.Set 
  ( Set
  , empty
  , insert
  , delete
  , elems
  )
import Data.Word (Word64)
import Data.Bits (
    setBit
  , clearBit
  , complementBit
  , testBit
  )

-- | Exception raised by an actor on exit
data ActorExitNormal = ActorExitNormal deriving (Typeable, Show)

instance Exception ActorExitNormal

data RemoteException = RemoteException Address SomeException
  deriving (Typeable, Show)

instance Exception RemoteException

type Flags = Word64

data Flag = TrapRemoteExceptions
    deriving (Eq, Enum)

defaultFlags :: [Flag]
defaultFlags = []

setF :: Flag -> Flags -> Flags
setF = flip setBit . fromEnum

clearF :: Flag -> Flags -> Flags
clearF = flip clearBit . fromEnum

toggleF :: Flag -> Flags -> Flags
toggleF = flip complementBit . fromEnum

isSetF :: Flag -> Flags -> Bool
isSetF = flip testBit . fromEnum

data Context = Ctxt 
  { lSet  :: MVar (Set Address)
  , chan  :: Chan Message
  , flags :: MVar Flags
  } deriving (Typeable)

newtype Message = Msg { unMsg :: Dynamic }
    deriving (Typeable)

instance Show Message where
    show = show . unMsg

toMsg :: Typeable a => a -> Message
toMsg = Msg . toDyn

fromMsg :: Typeable a => Message -> Maybe a
fromMsg = fromDynamic . unMsg

-- | The address of an actor, used to send messages 
data Address = Addr 
  { thId  :: ThreadId
  , ctxt  :: Context
  } deriving (Typeable)

instance Show Address where
    show (Addr ti _) = "Address(" ++ (show ti) ++ ")"

instance Eq Address where
    addr1 == addr2 = (thId addr1) == (thId addr2)

instance Ord Address where
    addr1 `compare` addr2 = (thId addr1) `compare` (thId addr2)

-- | The actor monad, just a reader monad on top of 'IO'.
type ActorM = ReaderT Context IO 

-- | The type of an actor. It is just a monadic action
-- in the 'ActorM' monad, returning ()
type Actor = ActorM ()

data Handler = forall m . (Typeable m) 
             => Case (m -> ActorM ())
             |  Default (ActorM ())

-- | Used to obtain an actor's own address inside the actor
self :: ActorM Address
self = do
    c <- ask
    i <- liftIO myThreadId
    return $ Addr i c

-- | Try to handle a message using a list of handlers.
-- The first handler matching the type of the message 
-- is used.
receive :: [Handler] -> ActorM ()
receive hs = do
    ch  <- asks chan
    msg <- liftIO . readChan $ ch 
    rec msg hs

-- | Same as receive, but times out after a specified 
-- amount of time and runs a default action
receiveWithTimeout :: Int -> [Handler] -> ActorM () -> ActorM ()
receiveWithTimeout n hs act = do 
    ch <- asks chan 
    msg <- liftIO . timeout n . readChan $ ch
    case msg of
        Just m  -> rec m hs
        Nothing -> act

rec :: Message -> [Handler] -> ActorM ()
rec msg [] = liftIO . throwIO $ PatternMatchFail err where
    err = "no handler for messages of type " ++ (show msg)
rec msg ((Case hdl):hs) = case fromMsg msg of
    Just m  -> hdl m
    Nothing -> rec msg hs
rec msg ((Default act):_) = act


-- | Sends a message from inside the 'ActorM' monad
send :: Typeable m => Address -> m -> ActorM ()
send addr msg = do
    let ch = chan . ctxt $ addr
    liftIO . writeChan ch . toMsg $ msg

-- | Infix form of 'send'
() :: Typeable m => Address -> m -> ActorM ()
() = send
    
-- | Infix form of 'send' with the arguments flipped
() :: Typeable m => m -> Address -> ActorM ()
() = flip send

-- | Spawns a new actor, with the given flags set
spawn' :: [Flag] -> Actor -> IO Address
spawn' fs act = do
    ch <- liftIO newChan
    ls <- newMVar empty
    fl <- newMVar $ foldl (flip setF) 0x00 fs
    let cx = Ctxt ls ch fl
    let orig = runReaderT act cx >> throwIO ActorExitNormal
        wrap = orig `catches` [E.Handler remoteExH, E.Handler someExH]
        remoteExH :: RemoteException -> IO ()
        remoteExH e@(RemoteException a _) = do
            modifyMVar_ ls (return . delete a)
            me  <- myThreadId 
            let se = toException e
            forward (RemoteException (Addr me cx) se)
        someExH :: SomeException -> IO ()
        someExH e = do
            me  <- myThreadId
            forward (RemoteException (Addr me cx) e)
        forward :: RemoteException -> IO ()
        forward ex = do
            lset <- withMVar ls return
            mapM_ (fwdaux ex) $ elems lset
        fwdaux :: RemoteException -> Address -> IO ()
        fwdaux ex addr = do
            let rfs = flags . ctxt $ addr
                rch = chan  . ctxt $ addr
            trap <- withMVar rfs (return . isSetF TrapRemoteExceptions)
            if trap
                then
                    writeChan rch (toMsg ex)
                else
                    throwTo (thId addr) ex
    ti <- forkIO wrap
    return $ Addr ti cx

-- | Spawn a new actor with default flags
spawn :: Actor -> IO Address
spawn = spawn' defaultFlags 

-- | Monitors the actor at the specified address.
-- If an exception is raised in the monitored actor's 
-- thread, it is wrapped in an 'ActorException' and 
-- forwarded to the monitoring actor. If the monitored
-- actor terminates, an 'ActorException' is raised in
-- the monitoring Actor
monitor :: Address -> ActorM ()
monitor addr = do
    me <- self
    let ls = lSet . ctxt $ addr
    liftIO $ modifyMVar_ ls (return . insert me)

-- | Like `monitor`, but bi-directional
link :: Address -> ActorM ()
link addr = do
    monitor addr
    ls <- asks lSet
    liftIO $ modifyMVar_ ls (return . insert addr)

-- | Sets the specified flag in the actor's environment
setFlag :: Flag -> ActorM ()
setFlag flag = do
    fs <- asks flags
    liftIO $ modifyMVar_ fs (return . setF flag)

-- | Clears the specified flag in the actor's environment
clearFlag :: Flag -> ActorM ()
clearFlag flag = do
    fs <- asks flags
    liftIO $ modifyMVar_ fs (return . clearF flag)

-- | Toggles the specified flag in the actor's environment
toggleFlag :: Flag -> ActorM ()
toggleFlag flag = do
    fs <- asks flags
    liftIO $ modifyMVar_ fs (return . toggleF flag)

-- | Checks if the specified flag is set in the actor's environment
testFlag :: Flag -> ActorM Bool
testFlag flag = do 
    fs <- asks flags
    liftIO $ withMVar fs (return . isSetF flag)