module Concurrency.OTP.GenServer (
GenServerState(..),
GenServer,
RequestId,
StartStatus(..),
ServerIsDead(..),
CallResult,
CastResult,
start,
call,
cast,
replyWith,
reply,
noreply,
stop,
replyAndStop,
callWithTimeout
) where
import Control.Applicative
import Control.Monad.Catch (try, SomeException)
import Control.Monad.State
import Control.Monad.Reader
import Control.Concurrent.MVar (
    newEmptyMVar,
    putMVar,
    takeMVar
  )
import Control.Concurrent.STM
import Data.IORef (
    IORef,
    newIORef,
    readIORef,
    writeIORef,
    atomicModifyIORef'
  )
import Control.Exception (Exception, throw, throwIO)
import Data.Typeable (Typeable)
import qualified Data.Map as Map
import Data.Unique (Unique, newUnique)
import Data.Maybe (fromJust, isJust)
import Concurrency.OTP.Process
-- | Mutable table of calls that have not been answered yet, keyed by request
-- id. Each entry is the action that delivers a reply for that request.
type RequestStore res = IORef (Map.Map RequestId (res -> IO ()))
-- | Identifier of a single call. The server loop creates a fresh one with
-- 'newUnique' for every incoming call and hands it to 'handle_call'; it is the
-- key accepted by 'replyWith'.
type RequestId = Unique
-- | Monad in which the server callbacks ('handle_call', 'handle_cast') run.
--
-- It is a reader over the store of pending replies, stacked on a state layer
-- holding the user's server state @s@, stacked on the 'Process' monad whose
-- messages are 'Request' values.
newtype GenServerM s req res a = GenServerM
  { unGenServer
      :: ReaderT (RequestStore res) (StateT s (Process (Request req res))) a
  }
  deriving (Functor, Applicative, Monad, MonadIO)
-- | Exposes the user's server state: 'get' and 'put' are forwarded through the
-- reader layer to the underlying state layer.
instance MonadState s (GenServerM s req res) where
  get = GenServerM (lift get)
  put newState = GenServerM (lift (put newState))
-- | Behaviour of a server whose state has type @s@. The state type determines
-- both the request type @req@ and the response type @res@.
class GenServerState req res s | s -> req, s -> res where
  -- | Handle a synchronous request. The 'Unique' is the id generated for this
  -- particular call (the same type as 'RequestId').
  handle_call :: req -> Unique -> GenServerM s req res (CallResult res)

  -- | Handle an asynchronous request; no response is sent to the sender.
  handle_cast :: req -> GenServerM s req res CastResult

  -- | Clean-up hook that receives the server state. The default does nothing.
  onTerminate :: s -> IO ()
  onTerminate _ = return ()
-- | Handle to a running server: a wrapper around the pid of the process that
-- receives the server's 'Request' messages.
data GenServer req res = GenServer
  { gsPid :: Pid (Request req res) -- ^ pid of the server process
  }
-- | A 'GenServer' can be used wherever a process is expected: its pid is the
-- wrapped server pid.
instance IsProcess (Request req res) (GenServer req res) where
  getPid = gsPid
-- | Message delivered to the server process.
data Request req res
  = Call req (res -> IO ()) -- ^ synchronous request plus the action that delivers its reply
  | Cast req                -- ^ asynchronous request; no reply is delivered
-- | Outcomes shared by call handlers and cast handlers, so that 'noreply' and
-- 'stop' can be written the same way in both.
class HandlerResult a where
  -- | Finish handling without sending a reply.
  noreply :: a

  -- | Finish handling and stop, giving a textual reason.
  stop :: String -> a
-- | What a 'handle_call' callback asks the server loop to do next.
data CallResult res
  = Reply res               -- ^ deliver this response
  | NoReply                 -- ^ deliver nothing now
  | ReplyAndStop res String -- ^ deliver this response, then stop (with a reason)
  | Stop String             -- ^ stop (with a reason) without delivering a response
-- | For call handlers, 'noreply' is 'NoReply' and 'stop' is 'Stop'.
instance HandlerResult (CallResult res) where
  noreply = NoReply
  stop reason = Stop reason
-- | Call-handler result that delivers the given response.
reply :: res -> CallResult res
reply response = Reply response
-- | Call-handler result that delivers the given response and then stops the
-- server, with the given reason.
replyAndStop :: res -> String -> CallResult res
replyAndStop response reason = ReplyAndStop response reason
-- | What a 'handle_cast' callback asks the server loop to do next.
data CastResult
  = CastNoReply     -- ^ keep running
  | CastStop String -- ^ stop, with a reason
-- | For cast handlers, 'noreply' is 'CastNoReply' and 'stop' is 'CastStop'.
instance HandlerResult CastResult where
  noreply = CastNoReply
  stop reason = CastStop reason
-- | Result of 'start'.
data StartStatus req res
  = Ok (GenServer req res) -- ^ the server is running; here is its handle
  | Fail                   -- ^ the initialisation action threw an exception
-- | Spawn a server process and wait for its initialisation to finish.
--
-- @initFn@ runs inside the new process and produces the initial state. If it
-- throws, the exception is discarded and 'Fail' is returned. Otherwise the
-- process enters the server loop and the caller gets 'Ok' with a handle.
--
-- On success a callback is registered with 'linkIO' that reads the last
-- stored state and passes it to 'onTerminate' (presumably invoked when the
-- process terminates; confirm against "Concurrency.OTP.Process").
start :: (GenServerState req res s) => Process (Request req res) s -> IO (StartStatus req res)
start initFn = do
  startup <- newEmptyMVar
  pid <- spawn $ do
    outcome <- try (initFn >>= liftIO . newIORef)
    case outcome of
      Left err ->
        liftIO $ putMVar startup (Left err)
      Right ref -> do
        -- Report success first so the caller is unblocked, then serve.
        liftIO $ putMVar startup (Right ref)
        handler ref
  initResult <- takeMVar startup
  case initResult of
    Left err ->
      initFailed err
    Right ref -> do
      linkIO pid $ \_ -> readIORef ref >>= onTerminate
      return (Ok (GenServer pid))
  where
    -- The signature pins the exception type caught by 'try' above.
    initFailed :: SomeException -> IO (StartStatus req res)
    initFailed _ = return Fail
-- | Server loop run inside the spawned process.
--
-- Creates an empty store of pending replies, then forever: receives a
-- request, runs the matching callback against the state currently held in
-- @stateRef@, writes the resulting state back, and acts on the callback's
-- result. Stop reasons are currently ignored; stopping just calls 'exit'.
--
-- For a call answered with 'NoReply', the reply action is put into the
-- pending store only after 'handle_call' has returned, so that request id
-- becomes known to 'replyWith' from the next handled request onwards.
handler :: (GenServerState req res s) => IORef s -> Process (Request req res) ()
handler stateRef = do
  pending <- liftIO (newIORef Map.empty)
  forever $ do
    message <- receive
    current <- liftIO (readIORef stateRef)
    case message of
      Cast payload -> do
        (outcome, next) <-
          runStateT (runReaderT (unGenServer (handle_cast payload)) pending) current
        liftIO (writeIORef stateRef next)
        case outcome of
          CastStop _reason -> exit
          CastNoReply -> return ()
      Call payload respond -> do
        callId <- liftIO newUnique
        (outcome, next) <-
          runStateT (runReaderT (unGenServer (handle_call payload callId)) pending) current
        liftIO (writeIORef stateRef next)
        case outcome of
          Stop _reason -> exit
          ReplyAndStop answer _reason -> liftIO (respond answer) >> exit
          Reply answer -> liftIO (respond answer)
          NoReply ->
            -- Park the reply action until some handler calls 'replyWith'.
            liftIO $ atomicModifyIORef' pending $ \store ->
              (Map.insert callId respond store, ())
-- | Exception raised by 'callWithTimeout' (and therefore 'call') when the
-- server's death is reported while a call is waiting for its reply.
data ServerIsDead = ServerIsDead
  deriving (Show, Typeable)

instance Exception ServerIsDead
-- | Send a synchronous request and wait, without any time limit, for the
-- server's response.
--
-- Throws 'ServerIsDead' if the server's death is reported while waiting.
call :: GenServer req res -> req -> IO res
call srv msg =
  callWithTimeout srv Nothing msg >>= maybe unreachable return
  where
    -- With no timeout, 'callWithTimeout' creates a timeout flag that is never
    -- set, so it cannot return 'Nothing'. Spell the invariant out instead of
    -- relying on the anonymous failure of 'fromJust'.
    unreachable =
      error "Concurrency.OTP.GenServer.call: impossible, timed out although no timeout was requested"
-- | Send a synchronous request and wait for the response, optionally giving
-- up after a timeout.
--
-- The timeout is in milliseconds (it is multiplied by 1000 before being
-- passed to 'registerDelay'); 'Nothing' means wait indefinitely.
--
-- Returns @Just response@ when the server replied and 'Nothing' when the
-- timeout expired first. Throws 'ServerIsDead' when the callback installed
-- through 'withLinkIO_' fires before a reply arrives (presumably on server
-- termination; confirm against "Concurrency.OTP.Process").
--
-- A reply or death notice that is already available takes precedence over an
-- expired timer, so a response that has arrived is never discarded.
callWithTimeout :: GenServer req res -> Maybe Int -> req -> IO (Maybe res)
callWithTimeout GenServer { gsPid = pid } tm msg = do
  response <- newEmptyTMVarIO
  -- Without a timeout the flag stays False forever, so that branch never fires.
  isTimeOut <- maybe (newTVarIO False) (registerDelay . (* 1000)) tm
  let serverDied = Left (Left ())
      timedOut = Left (Right ())
      -- Only the first outcome is kept; later ones are dropped by tryPutTMVar.
      deliver = atomically . void . tryPutTMVar response
      waitTimeout = do
        expired <- readTVar isTimeOut
        unless expired retry
        return timedOut
  result <- withLinkIO_ pid
    (const $ deliver serverDied)
    (do
      sendIO pid $ Call msg (deliver . Right)
      -- Check the response first so it wins over a simultaneously expired timer.
      atomically $ takeTMVar response `orElse` waitTimeout
    )
  case result of
    Right x -> return (Just x)
    Left (Left _) -> throwIO ServerIsDead
    Left (Right _) -> return Nothing
-- | Send an asynchronous request to the server. Returns as soon as the
-- message has been handed to 'sendIO'; no response is awaited.
cast :: GenServer req res -> req -> IO ()
cast server msg = sendIO (gsPid server) (Cast msg)
-- | Deliver a response for an earlier call whose handler returned 'noreply'.
--
-- The request is atomically removed from the pending store. If it was there,
-- its reply action is run with @response@. If the id is unknown (never
-- parked, or already answered), nothing happens.
replyWith :: RequestId -> res -> GenServerM s req res ()
replyWith reqId response = do
  requestsRef <- GenServerM ask
  pending <- liftIO $ atomicModifyIORef' requestsRef $ \rs ->
    (Map.delete reqId rs, Map.lookup reqId rs)
  -- Total elimination of the Maybe instead of the isJust/fromJust pair.
  liftIO $ maybe (return ()) ($ response) pending