{-# LANGUAGE ExistentialQuantification  #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE ViewPatterns               #-}
{-# LANGUAGE PatternGuards              #-}

-- | This is the @Process@ implementation of a /managed process/
module Control.Distributed.Process.ManagedProcess.Internal.GenProcess
  (recvLoop, precvLoop) where

import Control.Applicative ((<$>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM hiding (check)
import Control.Distributed.Process hiding (call, Message)
import qualified Control.Distributed.Process as P (Message)
import Control.Distributed.Process.ManagedProcess.Server
import Control.Distributed.Process.ManagedProcess.Internal.Types
import Control.Distributed.Process.Extras.Internal.Queue.PriorityQ
  ( PriorityQ
  , enqueue
  , dequeue
  )
import qualified Control.Distributed.Process.Extras.Internal.Queue.PriorityQ as PriorityQ
  ( empty
  )
import Control.Distributed.Process.Extras
  ( ExitReason(..)
  , Shutdown(..)
  )
import qualified Control.Distributed.Process.Extras.SystemLog as Log
import Control.Distributed.Process.Extras.Time
import Control.Distributed.Process.Extras.Timer
  ( cancelTimer
  , runAfter
  , TimerRef
  )
import Control.Monad (void)
import Prelude hiding (init)

--------------------------------------------------------------------------------
-- Priority Mailbox Handling                                                  --
--------------------------------------------------------------------------------

-- | The internal (prioritised) mailbox: raw messages held in a 'PriorityQ'
-- keyed by an 'Int' priority.
type Queue = PriorityQ Int P.Message
-- | The receive 'Delay' currently in force, paired (only when a timer was
-- actually started, see 'startTimer') with the timer's reference and an 'STM'
-- action that completes once the timer has fired.
type TimeoutSpec = (Delay, Maybe (TimerRef, (STM ())))
-- | Result of 'checkTimer': either 'Stop' with the state and exit reason to
-- shut down with, or 'Go' with the delay and state to carry on with.
data TimeoutAction s = Stop s ExitReason | Go Delay s

-- | Entry point for the prioritised receive loop.
--
-- Every API handler of the underlying process definition is inspected first:
-- if any of them is a 'DispatchCC' the process dies with
-- @ExitOther "IllegalControlChannel"@. Otherwise a timer is started for the
-- initial delay and control passes to 'recvQueue' with an empty internal
-- queue.
precvLoop :: PrioritisedProcessDefinition s -> s -> Delay -> Process ExitReason
precvLoop ppDef pState recvDelay = do
    void $ mapM_ rejectControlChannel (apiHandlers (processDef ppDef))
    timer <- startTimer recvDelay
    recvQueue ppDef pState timer PriorityQ.empty
  where
    -- control-channel dispatchers are refused; anything else is accepted
    rejectControlChannel dispatcher =
      case dispatcher of
        DispatchCC _ _ -> die $ ExitOther "IllegalControlChannel"
        _              -> return ()

-- | One iteration of the prioritised receive loop.
--
-- Arguments are the prioritised process definition, the current server state,
-- the 'TimeoutSpec' produced by the most recent 'startTimer', and the internal
-- priority queue. The result is the 'ExitReason' the server stopped with.
--
-- 'processNext' is run to obtain the next 'ProcessAction' (an exit signal
-- carrying an 'ExitReason' raised meanwhile is turned into 'ProcessStop'),
-- and 'nextAction' then either loops via 'recvQueueAux' or runs the
-- shutdown handler and returns.
recvQueue :: PrioritisedProcessDefinition s
          -> s
          -> TimeoutSpec
          -> Queue
          -> Process ExitReason
recvQueue p s t q =
  let pDef = processDef p
      ps   = priorities p
  in do (ac, d, q') <- catchExit (processNext pDef ps s t q)
                                 (\_ (r :: ExitReason) ->
                                   return (ProcessStop r, Infinity, q))
        nextAction s ac d q'
  where
    -- Interpret a 'ProcessAction'. @st@ is the state the action was produced
    -- against: 'ProcessStop' carries no state of its own, so the shutdown
    -- handler is given @st@. Passing it explicitly (rather than closing over
    -- the outer @s@) matters for the exit handler in 'recvQueueAux', whose
    -- handlers run against a newer state than the enclosing iteration's @s@.
    nextAction st ac d q'
      | ProcessContinue  s'    <- ac = recvQueueAux p (priorities p) s' d  q'
      | ProcessTimeout   t' s' <- ac = recvQueueAux p (priorities p) s' t' q'
      | ProcessHibernate d' s' <- ac = block d' >> recvQueueAux p (priorities p) s' d q'
      | ProcessStop      r     <- ac = (shutdownHandler $ processDef p) st r >> return r
      | ProcessStopping  s' r  <- ac = (shutdownHandler $ processDef p) s' r >> return r
      | otherwise {- compiler foo -} = die "IllegalState"

    -- Restart the timer, move everything waiting in the real mailbox into the
    -- internal queue, then recurse into 'recvQueue'. An exit signal carrying
    -- an 'ExitReason' is offered to 'trapExit' and the user's exit handlers.
    -- NOTE(review): the recursive call to 'recvQueue' sits inside this
    -- 'catchExit' scope, so handlers nest as the loop iterates.
    recvQueueAux ppDef prioritizers pState delay queue =
      let ex = (trapExit:(exitHandlers $ processDef ppDef))
          eh = map (\d' -> (dispatchExit d') pState) ex
      in (do t' <- startTimer delay
             mq <- drainMessageQueue pState prioritizers queue
             recvQueue ppDef pState t' mq)
         `catchExit`
         (\pid (reason :: ExitReason) -> do
             let pd = processDef ppDef
             let ps = pState
             let pq = queue
             let em = unsafeWrapMessage reason
             (a, d, q') <- findExitHandlerOrStop pd ps pq eh pid em
             -- the exit handlers above were applied to pState, so that is
             -- the state a bare 'ProcessStop' must shut down with
             nextAction pState a d q')

    -- Try each exit handler in turn; the first one that yields an action
    -- wins. If none does, stop with the exit reason carried by the message.
    findExitHandlerOrStop :: ProcessDefinition s
                          -> s
                          -> Queue
                          -> [ProcessId -> P.Message -> Process (Maybe (ProcessAction s))]
                          -> ProcessId
                          -> P.Message
                          -> Process (ProcessAction s, Delay, Queue)
    findExitHandlerOrStop _ _ pq [] _ er = do
      mEr <- unwrapMessage er :: Process (Maybe ExitReason)
      case mEr of
        Nothing -> die "InvalidExitHandler"  -- TODO: better error message?
        Just er' -> return (ProcessStop er', Infinity, pq)
    findExitHandlerOrStop pd ps pq (eh:ehs) pid er = do
      mAct <- eh pid er
      case mAct of
        Nothing -> findExitHandlerOrStop pd ps pq ehs pid er
        Just pa -> return (pa, Infinity, pq)

    -- Check the timer, then handle the head of the internal queue, or fall
    -- back to the real mailbox when the internal queue is empty.
    processNext def ps' pState tSpec queue =
      let ex = (trapExit:(exitHandlers def))
          h  = timeoutHandler def in do
        -- as a side effect, this check will cancel the timer
        timedOut <- checkTimer pState tSpec h
        case timedOut of
          Stop s' r -> return $ (ProcessStopping s' r, (fst tSpec), queue)
          Go t' s'  -> do
            -- checkTimer could've run our timeoutHandler, which changes "s"
            case (dequeue queue) of
              Nothing -> do
                -- if the internal queue is empty, we fall back to reading the
                -- actual mailbox, however if /that/ times out, then we need
                -- to let the timeout handler kick in again and make a decision
                drainOrTimeout s' t' queue ps' h
              Just (m', q') -> do
                act <- catchesExit (processApply def s' m')
                                   (map (\d' -> (dispatchExit d') s') ex)
                return (act, t', q')

    -- Offer the message to the shutdown handler, then the API handlers, then
    -- the info handlers; the unhandled-message policy applies if none match.
    processApply def pState msg =
      let pol          = unhandledMessagePolicy def
          apiMatchers  = map (dynHandleMessage pol pState) (apiHandlers def)
          infoMatchers = map (dynHandleMessage pol pState) (infoHandlers def)
          shutdown'    = dynHandleMessage pol pState shutdownHandler'
          ms'          = (shutdown':apiMatchers) ++ infoMatchers
      in processApplyAux ms' pol pState msg

    processApplyAux []     p' s' m' = applyPolicy p' s' m'
    processApplyAux (h:hs) p' s' m' = do
      attempt <- h m'
      case attempt of
        Nothing  -> processApplyAux hs p' s' m'
        Just act -> return act

    -- Read one raw message from the real mailbox, honouring the delay; if
    -- nothing arrives in time the timeout handler decides what happens next.
    drainOrTimeout pState delay queue ps' h = do
      let matches = [ matchMessage return ]
          recv    = case delay of
                      Infinity -> receiveWait matches >>= return . Just
                      NoDelay  -> receiveTimeout 0 matches
                      Delay i  -> receiveTimeout (asTimeout i) matches in do
        r <- recv
        case r of
          Nothing -> h pState delay >>= \act -> return $ (act, delay, queue)
          Just m  -> do
            queue' <- enqueueMessage pState ps' m queue
            -- Returning @ProcessContinue@ simply causes the main loop to go
            -- into 'recvQueueAux', which ends up in 'drainMessageQueue'.
            -- In other words, we continue draining the /real/ mailbox.
            return $ (ProcessContinue pState, delay, queue')

-- | Move every message currently waiting in the process mailbox into the
-- internal priority queue, without blocking: polling stops as soon as a
-- zero-timeout receive finds nothing.
drainMessageQueue :: s -> [DispatchPriority s] -> Queue -> Process Queue
drainMessageQueue pState priorities' queue =
  receiveTimeout 0 [ matchMessage return ] >>= maybe (return queue) requeue
  where
    -- enqueue the message just received, then keep polling the mailbox
    requeue msg =
      enqueueMessage pState priorities' msg queue
        >>= drainMessageQueue pState priorities'

-- | Add a raw message to the internal queue.
--
-- Each prioritiser is tried in order; the first one that yields a priority
-- decides the key, which is stored negated. If no prioritiser claims the
-- message it is enqueued under the key @-1@.
enqueueMessage :: s
               -> [DispatchPriority s]
               -> P.Message
               -> Queue
               -> Process Queue
enqueueMessage _  []     msg queue = return $ enqueue (-1 :: Int) msg queue
enqueueMessage st (d:ds) msg queue = do
  outcome <- prioritise d st msg
  case outcome of
    -- this prioritiser does not apply: try the remaining ones
    Nothing           -> enqueueMessage st ds msg queue
    Just (prio, msg') -> return $ enqueue (negate prio) msg' queue

--------------------------------------------------------------------------------
-- Ordinary/Blocking Mailbox Handling                                         --
--------------------------------------------------------------------------------

-- | Main loop for an ordinary (non-prioritised) managed process.
--
-- Arguments are the process definition, the current server state and the
-- receive delay. Each iteration performs one receive over the shutdown
-- handler, the API handlers and a catch-all for the info handlers, running
-- the timeout handler if the receive times out. Exit signals raised
-- meanwhile go to 'trapExit' followed by the user's exit handlers. The loop
-- recurses until a stop action is produced, at which point the shutdown
-- handler runs and the 'ExitReason' is returned.
recvLoop :: ProcessDefinition s -> s -> Delay -> Process ExitReason
recvLoop pDef pState recvDelay =
  let p             = unhandledMessagePolicy pDef
      handleTimeout = timeoutHandler pDef
      handleStop    = shutdownHandler pDef
      shutdown'     = matchDispatch p pState shutdownHandler'
      matchers      = map (matchDispatch p pState) (apiHandlers pDef)
      ex'           = (trapExit:(exitHandlers pDef))
      ms' = (shutdown':matchers) ++ matchAux p pState (infoHandlers pDef)
  in do
    ac <- catchesExit (processReceive ms' handleTimeout pState recvDelay)
                      (map (\d' -> (dispatchExit d') pState) ex')
    case ac of
        (ProcessContinue s')     -> recvLoop pDef s' recvDelay
        (ProcessTimeout t' s')   -> recvLoop pDef s' t'
        (ProcessHibernate d' s') -> block d' >> recvLoop pDef s' recvDelay
        (ProcessStop r) -> handleStop pState r >> return (r :: ExitReason)
        (ProcessStopping s' r)   -> handleStop s' r >> return (r :: ExitReason)
  where
    -- A single catch-all matcher that feeds the raw message through the info
    -- handlers, falling back to the unhandled-message policy.
    matchAux :: UnhandledMessagePolicy
             -> s
             -> [DeferredDispatcher s]
             -> [Match (ProcessAction s)]
    matchAux p ps ds = [matchAny (auxHandler (applyPolicy p ps) ps ds)]

    -- Try each info handler in order and return the first action produced.
    -- NB: we *do not* want to terminate/dead-letter messages until we've
    -- exhausted all the possible info handlers, so the policy is applied
    -- only once the list is empty.
    auxHandler :: (P.Message -> Process (ProcessAction s))
               -> s
               -> [DeferredDispatcher s]
               -> P.Message
               -> Process (ProcessAction s)
    auxHandler policy _  []     msg = policy msg
    auxHandler policy st (d:ds) msg = do
      m <- dispatchInfo d st msg
      case m of
        Nothing  -> auxHandler policy st ds msg
        Just act -> return act

    -- Receive once; on timeout let the timeout handler choose the action.
    processReceive :: [Match (ProcessAction s)]
                   -> TimeoutHandler s
                   -> s
                   -> Delay
                   -> Process (ProcessAction s)
    processReceive ms handleTimeout st d = do
      next <- recv ms d
      case next of
        Nothing -> handleTimeout st d
        Just pa -> return pa

    -- Blocking receive for 'Infinity', timed receive otherwise.
    recv :: [Match (ProcessAction s)]
         -> Delay
         -> Process (Maybe (ProcessAction s))
    recv matches d' =
      case d' of
        Infinity -> receiveWait matches >>= return . Just
        NoDelay  -> receiveTimeout 0 matches
        Delay t' -> receiveTimeout (asTimeout t') matches

--------------------------------------------------------------------------------
-- Simulated Receive Timeouts                                                 --
--------------------------------------------------------------------------------

-- | Build the 'TimeoutSpec' for a delay.
--
-- For a finite @Delay t@ a timer is scheduled with 'runAfter' that fills a
-- fresh 'TMVar' after @t@; the spec then carries the timer reference and an
-- 'STM' read of that variable. Any other delay yields a spec with no timer.
startTimer :: Delay -> Process TimeoutSpec
startTimer d =
  case d of
    Delay t -> do
      sig  <- liftIO newEmptyTMVarIO
      tref <- runAfter t (liftIO (atomically (putTMVar sig ())))
      return (d, Just (tref, readTMVar sig))
    _ -> return (d, Nothing)

-- | Poll the timer in the given 'TimeoutSpec' and, if it has fired, run the
-- timeout handler and translate its 'ProcessAction' into a 'TimeoutAction'.
--
-- When the timer has not fired the result is 'Go' with the spec's delay and
-- the unchanged state.
checkTimer :: s
           -> TimeoutSpec
           -> TimeoutHandler s
           -> Process (TimeoutAction s)
checkTimer pState spec handler = do
  expired <- pollTimer spec  -- this will cancel the timer
  if expired
    then handler pState (fst spec) >>= afterTimeout
    else carryOn pState
  where
    -- keep going with the delay recorded in the spec
    carryOn st = return $ Go (fst spec) st

    afterTimeout (ProcessTimeout   t' s') = return $ Go t' s'
    afterTimeout (ProcessStop      r    ) = return $ Stop pState r
    afterTimeout (ProcessStopping  s' r ) = return $ Stop s' r
    afterTimeout (ProcessHibernate d' s') = block d' >> carryOn s'
    afterTimeout (ProcessContinue  s'   ) = carryOn s'

-- | Report whether the timer in a 'TimeoutSpec' has fired.
--
-- A spec without a timer never counts as fired. Otherwise the timer is
-- cancelled and its signal is polled once, without blocking.
pollTimer :: TimeoutSpec -> Process Bool
pollTimer spec =
  case snd spec of
    Nothing          -> return False
    Just (tref, sig) -> do
      cancelTimer tref  -- cancelling a dead/completed timer is a no-op
      liftIO $ atomically $ fired sig
  where
    -- True when the signal can be read right now, False otherwise
    fired :: (STM ()) -> STM Bool
    fired sig' = (const True <$> sig') `orElse` return False

--------------------------------------------------------------------------------
-- Utilities                                                                  --
--------------------------------------------------------------------------------

-- a 'Shutdown' message delivered as a 'cast' stops the server with
-- 'ExitNormal'
shutdownHandler' :: Dispatcher s
shutdownHandler' = handleCast onShutdown
  where
    onShutdown _ Shutdown = stop ExitNormal

-- an exit signal whose payload is an 'ExitReason' (e.g.
-- @ProcessExitException from ExitShutdown@) stops the server with that reason
trapExit :: ExitSignalDispatcher s
trapExit = handleExit stopWithReason
  where
    stopWithReason _ _ reason = stop (reason :: ExitReason)

-- | Suspend the calling process's thread for the given interval (via
-- 'threadDelay').
block :: TimeInterval -> Process ()
block = liftIO . threadDelay . asTimeout

-- | Decide what to do with a message that no handler accepted.
--
-- 'Terminate' stops the server with @ExitOther "UnhandledInput"@;
-- 'DeadLetter' forwards the message to the given process and continues;
-- 'Drop' continues silently; 'Log' reports the message on the system log
-- channel at info level and continues.
applyPolicy :: UnhandledMessagePolicy
            -> s
            -> P.Message
            -> Process (ProcessAction s)
applyPolicy Terminate        _ _ = stop $ ExitOther "UnhandledInput"
applyPolicy (DeadLetter pid) s m = forward m pid >> continue s
applyPolicy Drop             s _ = continue s
applyPolicy Log              s m = do
  Log.report Log.info Log.logChannel $ "Unhandled Gen Input Message: " ++ (show m)
  continue s