ngx-export-tools-extra-1.2.8.1: More extra tools for Nginx Haskell module
Copyright: (c) Alexey Radkov 2019-2023
License: BSD-style
Maintainer: alexey.radkov@gmail.com
Stability: stable
Portability: non-portable (requires Template Haskell)
Safe Haskell: Safe-Inferred
Language: Haskell2010

NgxExport.Tools.Aggregate

Description

An aggregate service from the more extra tools collection for nginx-haskell-module.

The typed service exporter

An aggregate service collects custom typed data reported by worker processes and sends it via HTTP when requested. This is an ignitionService in terms of module NgxExport.Tools.SplitService, which means that it starts upon the startup of the worker process and runs until the worker terminates. Internally, an aggregate service starts an HTTP server implemented via the Snap framework, which serves incoming requests from worker processes (collecting data) as well as from clients of the Nginx server (reporting collected data for administrative purposes).

Below is a simple example.

File test_tools_extra_aggregate.hs

{-# LANGUAGE TemplateHaskell, DeriveGeneric, TypeApplications #-}
{-# LANGUAGE OverloadedStrings, BangPatterns #-}

module TestToolsExtraAggregate where

import           NgxExport
import           NgxExport.Tools
import           NgxExport.Tools.Aggregate

import           Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy.Char8 as C8L
import           Data.Aeson
import           Data.Maybe
import           Data.IORef
import           System.IO.Unsafe
import           GHC.Generics

data Stats = Stats { bytesSent :: Int
                   , requests :: Int
                   , meanBytesSent :: Int
                   } deriving Generic
instance FromJSON Stats
instance ToJSON Stats

stats :: IORef Stats
stats = unsafePerformIO $ newIORef $ Stats 0 0 0
{-# NOINLINE stats #-}

updateStats :: ByteString -> IO C8L.ByteString
updateStats s = voidHandler $ do
    let cbs = readFromByteString @Int s
    modifyIORef' stats $ \(Stats bs rs _) ->
        let !nbs = bs + fromMaybe 0 cbs
            !nrs = rs + 1
            !nmbs = nbs `div` nrs
        in Stats nbs nrs nmbs
ngxExportIOYY 'updateStats

reportStats :: Int -> Bool -> IO C8L.ByteString
reportStats = deferredService $ \port -> voidHandler $ do
    s <- readIORef stats
    reportAggregate port (Just s) "stats"
ngxExportSimpleServiceTyped 'reportStats ''Int $
    PersistentService $ Just $ Sec 5

ngxExportAggregateService "stats" ''Stats

Here, on the bottom line, the aggregate service stats is declared. It expects reports from worker processes in JSON format with data of type Stats, which includes the number of bytes sent so far, the number of client requests, and the mean number of bytes sent per request. Its own configuration (a TCP port and the purge interval) shall be defined in the Nginx configuration file. The reports from worker processes are sent from the deferredService reportStats every 5 seconds: it merely reads the data collected in the global IORef stats and sends it to the aggregate service using reportAggregate. Handler updateStats updates the stats on every run. It accepts a ByteString from Nginx, converts it to an Int value, and interprets this as the number of bytes sent in the current request. It also increments the number of requests and calculates the mean number of bytes sent in all requests to this worker so far. Notice that all fields of stats are evaluated strictly, and this is important: otherwise every update would wrap the previous values in new unevaluated thunks, building up a space leak between reports.
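To make the strictness remark concrete, here is a pure sketch of the arithmetic in updateStats, with the IORef, the Nginx plumbing, and the JSON instances stripped away. The function name bumpStats is hypothetical and not part of the module.

```haskell
{-# LANGUAGE BangPatterns #-}

-- A standalone sketch of the update step performed by updateStats.
-- The IORef, the Nginx handler, and the JSON instances are left out;
-- the name bumpStats is hypothetical.
import Data.Maybe (fromMaybe)

data Stats = Stats { bytesSent     :: Int
                   , requests      :: Int
                   , meanBytesSent :: Int
                   } deriving (Show, Eq)

-- Account for one request that sent the given number of bytes
-- (Nothing, i.e. an unreadable value, counts as 0, like fromMaybe 0
-- in updateStats) and recompute the mean. The bang patterns force the
-- new values before Stats is rebuilt, so no chain of thunks referring
-- to older values can build up.
bumpStats :: Maybe Int -> Stats -> Stats
bumpStats cbs (Stats bs rs _) =
    let !nbs  = bs + fromMaybe 0 cbs
        !nrs  = rs + 1
        !nmbs = nbs `div` nrs
    in Stats nbs nrs nmbs
```

Folding bumpStats over [Just 100, Just 200, Just 300] from Stats 0 0 0 gives Stats 600 3 200. In updateStats, the same body runs inside modifyIORef', which forces only the outermost Stats constructor; the bang patterns are what force the fields.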

File nginx.conf

user                    nobody;
worker_processes        2;

events {
    worker_connections  1024;
}

http {
    default_type        application/octet-stream;
    sendfile            on;

    haskell load /var/lib/nginx/test_tools_extra_aggregate.so;

    haskell_run_service simpleService_aggregate_stats $hs_stats
            'AggregateServerConf { asPort = 8100, asPurgeInterval = Min 5 }';

    haskell_service_var_in_shm stats 32k /tmp $hs_stats;

    haskell_run_service simpleService_reportStats $hs_reportStats 8100;

    server {
        listen       8010;
        server_name  main;
        error_log    /tmp/nginx-test-haskell-error.log;
        access_log   /tmp/nginx-test-haskell-access.log;

        haskell_run updateStats !$hs_updateStats $bytes_sent;

        location / {
            echo Ok;
        }
    }

    server {
        listen       8020;
        server_name  stat;

        location / {
            allow 127.0.0.1;
            deny all;
            proxy_pass http://127.0.0.1:8100/get/stats;
        }
    }
}

The aggregate service stats must be referred to from the Nginx configuration file with prefix simpleService_aggregate_. Its configuration is typed, and the type is AggregateServerConf. Though its only constructor AggregateServerConf is not exported from this module, the service is still configurable from the Nginx configuration. Here, the aggregate service listens on TCP port 8100, and its purge interval is 5 minutes. Notice that an aggregate service must be shared (here, variable $hs_stats is declared as shared with Nginx directive haskell_service_var_in_shm), otherwise it won't even start, because the internal HTTP servers in the worker processes would not be able to bind to the same TCP port. Inside the upper server clause, handler updateStats runs on every client request. This handler always returns an empty string in variable $hs_updateStats because it is only needed for the side effect of updating the stats. However, since Nginx variable handlers are lazy, evaluation of $hs_updateStats must be forced somehow. To achieve this, we use the strict annotation (the bang symbol) in directive haskell_run, which enforces strict evaluation in a late request processing phase, when the value of variable $bytes_sent has already been calculated.
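The configuration string in haskell_run_service is written in Haskell record syntax. Assuming, as with other typed services from ngx-export-tools, that it is parsed through the Read instance of the configuration type, the sketch below shows why a hidden constructor is no obstacle: a derived Read instance recognises the constructor and field names in the text itself. Both types here are hypothetical stand-ins, and TimeInterval is reduced to three constructors.

```haskell
-- Hypothetical stand-ins mirroring the shape of AggregateServerConf and a
-- reduced TimeInterval; the real types live in ngx-export-tools(-extra).
import Text.Read (readMaybe)

data TimeInterval = Hr Int | Min Int | Sec Int
    deriving (Show, Read, Eq)

data AggregateServerConf = AggregateServerConf
    { asPort          :: Int
    , asPurgeInterval :: TimeInterval
    } deriving (Show, Read, Eq)

-- Convert the reduced TimeInterval to seconds.
toSeconds :: TimeInterval -> Int
toSeconds (Hr h)  = h * 3600
toSeconds (Min m) = m * 60
toSeconds (Sec s) = s

-- The argument of haskell_run_service from nginx.conf, verbatim.
confString :: String
confString = "AggregateServerConf { asPort = 8100, asPurgeInterval = Min 5 }"

-- The derived Read instance accepts record syntax, so the string parses
-- without any Haskell code applying the constructor directly.
parsedConf :: Maybe AggregateServerConf
parsedConf = readMaybe confString
```

Here parsedConf yields port 8100 and a purge interval of 300 seconds, while a malformed string simply yields Nothing.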

Data collected by the aggregate service can be obtained in a request to the virtual server listening on TCP port 8020. It simply proxies requests to the internal aggregate server with URL /get/stats where stats corresponds to the name of the aggregate service.

A simple test

Since reportStats is a deferred service, we won't get useful data during the first 5 seconds after Nginx starts.

$ curl -s 'http://127.0.0.1:8020/' | jq
[
  "1858-11-17T00:00:00Z",
  {}
]

However, later we should get some useful data.

$ curl -s 'http://127.0.0.1:8020/' | jq
[
  "2021-12-08T09:56:18.118132083Z",
  {
    "21651": [
      "2021-12-08T09:56:18.12155413Z",
      {
        "meanBytesSent": 0,
        "requests": 0,
        "bytesSent": 0
      }
    ],
    "21652": [
      "2021-12-08T09:56:18.118132083Z",
      {
        "meanBytesSent": 0,
        "requests": 0,
        "bytesSent": 0
      }
    ]
  }
]

Here we have collected stats from the two Nginx worker processes with PIDs 21651 and 21652. The per-worker timestamps show when the stats were last updated. The topmost timestamp shows the time of the latest purge event. The data itself contains only zeros since we have made no requests to the main server so far. Let's run 100 simultaneous requests and look at the stats (they should be updated within 5 seconds after running the requests).

$ for i in {1..100} ; do curl 'http://127.0.0.1:8010/' & done

Wait 5 seconds...

$ curl -s 'http://127.0.0.1:8020/' | jq
[
  "2021-12-08T09:56:18.118132083Z",
  {
    "21651": [
      "2021-12-08T09:56:48.159263993Z",
      {
        "meanBytesSent": 183,
        "requests": 84,
        "bytesSent": 15372
      }
    ],
    "21652": [
      "2021-12-08T09:56:48.136934713Z",
      {
        "meanBytesSent": 183,
        "requests": 16,
        "bytesSent": 2928
      }
    ]
  }
]
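The response keeps each worker's report separate, so node-wide figures have to be combined by the client. A sketch of that arithmetic for the response above, using a hypothetical helper sumStats over (bytesSent, requests) pairs:

```haskell
-- Sum per-worker reports into node-wide totals. The pair layout
-- (bytesSent, requests) and the name sumStats are hypothetical.
sumStats :: [(Int, Int)] -> (Int, Int, Int)
sumStats ws =
    let bytes = sum (map fst ws)
        reqs  = sum (map snd ws)
        -- guard against division by zero before any request was served
        mean  = if reqs == 0 then 0 else bytes `div` reqs
    in (bytes, reqs, mean)

-- bytesSent and requests reported by PIDs 21651 and 21652 above.
sample :: [(Int, Int)]
sample = [(15372, 84), (2928, 16)]
```

For the sample this gives (18300, 100, 183): all 100 requests are accounted for, and the overall mean matches the per-worker means of 183 bytes.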

data AggregateServerConf

Configuration of an aggregate service.

This type is exported because Template Haskell requires it. Though its only constructor AggregateServerConf is not exported, it is still reachable from Nginx configuration files. Below is the definition of the constructor.

    AggregateServerConf { asPort          :: Int
                        , asPurgeInterval :: TimeInterval
                        }

The value of asPort corresponds to the TCP port of the internal aggregate server (the IP address of the internal server is always 127.0.0.1). The value of asPurgeInterval is the purge interval. An aggregate service should periodically purge data from worker processes which have not reported for a long time: for example, it makes no sense to keep data from workers that have already been terminated. The inactive PIDs get checked every asPurgeInterval, and data corresponding to PIDs with timestamps older than asPurgeInterval get removed.
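The purge rule can be sketched as a filter over a map from PIDs to their last report. This is a simplified model, not the module's implementation: timestamps are plain seconds rather than UTC times, and the names Store and purge are hypothetical.

```haskell
import qualified Data.Map.Strict as M

-- Simplified model of the data held by an aggregate service: PID mapped
-- to (time of the last report in seconds, last reported value).
type Store a = M.Map Int (Int, a)

-- Drop entries whose last report is older than the purge interval;
-- an entry exactly one interval old is kept.
purge :: Int      -- current time, seconds
      -> Int      -- purge interval, seconds
      -> Store a
      -> Store a
purge now interval = M.filter (\(ts, _) -> now - ts <= interval)
```

With a 300-second interval checked at time 1000, a PID last seen at 600 is removed, while PIDs last seen at 700 or 800 survive.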

ngxExportAggregateService

Arguments

    :: String     -- Name of the service
    -> Name       -- Name of the aggregate type
    -> Q [Dec]

Exports a simple aggregate service with the specified name and aggregate type.

The name of the service can be chosen arbitrarily; however, it must be referred to exactly in reportAggregate and in client requests to the service, because the URL of the internal HTTP server contains it.

The aggregate type must have instances of FromJSON and ToJSON, as its objects are transferred via HTTP in JSON format.

The service is implemented via ngxExportSimpleServiceTyped with AggregateServerConf as the name of its custom type. This is an ignitionService with an HTTP server based on the Snap framework running inside. The internal HTTP server collects data from worker processes at URL /put/<name_of_the_service> and reports data at URL /get/<name_of_the_service>.
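Since these URLs depend only on the port, the service name, and the fixed address 127.0.0.1, they can be derived mechanically. The helpers below are hypothetical; getUrl reproduces the target of proxy_pass in the example configuration.

```haskell
-- Hypothetical helpers building the URLs of the internal aggregate
-- server. The host 127.0.0.1 and the /put/ and /get/ prefixes come from
-- the documentation; the function names do not.
putUrl, getUrl :: Int -> String -> String
putUrl port name = baseUrl port ++ "/put/" ++ name
getUrl port name = baseUrl port ++ "/get/" ++ name

-- The internal server always listens on the loopback address.
baseUrl :: Int -> String
baseUrl port = "http://127.0.0.1:" ++ show port
```

For the stats service on port 8100, getUrl yields http://127.0.0.1:8100/get/stats, matching the proxy_pass directive above.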

Nginx-based aggregate service

Service simpleService_aggregate_stats was implemented using the Snap framework. A native Nginx implementation is not straightforward, because the service must listen on a single (not duplicated) file descriptor, which is not the case when Nginx spawns more than one worker process. Running simpleService_aggregate_stats as a shared service is an elegant solution, as shared services are guaranteed to occupy only one worker at a time. However, nginx-haskell-module provides directive single_listener, which can be used to apply the required restriction in a custom Nginx virtual server. This directive requires that the virtual server listens with option reuseport, and it is only available on Linux with socket option SO_ATTACH_REUSEPORT_CBPF.

Exporter ngxExportAggregateService exports additional handlers to build a native Nginx-based aggregate service. Let's replace service simpleService_aggregate_stats from the previous example with such a native Nginx-based aggregate service using single_listener and listening on port 8100.

File nginx.conf

user                    nobody;
worker_processes        2;

events {
    worker_connections  1024;
}

http {
    default_type        application/octet-stream;
    sendfile            on;

    haskell load /var/lib/nginx/test_tools_extra_aggregate.so;

    haskell_run_service simpleService_reportStats $hs_reportStats 8100;

    haskell_var_empty_on_error $hs_stats;

    server {
        listen       8010;
        server_name  main;
        error_log    /tmp/nginx-test-haskell-error.log;
        access_log   /tmp/nginx-test-haskell-access.log;

        haskell_run updateStats !$hs_updateStats $bytes_sent;

        location / {
            echo Ok;
        }
    }

    server {
        listen       8020;
        server_name  stat;

        location / {
            allow 127.0.0.1;
            deny all;
            proxy_pass http://127.0.0.1:8100/get/stats;
        }
    }

    server {
        listen          8100 reuseport;
        server_name     stats;

        single_listener on;

        location /put/stats {
            haskell_run_async_on_request_body receiveAggregate_stats
                    $hs_stats "Min 1";

            if ($hs_stats = '') {
                return 400;
            }

            return 200;
        }

        location /get/stats {
            haskell_async_content sendAggregate_stats noarg;
        }
    }
}

Handler receiveAggregate_stats accepts a time interval corresponding to the value of asPurgeInterval from service simpleService_aggregate_stats. If the value is not readable (say, noarg), it defaults to Min 5.
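The fallback for an unreadable argument can be sketched with readMaybe, assuming the argument is parsed through a Read instance. The TimeInterval type here is a reduced, hypothetical stand-in for the real one, and purgeIntervalArg is a hypothetical name.

```haskell
import Data.Maybe (fromMaybe)
import Text.Read (readMaybe)

-- Reduced, hypothetical stand-in for TimeInterval.
data TimeInterval = Hr Int | Min Int | Sec Int
    deriving (Show, Read, Eq)

-- Parse the handler argument, falling back to Min 5 when it cannot be
-- read (e.g. for noarg).
purgeIntervalArg :: String -> TimeInterval
purgeIntervalArg = fromMaybe (Min 5) . readMaybe
```

With the configuration above, the argument "Min 1" is read as is, whereas noarg would fall back to Min 5.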

Notice that the stats server must listen on IP address 127.0.0.1 because reportAggregate (being the base of service simpleService_reportStats) reports stats to this address.

The worker-side reporter

reportAggregate

Arguments

    :: ToJSON a
    => Int           -- Port of the aggregate server
    -> Maybe a       -- Reported data
    -> ByteString    -- Name of the aggregate service
    -> IO ()

Reports data to an aggregate service.

If the reported data is Nothing, then the data collected on the aggregate service won't change, except that the timestamp associated with the PID of the sending worker process will be updated.
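The effect of a report on the collected data can be sketched with a simplified model of a PID-keyed map (hypothetical names, timestamps as plain seconds). How the real service treats Nothing from a PID it has not seen yet is not stated here; the sketch assumes it leaves the data unchanged.

```haskell
import qualified Data.Map.Strict as M

-- PID -> (time of the last report in seconds, last reported value).
-- A simplified, hypothetical model of the aggregate store.
type Store a = M.Map Int (Int, a)

-- Apply one report from a worker. Just v replaces both the value and
-- the timestamp; Nothing only refreshes the timestamp of an existing
-- entry. Ignoring Nothing from an unknown PID is an assumption.
applyReport :: Int -> Int -> Maybe a -> Store a -> Store a
applyReport now pid (Just v) = M.insert pid (now, v)
applyReport now pid Nothing  = M.adjust (\(_, v) -> (now, v)) pid
```

Reporting Nothing thus acts as a heartbeat: it keeps a worker's entry from being purged without touching its data.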

Re-exported data constructors from Foreign.C

Re-exports are needed by the exporter for marshalling in foreign calls.
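For context, a minimal example of these C types in a foreign call, independent of the code the exporter generates: abs from the C standard library is imported with CInt on both sides, and Haskell values are converted with fromIntegral. The helper names absViaC and lengthAsCUInt are hypothetical.

```haskell
{-# LANGUAGE ForeignFunctionInterface #-}

import Foreign.C.Types (CInt, CUInt)

-- abs from the C standard library, using C int on both sides.
foreign import ccall unsafe "stdlib.h abs"
    c_abs :: CInt -> CInt

-- Marshal a Haskell Int to CInt, call C, and convert back. Values
-- outside the range of a C int wrap around in fromIntegral.
absViaC :: Int -> Int
absViaC = fromIntegral . c_abs . fromIntegral

-- A C unsigned int, as might be used for a length passed to C code.
lengthAsCUInt :: String -> CUInt
lengthAsCUInt = fromIntegral . length
```

Because the concrete sizes of these types are platform-specific, explicit fromIntegral conversions at the boundary keep the Haskell side independent of them.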

newtype CInt

Haskell type representing the C int type. (The concrete types of Foreign.C.Types are platform-specific.)

Constructors

CInt Int32 

Instances

Storable CInt (defined in Foreign.C.Types)

    sizeOf :: CInt -> Int
    alignment :: CInt -> Int
    peekElemOff :: Ptr CInt -> Int -> IO CInt
    pokeElemOff :: Ptr CInt -> Int -> CInt -> IO ()
    peekByteOff :: Ptr b -> Int -> IO CInt
    pokeByteOff :: Ptr b -> Int -> CInt -> IO ()
    peek :: Ptr CInt -> IO CInt
    poke :: Ptr CInt -> CInt -> IO ()

Bits CInt (defined in Foreign.C.Types)

FiniteBits CInt (defined in Foreign.C.Types)

Bounded CInt (defined in Foreign.C.Types)

Enum CInt (defined in Foreign.C.Types)

    succ :: CInt -> CInt
    pred :: CInt -> CInt
    toEnum :: Int -> CInt
    fromEnum :: CInt -> Int
    enumFrom :: CInt -> [CInt]
    enumFromThen :: CInt -> CInt -> [CInt]
    enumFromTo :: CInt -> CInt -> [CInt]
    enumFromThenTo :: CInt -> CInt -> CInt -> [CInt]

Ix CInt (defined in Foreign.C.Types)

    range :: (CInt, CInt) -> [CInt]
    index :: (CInt, CInt) -> CInt -> Int
    unsafeIndex :: (CInt, CInt) -> CInt -> Int
    inRange :: (CInt, CInt) -> CInt -> Bool
    rangeSize :: (CInt, CInt) -> Int
    unsafeRangeSize :: (CInt, CInt) -> Int

Num CInt (defined in Foreign.C.Types)

    (+) :: CInt -> CInt -> CInt
    (-) :: CInt -> CInt -> CInt
    (*) :: CInt -> CInt -> CInt
    negate :: CInt -> CInt
    abs :: CInt -> CInt
    signum :: CInt -> CInt
    fromInteger :: Integer -> CInt

Read CInt (defined in Foreign.C.Types)

Integral CInt (defined in Foreign.C.Types)

    quot :: CInt -> CInt -> CInt
    rem :: CInt -> CInt -> CInt
    div :: CInt -> CInt -> CInt
    mod :: CInt -> CInt -> CInt
    quotRem :: CInt -> CInt -> (CInt, CInt)
    divMod :: CInt -> CInt -> (CInt, CInt)
    toInteger :: CInt -> Integer

Real CInt (defined in Foreign.C.Types)

    toRational :: CInt -> Rational

Show CInt (defined in Foreign.C.Types)

    showsPrec :: Int -> CInt -> ShowS
    show :: CInt -> String
    showList :: [CInt] -> ShowS

Subtractive CInt (defined in Basement.Numerical.Subtractive)

    type Difference CInt
    (-) :: CInt -> CInt -> Difference CInt

NFData CInt (defined in Control.DeepSeq; since deepseq-1.4.0.0)

    rnf :: CInt -> ()

Eq CInt (defined in Foreign.C.Types)

    (==) :: CInt -> CInt -> Bool
    (/=) :: CInt -> CInt -> Bool

Ord CInt (defined in Foreign.C.Types)

    compare :: CInt -> CInt -> Ordering
    (<) :: CInt -> CInt -> Bool
    (<=) :: CInt -> CInt -> Bool
    (>) :: CInt -> CInt -> Bool
    (>=) :: CInt -> CInt -> Bool
    max :: CInt -> CInt -> CInt
    min :: CInt -> CInt -> CInt

Wrapped CInt (defined in Control.Lens.Wrapped)

    type Unwrapped CInt

Uniform CInt (defined in System.Random.Internal)

    uniformM :: StatefulGen g m => g -> m CInt

UniformRange CInt (defined in System.Random.Internal)

    uniformRM :: StatefulGen g m => (CInt, CInt) -> g -> m CInt

Rewrapped CInt t (defined in Control.Lens.Wrapped)

type Difference CInt (defined in Basement.Numerical.Subtractive)

type Unwrapped CInt (defined in Control.Lens.Wrapped)

newtype CUInt

Haskell type representing the C unsigned int type. (The concrete types of Foreign.C.Types are platform-specific.)

Constructors

CUInt Word32 

Instances

Storable CUInt (defined in Foreign.C.Types)

    sizeOf :: CUInt -> Int
    alignment :: CUInt -> Int
    peekElemOff :: Ptr CUInt -> Int -> IO CUInt
    pokeElemOff :: Ptr CUInt -> Int -> CUInt -> IO ()
    peekByteOff :: Ptr b -> Int -> IO CUInt
    pokeByteOff :: Ptr b -> Int -> CUInt -> IO ()
    peek :: Ptr CUInt -> IO CUInt
    poke :: Ptr CUInt -> CUInt -> IO ()

Bits CUInt (defined in Foreign.C.Types)

FiniteBits CUInt (defined in Foreign.C.Types)

Bounded CUInt (defined in Foreign.C.Types)

Enum CUInt (defined in Foreign.C.Types)

Ix CUInt (defined in Foreign.C.Types)

Num CUInt (defined in Foreign.C.Types)

Read CUInt (defined in Foreign.C.Types)

Integral CUInt (defined in Foreign.C.Types)

Real CUInt (defined in Foreign.C.Types)

    toRational :: CUInt -> Rational

Show CUInt (defined in Foreign.C.Types)

    showsPrec :: Int -> CUInt -> ShowS
    show :: CUInt -> String
    showList :: [CUInt] -> ShowS

Subtractive CUInt (defined in Basement.Numerical.Subtractive)

    type Difference CUInt
    (-) :: CUInt -> CUInt -> Difference CUInt

NFData CUInt (defined in Control.DeepSeq; since deepseq-1.4.0.0)

    rnf :: CUInt -> ()

Eq CUInt (defined in Foreign.C.Types)

    (==) :: CUInt -> CUInt -> Bool
    (/=) :: CUInt -> CUInt -> Bool

Ord CUInt (defined in Foreign.C.Types)

    compare :: CUInt -> CUInt -> Ordering
    (<) :: CUInt -> CUInt -> Bool
    (<=) :: CUInt -> CUInt -> Bool
    (>) :: CUInt -> CUInt -> Bool
    (>=) :: CUInt -> CUInt -> Bool
    max :: CUInt -> CUInt -> CUInt
    min :: CUInt -> CUInt -> CUInt

Wrapped CUInt (defined in Control.Lens.Wrapped)

    type Unwrapped CUInt

Uniform CUInt (defined in System.Random.Internal)

    uniformM :: StatefulGen g m => g -> m CUInt

UniformRange CUInt (defined in System.Random.Internal)

    uniformRM :: StatefulGen g m => (CUInt, CUInt) -> g -> m CUInt

Rewrapped CUInt t (defined in Control.Lens.Wrapped)

type Difference CUInt (defined in Basement.Numerical.Subtractive)

type Unwrapped CUInt (defined in Control.Lens.Wrapped)