module Network.Wai.Middleware.Consul
( ConsulSettings
, defaultConsulSettings
, getConsulCallback
, getConsulFilter
, getConsulHost
, getConsulKey
, getConsulLimit
, getConsulPort
, mkConsulProxy
, mkConsulWatch
, setConsulCallback
, setConsulFilter
, setConsulHost
, setConsulKey
, setConsulLimit
, setConsulPort
, withConsul
) where
import BasePrelude
import Control.Concurrent.Async ( race )
import Control.Exception.Enclosed ( catchAny )
import Control.Monad.IO.Class ( MonadIO(..), liftIO )
import Control.Monad.Logger ( MonadLoggerIO, logWarn )
import Control.Monad.Trans.Control
( MonadBaseControl(liftBaseWith, restoreM) )
import Control.Monad.Trans.Resource ( runResourceT )
import qualified Data.ByteString.Lazy as LB ( toStrict )
import Data.Conduit ( ($$) )
import Data.Void ( Void(..), absurd )
import qualified Data.Conduit.Binary as C ( take )
import Data.Text ( Text )
import qualified Data.Text as T ( pack )
import Network.Consul
( KeyValue(kvModifyIndex),
KeyValuePut(KeyValuePut, kvpCasIndex, kvpFlags, kvpKey, kvpValue),
putKey,
initializeConsulClient,
getKey )
import Network.HTTP.Client
( defaultManagerSettings, managerResponseTimeout )
import Network.HTTP.Types ( status201, methodPost )
import Network.Socket ( PortNumber(PortNum) )
import Network.Wai
( Middleware, Request, responseBuilder, pathInfo, requestMethod )
import Network.Wai.Conduit ( sourceRequestBody )
data ConsulSettings =
ConsulSettings {csHost :: Text
,csPort :: PortNumber
,csKey :: Text
,csFilter :: Request -> Bool
,csLimit :: Maybe Int
,csCallback :: ConsulCallback
}
type ConsulCallback = forall (m :: * -> *).
(MonadBaseControl IO m,MonadLoggerIO m) => KeyValue -> m ()
defaultConsulSettings :: ConsulSettings
defaultConsulSettings =
ConsulSettings {csHost = "0.0.0.0"
,csPort = PortNum 8500
,csKey = "wai"
,csFilter =
(\req ->
(requestMethod req == methodPost) &&
(pathInfo req ==
["wai"]))
,csLimit = Nothing
,csCallback = liftIO . print}
setConsulHost :: Text -> ConsulSettings -> ConsulSettings
setConsulHost a b = b { csHost = a }
getConsulHost :: ConsulSettings -> Text
getConsulHost = csHost
setConsulPort :: PortNumber -> ConsulSettings -> ConsulSettings
setConsulPort a b = b { csPort = a }
getConsulPort :: ConsulSettings -> PortNumber
getConsulPort = csPort
setConsulKey :: Text -> ConsulSettings -> ConsulSettings
setConsulKey a b = b { csKey = a }
getConsulKey :: ConsulSettings -> Text
getConsulKey = csKey
setConsulFilter :: (Request -> Bool) -> ConsulSettings -> ConsulSettings
setConsulFilter a b = b { csFilter = a }
getConsulFilter :: ConsulSettings -> Request -> Bool
getConsulFilter = csFilter
setConsulLimit :: Maybe Int -> ConsulSettings -> ConsulSettings
setConsulLimit a b = b { csLimit = a }
getConsulLimit :: ConsulSettings -> Maybe Int
getConsulLimit = csLimit
setConsulCallback :: ConsulCallback -> ConsulSettings -> ConsulSettings
setConsulCallback a b = b { csCallback = a }
getConsulCallback :: ConsulSettings -> ConsulCallback
getConsulCallback = csCallback
withConsul :: (Monad m,MonadBaseControl IO m,MonadLoggerIO m)
=> ConsulSettings -> (Middleware -> m a) -> m a
withConsul cs f =
fmap (either absurd id)
(liftRace (mkConsulWatch cs)
(mkConsulProxy cs >>= f))
liftRace :: MonadBaseControl IO m
=> m a -> m b -> m (Either a b)
liftRace x y =
do res <-
liftBaseWith
(\run ->
race (run x)
(run y))
case res of
Left x' -> Left <$> restoreM x'
Right y' -> Right <$> restoreM y'
mkConsulWatch :: (MonadBaseControl IO m,MonadLoggerIO m)
=> ConsulSettings -> m Void
mkConsulWatch cs =
initializeConsulClient (csHost cs)
(csPort cs)
Nothing >>=
go 0 >>=
pure . absurd
where go idx' cc =
catchAny (do kv <-
getKey cc
(csKey cs)
(Just idx')
Nothing
Nothing
case kv of
Nothing ->
do liftIO (threadDelay $ 1000 * 1000)
go idx' cc
(Just kv') ->
do (csCallback cs $ kv')
go (kvModifyIndex kv') cc)
(\ex ->
do liftIO (threadDelay $ 1000 * 1000)
$(logWarn) (T.pack $ show ex)
go idx' cc)
mkConsulProxy :: (MonadIO m,Functor m)
=> ConsulSettings -> m Middleware
mkConsulProxy cs =
proxyToConsul <$>
initializeConsulClient (csHost cs)
(csPort cs)
Nothing
where proxyToConsul cc app' req respond
| csFilter cs req =
do bs <-
liftIO (runResourceT $
sourceRequestBody req $$
C.take (fromMaybe 5242880 (csLimit cs)))
let keyValuePut =
KeyValuePut {kvpKey = csKey cs
,kvpValue = LB.toStrict bs
,kvpCasIndex = Nothing
,kvpFlags = Nothing}
_workedOK <-
putKey cc keyValuePut Nothing
respond (responseBuilder status201 [] mempty)
| otherwise = app' req respond