Skip to content

Commit

Permalink
Checker: Collect and provide metrics in Prometheus format
Browse files Browse the repository at this point in the history
Improves: #30
  • Loading branch information
F30 committed Oct 5, 2020
1 parent 57b289c commit a0a3aa7
Show file tree
Hide file tree
Showing 8 changed files with 419 additions and 16 deletions.
1 change: 1 addition & 0 deletions debian/control
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Depends:
python3-django,
python3-markdown,
python3-pil,
python3-prometheus-client,
python3-psycopg2,
python3-tz,
python3-requests,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
'Django == 1.11.*, >= 1.11.19',
'Markdown',
'Pillow',
'prometheus_client',
'pytz',
'requests',
],
Expand Down
51 changes: 47 additions & 4 deletions src/ctf_gameserver/checker/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import datetime
import logging
import math
import multiprocessing
import os
import signal
import time
Expand All @@ -17,7 +18,7 @@
from ctf_gameserver.lib.exceptions import DBDataError
import ctf_gameserver.lib.flag as flag_lib

from . import database
from . import database, metrics
from .supervisor import RunnerSupervisor
from .supervisor import ACTION_FLAG, ACTION_LOAD, ACTION_STORE, ACTION_RESULT

Expand All @@ -29,6 +30,8 @@ def main():
help='(Old-style) Python formatstring for building the IP to connect to')
arg_parser.add_argument('--flagsecret', type=str, required=True,
help='Base64 string used as secret in flag generation')
arg_parser.add_argument('--metrics-listen', type=str, help='Expose Prometheus metrics via HTTP '
'("<host>:<port>")')

group = arg_parser.add_argument_group('statedb', 'Checker state database')
group.add_argument('--statedbhost', type=str, help='Hostname of the database. If unspecified, the '
Expand Down Expand Up @@ -98,6 +101,33 @@ def main():
return os.EX_USAGE
logging_params['gelf'] = {'host': gelf_host, 'port': gelf_port}

# Configure metrics
if args.metrics_listen is not None:
metrics_listen = urllib.parse.urlsplit('//' + args.metrics_listen)
metrics_host = metrics_listen.hostname
metrics_port = metrics_listen.port
if metrics_host is None or metrics_port is None:
logging.error('Metrics listen address needs to be specified as "<host>:<port>"')
return os.EX_USAGE

metrics_queue = multiprocessing.Queue()
metrics_recv, metrics_send = multiprocessing.Pipe()
metrics_collector_process = multiprocessing.Process(
target=metrics.run_collector,
args=(args.service, metrics.checker_metrics_factory, metrics_queue, metrics_send)
)
metrics_collector_process.start()
metrics_server_process = multiprocessing.Process(
target=metrics.run_http_server,
args=(metrics_host, metrics_port, metrics_queue, metrics_recv)
)
metrics_server_process.start()

metrics.set(metrics_queue, 'interval_length_seconds', args.interval)
metrics.set(metrics_queue, 'start_timestamp', time.time())
else:
metrics_queue = metrics.DummyQueue()

flag_secret = base64.b64decode(args.flagsecret)

# Connect to databases
Expand Down Expand Up @@ -159,7 +189,7 @@ def main():
try:
master_loop = MasterLoop(game_db_conn, state_db_conn, args.service, args.checkerscript,
args.sudouser, args.stddeviations, args.checkercount, args.interval,
args.ippattern, flag_secret, logging_params)
args.ippattern, flag_secret, logging_params, metrics_queue)
break
except DBDataError as e:
logging.warning('Waiting for valid database state: %s', e)
Expand All @@ -177,13 +207,18 @@ def sigterm_handler(_, __):
if master_loop.shutting_down and master_loop.get_running_script_count() == 0:
break

metrics_server_process.terminate()
metrics_collector_process.terminate()
metrics_server_process.join()
metrics_collector_process.join()

return os.EX_OK


class MasterLoop:

def __init__(self, game_db_conn, state_db_conn, service_slug, checker_script, sudo_user, std_dev_count,
checker_count, interval, ip_pattern, flag_secret, logging_params):
checker_count, interval, ip_pattern, flag_secret, logging_params, metrics_queue):
self.game_db_conn = game_db_conn
self.state_db_conn = state_db_conn
self.checker_script = checker_script
Expand All @@ -194,12 +229,13 @@ def __init__(self, game_db_conn, state_db_conn, service_slug, checker_script, su
self.ip_pattern = ip_pattern
self.flag_secret = flag_secret
self.logging_params = logging_params
self.metrics_queue = metrics_queue

self.refresh_control_info()
self.service = database.get_service_attributes(self.game_db_conn, service_slug)
self.service['slug'] = service_slug

self.supervisor = RunnerSupervisor()
self.supervisor = RunnerSupervisor(metrics_queue)
self.known_tick = -1
# Trigger launch of tasks in first step()
self.last_launch = get_monotonic_time() - self.interval
Expand Down Expand Up @@ -254,6 +290,10 @@ def step(self):
if not self.shutting_down:
# Launch new tasks and catch up missed intervals
while get_monotonic_time() - self.last_launch >= self.interval:
delay = get_monotonic_time() - self.last_launch - self.interval
metrics.observe(self.metrics_queue, 'task_launch_delay_seconds', delay)
metrics.set(self.metrics_queue, 'last_launch_timestamp', time.time())

self.last_launch += self.interval
self.launch_tasks()

Expand Down Expand Up @@ -304,6 +344,7 @@ def handle_result_request(self, task_info, param):

logging.info('Result from Checker Script for team %d (net number %d) in tick %d: %s',
task_info['_team_id'], task_info['team'], task_info['tick'], check_result)
metrics.inc(self.metrics_queue, 'completed_tasks', labels={'result': check_result.name})
database.commit_result(self.game_db_conn, self.service['id'], task_info['team'], task_info['tick'],
result)

Expand Down Expand Up @@ -372,6 +413,8 @@ def update_launch_params(self, tick):
self.tasks_per_launch = math.ceil(local_tasks / intervals_per_timeframe)
logging.info('Planning to start %d tasks per interval with a maximum duration of %d seconds',
self.tasks_per_launch, check_duration)
metrics.set(self.metrics_queue, 'tasks_per_launch_count', self.tasks_per_launch)
metrics.set(self.metrics_queue, 'max_task_duration_seconds', check_duration)

def get_running_script_count(self):
return len(self.supervisor.processes)
Expand Down
187 changes: 187 additions & 0 deletions src/ctf_gameserver/checker/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
import logging
import queue
from wsgiref import simple_server

import prometheus_client


def inc(metrics_queue, name, value=1, labels=None):

metrics_queue.put(MetricsMessage(name, 'inc', value, labels))


def dec(metrics_queue, name, value=1, labels=None):

metrics_queue.put(MetricsMessage(name, 'dec', value, labels))


def set(metrics_queue, name, value, labels=None): # pylint: disable=redefined-builtin

metrics_queue.put(MetricsMessage(name, 'set', value, labels))


def observe(metrics_queue, name, value, labels=None):

metrics_queue.put(MetricsMessage(name, 'observe', value, labels))


class MetricsMessage:
"""
Message to put into run_collector()'s queue for recording metric changes.
"""

def __init__(self, name, instruction, value, labels=None):
self.name = name
self.instruction = instruction
self.value = value

if labels is None:
self.labels = {}
else:
self.labels = labels


class HTTPGenMessage:
"""
Message to put into run_collector()'s queue for receiving a text representation of its metrics (for HTTP
export) through its pipe.
"""


def checker_metrics_factory(registry):

metrics = {}
metric_prefix = 'ctf_checkermaster_'

counters = [
('started_tasks', 'Number of started Checker Script instances', []),
('completed_tasks', 'Number of successfully completed checks', ['result']),
('terminated_tasks', 'Number of Checker Script instances forcibly terminated', [])
]
for name, doc, labels in counters:
metrics[name] = prometheus_client.Counter(metric_prefix+name, doc, labels+['service'],
registry=registry)

gauges = [
('start_timestamp', '(Unix timestamp when the process was started', []),
('interval_length_seconds', 'Configured launch interval length', []),
('last_launch_timestamp', '(Unix) timestamp when tasks were launched the last time', []),
('tasks_per_launch_count', 'Number of checks to start in one launch interval', []),
('max_task_duration_seconds', 'Currently estimated maximum runtime of one check', [])
]
for name, doc, labels in gauges:
metrics[name] = prometheus_client.Gauge(metric_prefix+name, doc, labels+['service'],
registry=registry)

histograms = [
('task_launch_delay_seconds', 'Differences between supposed and actual task launch times', [],
(0.01, 0.03, 0.05, 0.1, 0.3, 0.5, 1, 3, 5, 10, 30, 60, float('inf'))),
('script_duration_seconds', 'Observed runtimes of Checker Scripts', [],
(1, 3, 5, 8, 10, 20, 30, 45, 60, 90, 120, 150, 180, 240, 300, float('inf')))
]
for name, doc, labels, buckets in histograms:
metrics[name] = prometheus_client.Histogram(metric_prefix+name, doc, labels+['service'],
buckets=buckets, registry=registry)

return metrics


def run_collector(service, metrics_factory, in_queue, pipe_to_server):
"""
Manages Prometheus metrics. Receives changes to the metrics through a queue and emits their text
representation (for HTTP export) over a pipe. Designed to be run as "target" in a multiprocessing.Process
in conjunction with run_http_server().
Args:
service: Slug of this checker instance's service.
metrics_factory: Callable returning a dict of the mtrics to use mapping from name to Metric object.
in_queue: Queue over which MetricsMessages and HTTPGenMessages are received.
pipe_to_server: Pipe to which text representations of the metrics are sent in response to
HTTPGenMessages.
"""

registry = prometheus_client.CollectorRegistry()
metrics = metrics_factory(registry)

def handle_metrics_message(msg):
try:
metric = metrics[msg.name]
except KeyError:
logging.error('Recevied message for unknown metric "%s", ignoring', msg.name)
return

# Apparently, there is no nicer way to access the label names
if 'service' in metric._labelnames: # pylint: disable=protected-access
msg.labels['service'] = service
if len(msg.labels) > 0:
try:
metric = metric.labels(**(msg.labels))
except ValueError:
logging.error('Invalid labels specified for metric "%s", ignoring', msg.name)
return

try:
bound_method = getattr(metric, msg.instruction)
except AttributeError:
logging.error('Cannot use instruction "%s" on metric "%s", ignoring', msg.instruction, msg.name)
return
try:
bound_method(msg.value)
except: # noqa, pylint: disable=bare-except
logging.exception('Could not update metric "%s":', msg.name)

def send_metrics_text():
metrics_text = prometheus_client.generate_latest(registry)
pipe_to_server.send(metrics_text)

while True:
message = in_queue.get(True)
if isinstance(message, MetricsMessage):
handle_metrics_message(message)
elif isinstance(message, HTTPGenMessage):
send_metrics_text()
else:
logging.error('Received unknown message on collector queue')


def run_http_server(host, port, queue_to_collector, pipe_from_collector):
"""
Runs a server exposing Prometheus metrics via HTTP. The metrics are requested through a HTTPGenMessage
and received over the pipe. Designed to be run as "target" in a multiprocessing.Process in conjunction
with run_collector().
Args:
host: Host to run the HTTP server on.
port: Port to run the HTTP server on.
queue_to_collector: Queue to which HTTPGenMessages are sent.
pipe_from_collector: Pipe from which text representations of the metrics are received.
"""

def app(_, start_response):
queue_to_collector.put(HTTPGenMessage())
output = pipe_from_collector.recv()

status = '200 OK'
headers = [
('Content-Type', prometheus_client.CONTENT_TYPE_LATEST)
]
start_response(status, headers)
return [output]

class SilentHandler(simple_server.WSGIRequestHandler):
def log_message(self, _, *args):
"""
Doesn't log anything.
"""

http_server = simple_server.make_server(host, port, app, handler_class=SilentHandler)
http_server.serve_forever()


class DummyQueue(queue.Queue):
"""
Queue that discards all elements put into it.
"""

def put(self, item, block=True, timeout=None):
pass
15 changes: 14 additions & 1 deletion src/ctf_gameserver/checker/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from ctf_gameserver.lib.checkresult import CheckResult

from . import metrics


ACTION_FLAG = 'FLAG'
ACTION_LOAD = 'LOAD'
Expand All @@ -34,14 +36,17 @@ class RunnerSupervisor:
Launches Checker Script Runners as individual processes and takes care of communicating with them.
"""

def __init__(self):
def __init__(self, metrics_queue):
self.metrics_queue = metrics_queue

# Timeout if there are no requests when all Runners are done or blocking
self.queue_timeout = 1
self._reset()

def _reset(self):
self.work_queue = multiprocessing.Queue()
self.processes = {}
self.start_times = {}
self.next_identifier = 0

def start_runner(self, args, sudo_user, info, logging_params):
Expand All @@ -51,14 +56,17 @@ def start_runner(self, args, sudo_user, info, logging_params):
logging_params, self.next_identifier,
self.work_queue, receive))
self.processes[self.next_identifier] = (proc, send, info)
self.start_times[self.next_identifier] = time.monotonic()

proc.start()
self.next_identifier += 1
metrics.inc(self.metrics_queue, 'started_tasks')

def terminate_runner(self, runner_id):
logging.info('Terminating Runner process, info: %s', self.processes[runner_id][2])
self.processes[runner_id][0].terminate()
# Afterwards, get_request() will join the child and remove it from `self.processes`
metrics.inc(self.metrics_queue, 'terminated_tasks')

def terminate_runners(self):
if len(self.processes) > 0:
Expand All @@ -82,9 +90,14 @@ def get_request(self):

# Join all terminated child processes
if action == ACTION_RUNNER_EXIT:
duration = time.monotonic() - self.start_times[runner_id]
metrics.observe(self.metrics_queue, 'script_duration_seconds', duration)
del self.start_times[runner_id]

proc = self.processes[runner_id][0]
proc.join()
del self.processes[runner_id]

if self.work_queue.empty():
return None
else:
Expand Down
Loading

0 comments on commit a0a3aa7

Please sign in to comment.