{-# LANGUAGE CPP, TemplateHaskell, ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings, BangPatterns #-}
module NgxExport.Tools.Aggregate (
#ifdef SNAP_AGGREGATE_SERVER
AggregateServerConf,
#endif
ngxExportAggregateService
,reportAggregate
,Foreign.C.Types.CInt (..)
,Foreign.C.Types.CUInt (..)
) where
import NgxExport
#ifdef SNAP_AGGREGATE_SERVER
import NgxExport.Tools.SimpleService
import NgxExport.Tools.SplitService
#endif
import NgxExport.Tools.System
import NgxExport.Tools.TimeInterval
import Language.Haskell.TH
import Network.HTTP.Client
import Foreign.C.Types
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C8
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.IORef
import Data.Int
#if MIN_VERSION_time(1,9,1)
import Data.Fixed
#endif
import Data.Time.Clock
import Data.Time.Calendar
import Data.Aeson
import Data.Maybe
import Control.Monad
import Control.Exception
import System.IO.Unsafe
import Safe
#ifdef SNAP_AGGREGATE_SERVER
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Control.Monad.IO.Class
import Control.Exception.Enclosed (handleAny)
import Snap.Http.Server
import Snap.Core
#endif
-- | Contents of an aggregate storage: the time of the last purge of stale
-- entries paired with a map from reporter PID to the time of its latest
-- report and the latest value it reported.
type AggregateValue a = (UTCTime, Map Int32 (UTCTime, Maybe a))
-- | Mutable aggregate storage shared by the receive and send handlers.
type Aggregate a = IORef (AggregateValue a)
-- | A single report as it comes off the wire: the reporter PID and an
-- optional payload.
type ReportValue a = Maybe (Int32, Maybe a)
-- | Raise a user 'IOError' carrying the given message.
throwUserError :: String -> IO a
throwUserError = ioError . userError
#if MIN_VERSION_time(1,9,1)
-- | Build a 'Fixed' value whose integer part is the given number.
--
-- The number is scaled by the resolution of the target type, so that
-- @asIntegerPart 3 :: Pico@ denotes exactly @3@.
asIntegerPart :: forall a. HasResolution a => Integer -> Fixed a
-- 'undefined' only serves as a type witness for 'resolution' here
asIntegerPart = MkFixed . (resolution (undefined :: Fixed a) *)
{-# SPECIALIZE INLINE asIntegerPart :: Integer -> Pico #-}
#endif
-- | Convert a 'TimeInterval' into a 'NominalDiffTime' with whole-second
-- precision (the interval is first reduced to seconds with 'toSec').
toNominalDiffTime :: TimeInterval -> NominalDiffTime
toNominalDiffTime =
#if MIN_VERSION_time(1,9,1)
    secondsToNominalDiffTime . asIntegerPart
#else
    fromRational . toRational . secondsToDiffTime
#endif
    . fromIntegral . toSec
-- | Merge a single report into the aggregate storage.
--
-- The entry of the reporting PID is stamped with the current time. Its value
-- is replaced by the reported one unless the report carries no value while
-- an older entry exists, in which case the older value is kept. When at
-- least @int@ has elapsed since the last purge, entries whose latest report
-- is @int@ or older are dropped and the purge time is reset to now.
--
-- An empty report ('Nothing') is rejected with a user 'IOError' /before/ the
-- storage is touched. Matching on the report eagerly matters: a failure
-- raised lazily inside the function passed to 'atomicModifyIORef'' may leave
-- the 'IORef' holding a thunk that rethrows on every subsequent read.
updateAggregate :: Aggregate a -> ReportValue a -> NominalDiffTime -> IO ()
updateAggregate _ Nothing _ =
    throwUserError "Empty aggregate report!"
updateAggregate a (Just (pid, v)) int = do
    !t <- getCurrentTime
    atomicModifyIORef' a $
        \(t', v') ->
            (let (!tn, f) =
                     if diffUTCTime t t' >= int
                         -- time to purge: keep only fresh entries
                         then (t
                              ,M.filter $
                                  \(t'', _) -> diffUTCTime t t'' < int
                              )
                         else (t', id)
                 !vn = f $ M.alter
                           (\old ->
                               let !new' = case old of
                                               -- a report without a value
                                               -- must not erase the value
                                               -- stored previously
                                               Just (_, prev)
                                                   | isNothing v -> prev
                                               _ -> v
                               in Just (t, new')
                           ) pid v'
             in (tn, vn)
            ,()
            )
-- | Decode a JSON report from the request body and merge it into the
-- aggregate storage.
--
-- The second 'ByteString' argument is the purge interval rendered as a
-- 'TimeInterval'; when it cannot be read, @Min 5@ is used. A body which does
-- not decode, or which decodes to an empty report (JSON @null@), is rejected
-- with a user 'IOError' before the storage is touched. Returns @"done"@ on
-- success.
receiveAggregate :: FromJSON a =>
    Aggregate a -> L.ByteString -> ByteString -> IO L.ByteString
receiveAggregate a v sint = do
    let !int = toNominalDiffTime $ readDef (Min 5) $ C8.unpack sint
    case decode' v of
        Just s@(Just _) -> updateAggregate a s int
        _ -> throwUserError "Unreadable aggregate!"
    return "done"
-- | Content handler which returns the JSON-encoded contents of the aggregate
-- storage with HTTP status 200. The 'ByteString' argument is ignored.
--
-- NOTE(review): the body is JSON but the content type is @text/plain@, while
-- 'sendAggregateSnap' answers with @application/json@. Left unchanged as
-- clients may rely on it; confirm whether this is intended.
sendAggregate :: ToJSON a =>
    Aggregate a -> ByteString -> IO ContentHandlerResult
sendAggregate a = const $ do
    s <- readIORef a
    return (encode s, "text/plain", 200, [])
#ifdef SNAP_AGGREGATE_SERVER
-- | Configuration of the built-in aggregate server.
data AggregateServerConf =
    AggregateServerConf { asPort          :: Int
                          -- ^ port the server listens on
                        , asPurgeInterval :: TimeInterval
                          -- ^ interval after which stale entries are purged
                        } deriving Read
-- | Service that runs the Snap aggregate server.
--
-- The server is configured by 'asConfig' with the port from the
-- configuration and serves the storage @a@ under the URL suffix @u@ (see
-- 'asHandler'). The purge interval is taken from the configuration. The
-- service returns an empty string once 'simpleHttpServe' returns.
aggregateServer :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> AggregateServerConf -> Bool -> IO L.ByteString
aggregateServer a u = ignitionService $ \conf -> do
    let !int = toNominalDiffTime $ asPurgeInterval conf
    simpleHttpServe (asConfig $ asPort conf) $ asHandler a u int
    return ""
-- | Snap configuration of the aggregate server: bound to @127.0.0.1@ on the
-- given port, with access log, error log and verbose output switched off.
asConfig :: Int -> Config Snap a
asConfig p = setPort p
           $ setBind "127.0.0.1"
           $ setAccessLog ConfigNoLog
           $ setErrorLog ConfigNoLog
           $ setVerbose False mempty
-- | Routes of the aggregate server: @POST put/\<u\>@ receives a report and
-- @GET get/\<u\>@ sends the current contents of the storage.
asHandler :: (FromJSON a, ToJSON a) =>
    Aggregate a -> ByteString -> NominalDiffTime -> Snap ()
asHandler a u int =
    route [(B.append "put/" u
           ,Snap.Core.method POST $ receiveAggregateSnap a int
           )
          ,(B.append "get/" u
           ,Snap.Core.method GET $ sendAggregateSnap a
           )
          ]
-- | Snap handler which reads a JSON report from the request body (at most
-- 65536 bytes) and merges it into the aggregate storage.
--
-- A body which does not decode, or which decodes to an empty report (JSON
-- @null@), is rejected with a user 'IOError' before the storage is touched.
-- Exceptions are turned into an error response by
-- 'handleAggregateExceptions'. On success the handler finishes with an empty
-- response.
receiveAggregateSnap :: FromJSON a => Aggregate a -> NominalDiffTime -> Snap ()
receiveAggregateSnap a int =
    handleAggregateExceptions "Exception while receiving aggregate" $ do
        s <- decode' <$> readRequestBody 65536
        liftIO $ case s of
            Just r@(Just _) -> updateAggregate a r int
            _ -> throwUserError "Unreadable aggregate!"
        finishWith emptyResponse
-- | Snap handler which writes the JSON-encoded contents of the aggregate
-- storage with content type @application/json@.
sendAggregateSnap :: ToJSON a => Aggregate a -> Snap ()
sendAggregateSnap a =
    handleAggregateExceptions "Exception while sending aggregate" $ do
        s <- liftIO $ readIORef a
        modifyResponse $ setContentType "application/json"
        writeLBS $ encode s
-- | Run a Snap action and turn any exception caught by 'handleAny' into a
-- response with status 500, whose status reason is the given message and
-- whose body is the shown exception.
handleAggregateExceptions :: String -> Snap () -> Snap ()
handleAggregateExceptions cmsg = handleAny $ \e ->
    writeErrorResponse 500 $ show (e :: SomeException)
    where writeErrorResponse c msg = do
              modifyResponse $ setResponseStatus c $ T.encodeUtf8 $ T.pack cmsg
              writeBS $ T.encodeUtf8 $ T.pack msg
#endif
-- | Generate and export the declarations of an aggregate service.
--
-- For the name suffix @f@ and the name @a@ of the aggregated type the splice
-- declares
--
-- * @aggregate_storage_\<f\>@: the storage of type @Aggregate a@, created
--   once with 'unsafePerformIO' and marked @NOINLINE@; it starts with an
--   empty map and the purge time set to Modified Julian Day 0,
-- * @receiveAggregate_\<f\>@: 'receiveAggregate' applied to the storage,
--   exported with 'ngxExportAsyncOnReqBody',
-- * @sendAggregate_\<f\>@: 'sendAggregate' applied to the storage, exported
--   with 'ngxExportAsyncHandler'.
--
-- When built with @SNAP_AGGREGATE_SERVER@ the splice additionally declares
-- @aggregate_url_\<f\>@ (the suffix @f@ as a 'ByteString') and
-- @aggregate_\<f\>@ ('aggregateServer' applied to the storage and the URL
-- suffix), the latter being exported with 'ngxExportSimpleServiceTyped' as a
-- 'SingleShotService' configured by 'AggregateServerConf'.
ngxExportAggregateService :: String       -- ^ Name suffix of the service
                          -> Name         -- ^ Name of the aggregated type
                          -> Q [Dec]
ngxExportAggregateService f a = do
    let sName = mkName $ "aggregate_storage_" ++ f
        storage = varE sName
#ifdef SNAP_AGGREGATE_SERVER
        nameF = 'aggregateServer
        fName = mkName $ "aggregate_" ++ f
        uName = mkName $ "aggregate_url_" ++ f
#endif
        nameRecv = 'receiveAggregate
        recvName = mkName $ "receiveAggregate_" ++ f
        nameSend = 'sendAggregate
        sendName = mkName $ "sendAggregate_" ++ f
    concat <$> sequence
        [sequence
            [sigD sName [t|Aggregate $(conT a)|]
            ,funD sName
                [clause []
                    (normalB [|unsafePerformIO $
                                 newIORef (UTCTime (ModifiedJulianDay 0) 0
                                          ,M.empty
                                          )
                             |]
                    )
                    []
                ]
            -- the storage must be created only once
            ,pragInlD sName NoInline FunLike AllPhases
#ifdef SNAP_AGGREGATE_SERVER
            ,sigD uName [t|ByteString|]
            ,funD uName [clause [] (normalB [|C8.pack f|]) []]
            ,sigD fName [t|AggregateServerConf -> Bool -> IO L.ByteString|]
            ,funD fName
                [clause []
                    (normalB [|$(varE nameF) $(storage) $(varE uName)|])
                    []
                ]
#endif
            ,sigD recvName [t|L.ByteString -> ByteString -> IO L.ByteString|]
            ,funD recvName
                [clause []
                    (normalB [|$(varE nameRecv) $(storage)|])
                    []
                ]
            ,sigD sendName [t|ByteString -> IO ContentHandlerResult|]
            ,funD sendName
                [clause []
                    (normalB [|$(varE nameSend) $(storage)|])
                    []
                ]
            ]
#ifdef SNAP_AGGREGATE_SERVER
        ,ngxExportSimpleServiceTyped
            fName ''AggregateServerConf SingleShotService
#endif
        ,ngxExportAsyncOnReqBody recvName
        ,ngxExportAsyncHandler sendName
        ]
-- | Report a value to the aggregate server.
--
-- Sends a POST request to @127.0.0.1@ on the given port with request path
-- @put/\<u\>@. The body is the JSON-encoded pair of the PID obtained from
-- 'ngxPid' and the reported value.
--
-- Reporting is best-effort: every exception raised while building or sending
-- the request is deliberately discarded, so a failure to report never
-- propagates to the caller.
reportAggregate :: ToJSON a => Int          -- ^ Port of the aggregate server
                            -> Maybe a      -- ^ Reported value
                            -> ByteString   -- ^ Name suffix of the service
                            -> IO ()
reportAggregate p v u =
    handle (const $ return () :: SomeException -> IO ()) $ do
        req <- parseRequest "POST http://127.0.0.1"
        pid <- fromIntegral <$> ngxPid :: IO Int32
        let !req' = req { requestBody = RequestBodyLBS $ encode (pid, v)
                        , port = p
                        , Network.HTTP.Client.path = B.append "put/" u
                        }
        void $ httpNoBody req' httpManager
-- | HTTP connection manager used by 'reportAggregate'.
--
-- Created with 'unsafePerformIO' as a top-level value; the @NOINLINE@ pragma
-- keeps the definition from being inlined at its use sites, which would
-- create a new manager at each of them.
httpManager :: Manager
httpManager = unsafePerformIO $ newManager defaultManagerSettings
{-# NOINLINE httpManager #-}