diff --git a/debian/control b/debian/control index cb635b0..df4ee86 100644 --- a/debian/control +++ b/debian/control @@ -31,6 +31,7 @@ Depends: python3-django, python3-markdown, python3-pil, + python3-prometheus-client, python3-psycopg2, python3-tz, python3-requests, diff --git a/setup.py b/setup.py index eac576f..73d32ba 100755 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ 'Django == 1.11.*, >= 1.11.19', 'Markdown', 'Pillow', + 'prometheus_client', 'pytz', 'requests', ], diff --git a/src/ctf_gameserver/checker/master.py b/src/ctf_gameserver/checker/master.py index 22c6090..aa34b11 100644 --- a/src/ctf_gameserver/checker/master.py +++ b/src/ctf_gameserver/checker/master.py @@ -2,6 +2,7 @@ import datetime import logging import math +import multiprocessing import os import signal import time @@ -17,7 +18,7 @@ from ctf_gameserver.lib.exceptions import DBDataError import ctf_gameserver.lib.flag as flag_lib -from . import database +from . import database, metrics from .supervisor import RunnerSupervisor from .supervisor import ACTION_FLAG, ACTION_LOAD, ACTION_STORE, ACTION_RESULT @@ -29,6 +30,8 @@ def main(): help='(Old-style) Python formatstring for building the IP to connect to') arg_parser.add_argument('--flagsecret', type=str, required=True, help='Base64 string used as secret in flag generation') + arg_parser.add_argument('--metrics-listen', type=str, help='Expose Prometheus metrics via HTTP ' + '(":")') group = arg_parser.add_argument_group('statedb', 'Checker state database') group.add_argument('--statedbhost', type=str, help='Hostname of the database. If unspecified, the ' @@ -98,6 +101,33 @@ def main(): return os.EX_USAGE logging_params['gelf'] = {'host': gelf_host, 'port': gelf_port} + # Configure metrics + if args.metrics_listen is not None: + metrics_listen = urllib.parse.urlsplit('//' + args.metrics_listen) + metrics_host = metrics_listen.hostname + metrics_port = metrics_listen.port + if metrics_host is None or metrics_port is None: + logging.error('Metrics listen address needs to be specified as ":"') + return os.EX_USAGE + + metrics_queue = multiprocessing.Queue() + metrics_recv, metrics_send = multiprocessing.Pipe() + metrics_collector_process = multiprocessing.Process( + target=metrics.run_collector, + args=(args.service, metrics.checker_metrics_factory, metrics_queue, metrics_send) + ) + metrics_collector_process.start() + metrics_server_process = multiprocessing.Process( + target=metrics.run_http_server, + args=(metrics_host, metrics_port, metrics_queue, metrics_recv) + ) + metrics_server_process.start() + + metrics.set(metrics_queue, 'interval_length_seconds', args.interval) + metrics.set(metrics_queue, 'start_timestamp', time.time()) + else: + metrics_queue = metrics.DummyQueue() + flag_secret = base64.b64decode(args.flagsecret) # Connect to databases @@ -159,7 +189,7 @@ def main(): try: master_loop = MasterLoop(game_db_conn, state_db_conn, args.service, args.checkerscript, args.sudouser, args.stddeviations, args.checkercount, args.interval, - args.ippattern, flag_secret, logging_params) + args.ippattern, flag_secret, logging_params, metrics_queue) break except DBDataError as e: logging.warning('Waiting for valid database state: %s', e) @@ -177,13 +207,18 @@ def sigterm_handler(_, __): if master_loop.shutting_down and master_loop.get_running_script_count() == 0: break + metrics_server_process.terminate() + metrics_collector_process.terminate() + metrics_server_process.join() + metrics_collector_process.join() + return os.EX_OK class MasterLoop: def __init__(self, game_db_conn, state_db_conn, service_slug, checker_script, sudo_user, std_dev_count, - checker_count, interval, ip_pattern, flag_secret, logging_params): + checker_count, interval, ip_pattern, flag_secret, logging_params, metrics_queue): self.game_db_conn = game_db_conn self.state_db_conn = state_db_conn self.checker_script = checker_script @@ -194,12 +229,13 @@ def __init__(self, game_db_conn, state_db_conn, service_slug, checker_script, su self.ip_pattern = ip_pattern self.flag_secret = flag_secret self.logging_params = logging_params + self.metrics_queue = metrics_queue self.refresh_control_info() self.service = database.get_service_attributes(self.game_db_conn, service_slug) self.service['slug'] = service_slug - self.supervisor = RunnerSupervisor() + self.supervisor = RunnerSupervisor(metrics_queue) self.known_tick = -1 # Trigger launch of tasks in first step() self.last_launch = get_monotonic_time() - self.interval @@ -254,6 +290,10 @@ def step(self): if not self.shutting_down: # Launch new tasks and catch up missed intervals while get_monotonic_time() - self.last_launch >= self.interval: + delay = get_monotonic_time() - self.last_launch - self.interval + metrics.observe(self.metrics_queue, 'task_launch_delay_seconds', delay) + metrics.set(self.metrics_queue, 'last_launch_timestamp', time.time()) + self.last_launch += self.interval self.launch_tasks() @@ -304,6 +344,7 @@ def handle_result_request(self, task_info, param): logging.info('Result from Checker Script for team %d (net number %d) in tick %d: %s', task_info['_team_id'], task_info['team'], task_info['tick'], check_result) + metrics.inc(self.metrics_queue, 'completed_tasks', labels={'result': check_result.name}) database.commit_result(self.game_db_conn, self.service['id'], task_info['team'], task_info['tick'], result) @@ -372,6 +413,8 @@ def update_launch_params(self, tick): self.tasks_per_launch = math.ceil(local_tasks / intervals_per_timeframe) logging.info('Planning to start %d tasks per interval with a maximum duration of %d seconds', self.tasks_per_launch, check_duration) + metrics.set(self.metrics_queue, 'tasks_per_launch_count', self.tasks_per_launch) + metrics.set(self.metrics_queue, 'max_task_duration_seconds', check_duration) def get_running_script_count(self): return len(self.supervisor.processes) diff --git a/src/ctf_gameserver/checker/metrics.py b/src/ctf_gameserver/checker/metrics.py new file mode 100644 index 0000000..3151221 --- /dev/null +++ b/src/ctf_gameserver/checker/metrics.py @@ -0,0 +1,187 @@ +import logging +import queue +from wsgiref import simple_server + +import prometheus_client + + +def inc(metrics_queue, name, value=1, labels=None): + + metrics_queue.put(MetricsMessage(name, 'inc', value, labels)) + + +def dec(metrics_queue, name, value=1, labels=None): + + metrics_queue.put(MetricsMessage(name, 'dec', value, labels)) + + +def set(metrics_queue, name, value, labels=None): # pylint: disable=redefined-builtin + + metrics_queue.put(MetricsMessage(name, 'set', value, labels)) + + +def observe(metrics_queue, name, value, labels=None): + + metrics_queue.put(MetricsMessage(name, 'observe', value, labels)) + + +class MetricsMessage: + """ + Message to put into run_collector()'s queue for recording metric changes. + """ + + def __init__(self, name, instruction, value, labels=None): + self.name = name + self.instruction = instruction + self.value = value + + if labels is None: + self.labels = {} + else: + self.labels = labels + + +class HTTPGenMessage: + """ + Message to put into run_collector()'s queue for receiving a text representation of its metrics (for HTTP + export) through its pipe. + """ + + +def checker_metrics_factory(registry): + + metrics = {} + metric_prefix = 'ctf_checkermaster_' + + counters = [ + ('started_tasks', 'Number of started Checker Script instances', []), + ('completed_tasks', 'Number of successfully completed checks', ['result']), + ('terminated_tasks', 'Number of Checker Script instances forcibly terminated', []) + ] + for name, doc, labels in counters: + metrics[name] = prometheus_client.Counter(metric_prefix+name, doc, labels+['service'], + registry=registry) + + gauges = [ + ('start_timestamp', '(Unix timestamp when the process was started', []), + ('interval_length_seconds', 'Configured launch interval length', []), + ('last_launch_timestamp', '(Unix) timestamp when tasks were launched the last time', []), + ('tasks_per_launch_count', 'Number of checks to start in one launch interval', []), + ('max_task_duration_seconds', 'Currently estimated maximum runtime of one check', []) + ] + for name, doc, labels in gauges: + metrics[name] = prometheus_client.Gauge(metric_prefix+name, doc, labels+['service'], + registry=registry) + + histograms = [ + ('task_launch_delay_seconds', 'Differences between supposed and actual task launch times', [], + (0.01, 0.03, 0.05, 0.1, 0.3, 0.5, 1, 3, 5, 10, 30, 60, float('inf'))), + ('script_duration_seconds', 'Observed runtimes of Checker Scripts', [], + (1, 3, 5, 8, 10, 20, 30, 45, 60, 90, 120, 150, 180, 240, 300, float('inf'))) + ] + for name, doc, labels, buckets in histograms: + metrics[name] = prometheus_client.Histogram(metric_prefix+name, doc, labels+['service'], + buckets=buckets, registry=registry) + + return metrics + + +def run_collector(service, metrics_factory, in_queue, pipe_to_server): + """ + Manages Prometheus metrics. Receives changes to the metrics through a queue and emits their text + representation (for HTTP export) over a pipe. Designed to be run as "target" in a multiprocessing.Process + in conjunction with run_http_server(). + + Args: + service: Slug of this checker instance's service. + metrics_factory: Callable returning a dict of the mtrics to use mapping from name to Metric object. + in_queue: Queue over which MetricsMessages and HTTPGenMessages are received. + pipe_to_server: Pipe to which text representations of the metrics are sent in response to + HTTPGenMessages. + """ + + registry = prometheus_client.CollectorRegistry() + metrics = metrics_factory(registry) + + def handle_metrics_message(msg): + try: + metric = metrics[msg.name] + except KeyError: + logging.error('Recevied message for unknown metric "%s", ignoring', msg.name) + return + + # Apparently, there is no nicer way to access the label names + if 'service' in metric._labelnames: # pylint: disable=protected-access + msg.labels['service'] = service + if len(msg.labels) > 0: + try: + metric = metric.labels(**(msg.labels)) + except ValueError: + logging.error('Invalid labels specified for metric "%s", ignoring', msg.name) + return + + try: + bound_method = getattr(metric, msg.instruction) + except AttributeError: + logging.error('Cannot use instruction "%s" on metric "%s", ignoring', msg.instruction, msg.name) + return + try: + bound_method(msg.value) + except: # noqa, pylint: disable=bare-except + logging.exception('Could not update metric "%s":', msg.name) + + def send_metrics_text(): + metrics_text = prometheus_client.generate_latest(registry) + pipe_to_server.send(metrics_text) + + while True: + message = in_queue.get(True) + if isinstance(message, MetricsMessage): + handle_metrics_message(message) + elif isinstance(message, HTTPGenMessage): + send_metrics_text() + else: + logging.error('Received unknown message on collector queue') + + +def run_http_server(host, port, queue_to_collector, pipe_from_collector): + """ + Runs a server exposing Prometheus metrics via HTTP. The metrics are requested through a HTTPGenMessage + and received over the pipe. Designed to be run as "target" in a multiprocessing.Process in conjunction + with run_collector(). + + Args: + host: Host to run the HTTP server on. + port: Port to run the HTTP server on. + queue_to_collector: Queue to which HTTPGenMessages are sent. + pipe_from_collector: Pipe from which text representations of the metrics are received. + """ + + def app(_, start_response): + queue_to_collector.put(HTTPGenMessage()) + output = pipe_from_collector.recv() + + status = '200 OK' + headers = [ + ('Content-Type', prometheus_client.CONTENT_TYPE_LATEST) + ] + start_response(status, headers) + return [output] + + class SilentHandler(simple_server.WSGIRequestHandler): + def log_message(self, _, *args): + """ + Doesn't log anything. + """ + + http_server = simple_server.make_server(host, port, app, handler_class=SilentHandler) + http_server.serve_forever() + + +class DummyQueue(queue.Queue): + """ + Queue that discards all elements put into it. + """ + + def put(self, item, block=True, timeout=None): + pass diff --git a/src/ctf_gameserver/checker/supervisor.py b/src/ctf_gameserver/checker/supervisor.py index 08291ec..2b33ddd 100644 --- a/src/ctf_gameserver/checker/supervisor.py +++ b/src/ctf_gameserver/checker/supervisor.py @@ -11,6 +11,8 @@ from ctf_gameserver.lib.checkresult import CheckResult +from . import metrics + ACTION_FLAG = 'FLAG' ACTION_LOAD = 'LOAD' @@ -34,7 +36,9 @@ class RunnerSupervisor: Launches Checker Script Runners as individual processes and takes care of communicating with them. """ - def __init__(self): + def __init__(self, metrics_queue): + self.metrics_queue = metrics_queue + # Timeout if there are no requests when all Runners are done or blocking self.queue_timeout = 1 self._reset() @@ -42,6 +46,7 @@ def __init__(self): def _reset(self): self.work_queue = multiprocessing.Queue() self.processes = {} + self.start_times = {} self.next_identifier = 0 def start_runner(self, args, sudo_user, info, logging_params): @@ -51,14 +56,17 @@ def start_runner(self, args, sudo_user, info, logging_params): logging_params, self.next_identifier, self.work_queue, receive)) self.processes[self.next_identifier] = (proc, send, info) + self.start_times[self.next_identifier] = time.monotonic() proc.start() self.next_identifier += 1 + metrics.inc(self.metrics_queue, 'started_tasks') def terminate_runner(self, runner_id): logging.info('Terminating Runner process, info: %s', self.processes[runner_id][2]) self.processes[runner_id][0].terminate() # Afterwards, get_request() will join the child and remove it from `self.processes` + metrics.inc(self.metrics_queue, 'terminated_tasks') def terminate_runners(self): if len(self.processes) > 0: @@ -82,9 +90,14 @@ def get_request(self): # Join all terminated child processes if action == ACTION_RUNNER_EXIT: + duration = time.monotonic() - self.start_times[runner_id] + metrics.observe(self.metrics_queue, 'script_duration_seconds', duration) + del self.start_times[runner_id] + proc = self.processes[runner_id][0] proc.join() del self.processes[runner_id] + if self.work_queue.empty(): return None else: diff --git a/tests/checker/test_integration.py b/tests/checker/test_integration.py index 009cd9f..a24df0d 100644 --- a/tests/checker/test_integration.py +++ b/tests/checker/test_integration.py @@ -8,6 +8,7 @@ import time from ctf_gameserver.checker.master import MasterLoop +from ctf_gameserver.checker.metrics import DummyQueue from ctf_gameserver.lib.checkresult import CheckResult from ctf_gameserver.lib.database import transaction_cursor from ctf_gameserver.lib.test_util import DatabaseTestCase @@ -41,7 +42,7 @@ def test_basic(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) master_loop.supervisor.queue_timeout = 0.01 # Sanity check before any tick @@ -91,7 +92,7 @@ def test_missing_checkerscript(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') @@ -121,7 +122,7 @@ def test_exception(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') @@ -151,7 +152,7 @@ def test_down(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') @@ -187,7 +188,7 @@ def test_unfinished(self, monotonic_mock, warning_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') @@ -246,7 +247,7 @@ def test_multi_teams_ticks(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) # Tick 0 with transaction_cursor(self.connection) as cursor: @@ -338,7 +339,7 @@ def test_state(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.state_db_conn) as cursor: # Prepopulate state for the non-checked service to ensure we'll never get this data returned @@ -413,7 +414,7 @@ def test_shutdown(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, - 2, 1, 10, '0.0.%s.1', b'secret', {}) + 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') @@ -442,7 +443,7 @@ def test_sudo(self, monotonic_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, - 'ctf-checkerrunner', 2, 1, 10, '0.0.%s.1', b'secret', {}) + 'ctf-checkerrunner', 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') @@ -482,7 +483,7 @@ def test_sudo_unfinished(self, monotonic_mock, warning_mock): monotonic_mock.return_value = 10 master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, - 'ctf-checkerrunner', 2, 1, 10, '0.0.%s.1', b'secret', {}) + 'ctf-checkerrunner', 2, 1, 10, '0.0.%s.1', b'secret', {}, DummyQueue()) with transaction_cursor(self.connection) as cursor: cursor.execute('UPDATE scoring_gamecontrol SET start=NOW()') diff --git a/tests/checker/test_master.py b/tests/checker/test_master.py index 11995eb..eb5e019 100644 --- a/tests/checker/test_master.py +++ b/tests/checker/test_master.py @@ -2,6 +2,7 @@ from unittest.mock import patch from ctf_gameserver.checker.master import MasterLoop +from ctf_gameserver.checker.metrics import DummyQueue from ctf_gameserver.lib.checkresult import CheckResult from ctf_gameserver.lib.database import transaction_cursor from ctf_gameserver.lib.test_util import DatabaseTestCase @@ -13,7 +14,7 @@ class MasterTest(DatabaseTestCase): def setUp(self): self.master_loop = MasterLoop(self.connection, None, 'service1', '/dev/null', None, 2, 8, 10, - '0.0.%s.1', b'secret', {}) + '0.0.%s.1', b'secret', {}, DummyQueue()) def test_handle_flag_request(self): with transaction_cursor(self.connection) as cursor: diff --git a/tests/checker/test_metrics.py b/tests/checker/test_metrics.py new file mode 100644 index 0000000..62b91be --- /dev/null +++ b/tests/checker/test_metrics.py @@ -0,0 +1,156 @@ +import multiprocessing +import time +from unittest import TestCase + +import prometheus_client +import requests + +from ctf_gameserver.checker import metrics + + +# Support terminate() on multiprocessing.Process with pytest-cov, see +# https://pytest-cov.readthedocs.io/en/v2.10.0/subprocess-support.html#if-you-use-multiprocessing-process +try: + from pytest_cov.embed import cleanup_on_sigterm +except ImportError: + pass +else: + cleanup_on_sigterm() + + +class MetricsTest(TestCase): + + metrics_url = 'http://127.0.0.1:9002/metrics' + + def setUp(self): + def metrics_factory(registry): + return { + 'plain_gauge': prometheus_client.Gauge('plain_gauge', 'Simple gauge', registry=registry), + 'instance_gauge': prometheus_client.Gauge('instance_gauge', 'Gauge with custom label', + ['instance'], registry=registry), + 'service_gauge': prometheus_client.Gauge('service_gauge', 'Gauge with "service" label', + ['service'], registry=registry), + 'counter': prometheus_client.Counter('counter', 'Simple counter', registry=registry), + 'summary': prometheus_client.Summary('summary', 'Simple summary', registry=registry), + 'histogram': prometheus_client.Histogram('histogram', 'Histogram with custom and "service" ' + 'labels', ['instance', 'service'], + registry=registry) + } + + self.queue = multiprocessing.Queue() + recv, send = multiprocessing.Pipe() + + self.collector_process = multiprocessing.Process(target=metrics.run_collector, + args=('test', metrics_factory, self.queue, send)) + self.collector_process.start() + self.http_server_process = multiprocessing.Process(target=metrics.run_http_server, + args=('127.0.0.1', 9002, self.queue, recv)) + self.http_server_process.start() + + # Wait for server start-up to avoid race conditions + retries = 0 + while True: + if retries >= 100: + raise Exception('Metrics server did not start up') + try: + requests.get(self.metrics_url) + except requests.ConnectionError: + retries += 1 + time.sleep(0.1) + else: + break + + def tearDown(self): + self.http_server_process.terminate() + self.collector_process.terminate() + self.http_server_process.join() + self.collector_process.join() + + def test_gauge(self): + metrics.set(self.queue, 'plain_gauge', 42) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('plain_gauge 42.0', resp.text) + + metrics.inc(self.queue, 'plain_gauge') + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('plain_gauge 43.0', resp.text) + + metrics.dec(self.queue, 'plain_gauge', 1.5) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('plain_gauge 41.5', resp.text) + + def test_custom_label(self): + metrics.set(self.queue, 'instance_gauge', 42, {'instance': 1}) + metrics.set(self.queue, 'instance_gauge', 13.37, {'instance': 2}) + + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('instance_gauge{instance="1"} 42.0', resp.text) + self.assertIn('instance_gauge{instance="2"} 13.37', resp.text) + + def test_service_label(self): + metrics.set(self.queue, 'service_gauge', 23) + + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('service_gauge{service="test"} 23.0', resp.text) + + def test_counter(self): + metrics.inc(self.queue, 'counter') + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('counter_total 1.0', resp.text) + + metrics.inc(self.queue, 'counter') + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('counter_total 2.0', resp.text) + + metrics.inc(self.queue, 'counter', 0) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('counter_total 2.0', resp.text) + + def test_multiple(self): + metrics.set(self.queue, 'plain_gauge', 1337) + metrics.set(self.queue, 'instance_gauge', 23, {'instance': 1}) + metrics.set(self.queue, 'service_gauge', 42) + metrics.inc(self.queue, 'counter') + + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('plain_gauge 1337.0', resp.text) + self.assertIn('instance_gauge{instance="1"} 23.0', resp.text) + self.assertIn('service_gauge{service="test"} 42.0', resp.text) + self.assertIn('counter_total 1.0', resp.text) + + def test_summary(self): + metrics.observe(self.queue, 'summary', 10) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('summary_count 1.0', resp.text) + self.assertIn('summary_sum 10.0', resp.text) + + metrics.observe(self.queue, 'summary', 20) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('summary_count 2.0', resp.text) + self.assertIn('summary_sum 30.0', resp.text) + + def test_histogram(self): + metrics.observe(self.queue, 'histogram', 0.02, {'instance': 3}) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('histogram_bucket{instance="3",le="0.01",service="test"} 0.0', resp.text) + self.assertIn('histogram_bucket{instance="3",le="0.025",service="test"} 1.0', resp.text) + self.assertIn('histogram_bucket{instance="3",le="10.0",service="test"} 1.0', resp.text) + + metrics.observe(self.queue, 'histogram', 0.5, {'instance': 3}) + resp = requests.get(self.metrics_url) + self.assertEqual(resp.status_code, 200) + self.assertIn('histogram_bucket{instance="3",le="0.25",service="test"} 1.0', resp.text) + self.assertIn('histogram_bucket{instance="3",le="0.5",service="test"} 2.0', resp.text) + self.assertIn('histogram_bucket{instance="3",le="10.0",service="test"} 2.0', resp.text)