{-# LANGUAGE DeriveDataTypeable    #-}
{-# LANGUAGE DeriveGeneric         #-}
{-# LANGUAGE StandaloneDeriving    #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE PatternGuards         #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE FlexibleInstances     #-}
{-# LANGUAGE EmptyDataDecls        #-}
{-# LANGUAGE TemplateHaskell       #-}
{-# LANGUAGE ImpredicativeTypes    #-}
{-# LANGUAGE UndecidableInstances  #-}
{-# LANGUAGE MultiParamTypeClasses #-}

-- | A simple API for /routing/, using a custom exchange type.
module Control.Distributed.Process.Execution.Exchange.Router
  ( -- * Types
    HeaderName
  , Binding(..)
  , Bindable
  , BindingSelector
  , RelayType(..)
    -- * Starting a Router
  , router
  , supervisedRouter
  , supervisedRouterRef
    -- * Client (Publishing) API
  , route
  , routeMessage
    -- * Routing via message/binding keys
  , messageKeyRouter
  , bindKey
    -- * Routing via message headers
  , headerContentRouter
  , bindHeader
  ) where

import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( Process
  , ProcessMonitorNotification(..)
  , ProcessId
  , monitor
  , handleMessage
  , unsafeWrapMessage
  )
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Execution.Exchange.Internal
  ( startExchange
  , startSupervised
  , configureExchange
  , Message(..)
  , Exchange
  , ExchangeType(..)
  , post
  , postMessage
  , applyHandlers
  )
import Control.Distributed.Process.Extras.Internal.Primitives
  ( deliver
  , Resolvable(..)
  )
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Data.Binary
import Data.Foldable (forM_)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as Map
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)
import GHC.Generics

type HeaderName = String

-- | The binding key used by the built-in key and header based
-- routers.
data Binding =
    BindKey    { bindingKey :: !String }
  | BindHeader { bindingKey :: !String
               , headerName :: !HeaderName
               }
  | BindNone
  deriving (Typeable, Generic, Eq, Show)
instance Binary Binding where
instance NFData Binding where
instance Hashable Binding where

-- | Things that can be used as binding keys in a router.
class (Hashable k, Eq k, Serializable k) => Bindable k
instance (Hashable k, Eq k, Serializable k) => Bindable k

-- | Used to convert a 'Message' into a 'Bindable' routing key.
type BindingSelector k = (Message -> Process k)

-- | Given to a /router/ to indicate whether clients should
-- receive 'Message' payloads only, or the whole 'Message' object
-- itself.
data RelayType = PayloadOnly | WholeMessage

data State k = State { bindings  :: !(HashMap k (HashSet ProcessId))
                     , selector  :: !(BindingSelector k)
                     , relayType :: !RelayType
                     }

type Router k = ExchangeType (State k)

--------------------------------------------------------------------------------
-- Starting/Running the Exchange                                              --
--------------------------------------------------------------------------------

-- | A router that matches on a 'Message' 'key'. To bind a client @Process@ to
-- such an exchange, use the 'bindKey' function.
messageKeyRouter :: RelayType -> Process Exchange
messageKeyRouter t = router t matchOnKey -- (return . BindKey . key)
  where
    matchOnKey :: Message -> Process Binding
    matchOnKey m = return $ BindKey (key m)

-- | A router that matches on a specific (named) header. To bind a client
-- @Process@ to such an exchange, use the 'bindHeader' function.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
headerContentRouter t n = router t (checkHeaders n)
  where
    checkHeaders hn Message{..} = do
      case Map.lookup hn (Map.fromList headers) of
        Nothing -> return BindNone
        Just hv -> return $ BindHeader hn hv

-- | Defines a /router/ exchange. The 'BindingSelector' is used to construct
-- a binding (i.e., an instance of the 'Bindable' type @k@) for each incoming
-- 'Message'. Such bindings are matched against bindings stored in the exchange.
-- Clients of a /router/ exchange are identified by a binding, mapped to
-- one or more 'ProcessId's.
--
-- The format of the bindings, nature of their storage and mechanism for
-- submitting new bindings is implementation dependent (i.e., will vary by
-- exchange type). For example, the 'messageKeyRouter' and 'headerContentRouter'
-- implementations both use the 'Binding' data type, which can represent a
-- 'Message' key or a 'HeaderName' and content. As with all custom exchange
-- types, bindings should be submitted by evaluating 'configureExchange' with
-- a suitable data type.
--
router :: (Bindable k) => RelayType -> BindingSelector k -> Process Exchange
router t s = routerT t s >>= startExchange

supervisedRouterRef :: Bindable k
                    => RelayType
                    -> BindingSelector k
                    -> SupervisorPid
                    -> Process (ProcessId, P.Message)
supervisedRouterRef t sel spid = do
  ex <- supervisedRouter t sel spid
  Just pid <- resolve ex
  return (pid, unsafeWrapMessage ex)

-- | Defines a /router/ that can be used in a supervision tree.
supervisedRouter :: Bindable k
                 => RelayType
                 -> BindingSelector k
                 -> SupervisorPid
                 -> Process Exchange
supervisedRouter t sel spid =
  routerT t sel >>= \t' -> startSupervised t' spid

routerT :: Bindable k
        => RelayType
        -> BindingSelector k
        -> Process (Router k)
routerT t s = do
  return $ ExchangeType { name        = "Router"
                        , state       = State Map.empty s t
                        , configureEx = apiConfigure
                        , routeEx     = apiRoute
                        }

--------------------------------------------------------------------------------
-- Client Facing API                                                          --
--------------------------------------------------------------------------------

-- | Add a binding (for the calling process) to a 'messageKeyRouter' exchange.
bindKey :: String -> Exchange -> Process ()
bindKey k ex = do
  self <- P.getSelfPid
  configureExchange ex (self, BindKey k)

-- | Add a binding (for the calling process) to a 'headerContentRouter' exchange.
bindHeader :: HeaderName -> String -> Exchange -> Process ()
bindHeader n v ex = do
  self <- P.getSelfPid
  configureExchange ex (self, BindHeader v n)

-- | Send a 'Serializable' message to the supplied 'Exchange'. The given datum
-- will be converted to a 'Message', with the 'key' set to @""@ and the
-- 'headers' to @[]@.
--
-- The routing behaviour will be dependent on the choice of 'BindingSelector'
-- given when initialising the /router/.
route :: Serializable m => Exchange -> m -> Process ()
route = post

-- | Send a 'Message' to the supplied 'Exchange'.
-- The routing behaviour will be dependent on the choice of 'BindingSelector'
-- given when initialising the /router/.
routeMessage :: Exchange -> Message -> Process ()
routeMessage = postMessage

--------------------------------------------------------------------------------
-- Exchage Definition/State & API Handlers                                    --
--------------------------------------------------------------------------------

apiRoute :: forall k. Bindable k
         => State k
         -> Message
         -> Process (State k)
apiRoute st@State{..} msg = do
  binding <- selector msg
  case Map.lookup binding bindings of
    Nothing -> return st
    Just bs -> forM_ bs (fwd relayType msg) >> return st
  where
    fwd WholeMessage m = deliver m
    fwd PayloadOnly  m = P.forward (payload m)

-- TODO: implement 'unbind' ???
-- TODO: apiConfigure currently leaks memory if clients die (we don't cleanup)

apiConfigure :: forall k. Bindable k
             => State k
             -> P.Message
             -> Process (State k)
apiConfigure st msg = do
  applyHandlers st msg $ [ \m -> handleMessage m (createBinding st)
                         , \m -> handleMessage m (handleMonitorSignal st)
                         ]
  where
    createBinding s@State{..} (pid, bind) = do
      case Map.lookup bind bindings of
        Nothing -> do _ <- monitor pid
                      return $ s { bindings = newBind bind pid bindings }
        Just ps -> return $ s { bindings = addBind bind pid bindings ps }

    newBind b p bs = Map.insert b (Set.singleton p) bs
    addBind b' p' bs' ps = Map.insert b' (Set.insert p' ps) bs'

    handleMonitorSignal s@State{..} (ProcessMonitorNotification _ p _) =
      let bs  = bindings
          bs' = Map.foldlWithKey' (\a k v -> Map.insert k (Set.delete p v) a) bs bs
      in return $ s { bindings = bs' }