Skip to content

Commit

Permalink
feat: support for partial write for prometheus value cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgiga1993 committed Aug 23, 2022
1 parent bf4c404 commit 6c50e36
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 61 deletions.
2 changes: 1 addition & 1 deletion pollect/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.1.11'
__version__ = '1.1.12'
160 changes: 121 additions & 39 deletions pollect/writers/PrometheusWriter.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,140 @@
from __future__ import annotations
import threading
from typing import List, Dict, Optional
from wsgiref.simple_server import WSGIServer

from prometheus_client import Gauge, registry, exposition, REGISTRY

from pollect.core.ValueSet import ValueSet
from pollect.core.ValueSet import ValueSet, Value
from pollect.libs import Utils
from pollect.sources.Source import Source
from pollect.writers.Writer import Writer


class PromMetric:
"""
Represents a single metric
"""

metric: Gauge
"""
Prometheus metric
"""

updated: bool
updated: Dict[str, bool]
"""
Indicates if the metric has been updated in the current run
Indicates if the metric with the given label values has been updated in the current run
"""

def __init__(self, metric: Gauge):
self.metric = metric
self.updated = True
self.updated = {}

def reset_state(self):
for key in self.updated.keys():
self.updated[key] = False

def update(self, value_set: ValueSet, value_obj: Value):
if len(value_set.labels) > 0:
if len(value_obj.label_values) != len(value_set.labels):
raise ValueError('Incorrect label count for ' + str(value_obj) + ': Got ' +
str(len(value_set.labels)) + ' labels and ' +
str(len(value_obj.label_values)) + ' label names')

self.metric.labels(*value_obj.label_values).set(value_obj.value)
label_key = '\t'.join(value_obj.label_values)
self.updated[label_key] = True
return

self.metric.set(value_obj.value)
self.updated[''] = True

class PrometheusWriter(Writer):
_prom_metrics: Dict[object, Dict[str, PromMetric]]
def remove_not_updated(self, cache: MetricsCache):
for key in list(self.updated.keys()):
if self.updated[key] is True:
continue
del self.updated[key]

if key == '':
# In case we don't have any labels
# we can just unregister the metric, since no other
# source is using it
# (Since > 1 sources using the same metric name will cause issues anyways)
cache.unregister(self)
continue
labels = key.split('\t')
self.metric.remove(*labels)


class MetricsCache:
"""
Holds the prometheus metrics objects
as well as all exported metrics so we can check
if a metric was removed
"""
Holds all metrics, mapped to their source object and name
_source_metrics: Dict[object, Dict[str, PromMetric]]
"""
Maps a source object to the metrics created by that object
"""

_prom_counter: Dict[str, Gauge]

def __init__(self):
self._source_metrics = {}
self._prom_counter = {}

def get_or_create(self, path: str, label_names: List[str]) -> Gauge:
"""
Returns an existing gauge or creates a new one if it doesn't exist
:param path: Path
:param label_names: Label anmes
:return: Gauge
"""
gauge = self._prom_counter.get(path)
if gauge is None:
gauge = Gauge(path, path, labelnames=label_names)
self._prom_counter[path] = gauge
return gauge
return gauge

def clear(self):
"""
Removes all metrics
"""
for value in self._prom_counter.values():
registry.REGISTRY.unregister(value)
self._source_metrics.clear()
self._prom_counter.clear()

def unregister(self, metric: PromMetric):
"""
Removes the given metric
:param metric: Metric to be removed
"""
registry.REGISTRY.unregister(metric.metric)
for key, value in self._prom_counter.items():
if value == metric.metric:
del self._prom_counter[key]
break
for metrics in self._source_metrics.values():
for key, value in metrics.items():
if value == metric:
del metrics[key]
return

def get_metrics(self, source_ref) -> Dict[str, PromMetric]:
return Utils.put_if_absent(self._source_metrics, source_ref, {})


class PrometheusWriter(Writer):
_port: int
_httpd: Optional[WSGIServer]
_cache: MetricsCache

def __init__(self, config):
super().__init__(config)
self._port = self.config.get('port', 8080)
self._prom_metrics = {} # type: Dict[str, PromMetric]
self._cache = MetricsCache()

def supports_partial_write(self) -> bool:
return True
Expand Down Expand Up @@ -70,15 +168,14 @@ def clear(self):
"""
Removes all metrics
"""
for value in self._prom_metrics.values():
registry.REGISTRY.unregister(value.metric)
self._prom_metrics.clear()
self._cache.clear()

def write(self, data: List[ValueSet], source_ref: object = None):
existing_metrics = self._prom_metrics
def write(self, data: List[ValueSet], source_ref: Optional[Source] = None):
# Get the previous metrics for the given source
existing_metrics = self._cache.get_metrics(source_ref)

for value in existing_metrics.values():
value.updated = False
value.reset_state()

for value_set in data:
for value_obj in value_set.values:
Expand All @@ -88,28 +185,13 @@ def write(self, data: List[ValueSet], source_ref: object = None):

path = path.replace('-', '_').replace('.', '_').replace('!', '')

prom_metric = existing_metrics.get(path)
if prom_metric is None:
# New metric
prom_metric = PromMetric(Gauge(path, path, labelnames=value_set.labels))
existing_metrics[path] = prom_metric

prom_metric = prom_metric
prom_metric.updated = True
if len(value_set.labels) > 0:
if len(value_obj.label_values) != len(value_set.labels):
raise ValueError('Incorrect label count for ' + path + ': Got ' +
str(len(value_set.labels)) + ' labels and ' +
str(len(value_obj.label_values)) + ' label names')

prom_metric.metric.labels(*value_obj.label_values).set(value_obj.value)
continue
prom_metric.metric.set(value_obj.value)

# Check if any metric hasn't been updated, and if so remove it from the prometheus registry
for key in list(existing_metrics.keys()):
value = existing_metrics[key]
if value.updated:
continue
registry.REGISTRY.unregister(value.metric)
del existing_metrics[key]
if path not in existing_metrics:
# New metric for the current source
gauge = self._cache.get_or_create(path, label_names=value_set.labels)
existing_metrics[path] = PromMetric(gauge)

prom_metric = existing_metrics[path]
prom_metric.update(value_set, value_obj)

for value in list(existing_metrics.values()):
value.remove_not_updated(self._cache)
5 changes: 3 additions & 2 deletions pollect/writers/Writer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from abc import abstractmethod
from typing import List
from typing import List, Optional

from pollect.core.Log import Log
from pollect.core.ValueSet import ValueSet
from pollect.sources.Source import Source


class Writer(Log):
Expand Down Expand Up @@ -35,7 +36,7 @@ def stop(self):
"""

@abstractmethod
def write(self, data: List[ValueSet], source_ref: object = None):
def write(self, data: List[ValueSet], source_ref: Optional[Source] = None):
"""
Writes the given data
Expand Down
62 changes: 43 additions & 19 deletions tests/writers/test_PrometheusWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@
from pollect.core.ValueSet import ValueSet, Value

from pollect.writers.PrometheusWriter import PrometheusWriter
from pollect.writers.Writer import Writer


class TestPrometheusWriter(TestCase):
writer: Writer = None
writer: PrometheusWriter = None

@classmethod
def setUpClass(cls) -> None:
Expand All @@ -25,58 +24,83 @@ def tearDownClass(cls) -> None:
def setUp(self) -> None:
self.writer.clear()

def test_removal(self):
value_set = ValueSet()
value_set.values.append(Value(0, name='test'))
def test_removal_labels(self):
value_set = ValueSet(labels=['a'])
value_set.values.append(Value(0, name='test', label_values=['1']))
value_set.values.append(Value(0, name='test', label_values=['2']))
self.writer.write([value_set])

reply = requests.get('http://localhost:9123')
self.assertIn('test 0.0', reply.text)
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertIn('test{a="2"} 0.0', reply.text)

self.writer.write([])
reply = requests.get('http://localhost:9123')
self.assertNotIn('test 0.0', reply.text)
self.assertNotIn('test{a="1"} 0.0', reply.text)
self.assertNotIn('test{a="2"} 0.0', reply.text)
self.assertEqual(1, len(self.writer._cache._prom_counter))
self.assertEqual(1, len(self.writer._cache._source_metrics))

# And add again
self.writer.write([value_set])
reply = requests.get('http://localhost:9123')
self.assertIn('test 0.0', reply.text)
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertIn('test{a="2"} 0.0', reply.text)

# Only write one
value_set.values.pop()
self.writer.write([value_set])
reply = requests.get('http://localhost:9123')
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertNotIn('test{a="2"} 0.0', reply.text)
self.assertEqual(1, len(self.writer._cache._prom_counter))
self.assertEqual(1, len(self.writer._cache._source_metrics))

def test_multi_label_partial_write(self):
value_set_a = ValueSet(labels=['a'])
value_set_a.values.append(Value(0, name='test', label_values=['2']))

value_set_b = ValueSet(labels=['a'])
value_set_b.values.append(Value(0, name='test', label_values=['1']))
value_set_b.values.append(Value(0, name='test2', label_values=['1']))
self.writer.write([value_set_a], value_set_a)
self.writer.write([value_set_b], value_set_b)

reply = requests.get('http://localhost:9123')
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertIn('test2{a="1"} 0.0', reply.text)
self.assertIn('test{a="2"} 0.0', reply.text)

self.writer.write([])
self.writer.write([], value_set_a)
reply = requests.get('http://localhost:9123')
self.assertNotIn('test{a="1"} 0.0', reply.text)
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertNotIn('test{a="2"} 0.0', reply.text)

# And add again
self.writer.write([value_set_a])
self.writer.write([value_set_b])
self.writer.write([value_set_a], value_set_a)
self.writer.write([value_set_b], value_set_b)
reply = requests.get('http://localhost:9123')
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertIn('test{a="2"} 0.0', reply.text)

# Only write one - at the moment still both metrics should be available
self.writer.write([value_set_b])
# Remove value set b
self.writer.write([], value_set_b)
reply = requests.get('http://localhost:9123')
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertNotIn('test{a="1"} 0.0', reply.text)
self.assertNotIn('test2{a="1"} 0.0', reply.text)
self.assertIn('test{a="2"} 0.0', reply.text)

# Add B again
self.writer.write([value_set_b], value_set_b)
# Remove single value from B
value_set_b.values.pop()
self.writer.write([value_set_b], value_set_b)
reply = requests.get('http://localhost:9123')
self.assertIn('test{a="1"} 0.0', reply.text)
self.assertNotIn('test2{a="1"} 0.0', reply.text)

def test_removal_partial_write(self):
# This test case is currently disabled since
# it the use case isn't supported for partial write
"""value_set = ValueSet()
value_set = ValueSet()
value_set.values.append(Value(0, name='test1'))
self.writer.write([value_set], 1)

Expand All @@ -91,4 +115,4 @@ def test_removal_partial_write(self):
self.writer.write([], 1)
reply = requests.get('http://localhost:9123')
self.assertNotIn('test1 0.0', reply.text)
self.assertIn('test2 0.0', reply.text)"""
self.assertIn('test2 0.0', reply.text)

0 comments on commit 6c50e36

Please sign in to comment.