Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgiga1993 committed Aug 21, 2022
2 parents 8ca1b28 + 30b4562 commit 4323641
Show file tree
Hide file tree
Showing 23 changed files with 553 additions and 63 deletions.
74 changes: 60 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# pollect - python data collection daemon

pollect is a daemon for collecting system and application metrics at periodic intervals
(similar to collectd). It's designed to require very little dependencies to run.
(similar to collectd). It's designed to require very few dependencies and to be easily customizable.

# Architecture

Expand Down Expand Up @@ -60,16 +60,15 @@ executors:
- collection: slowerMetrics
tickTime: 120
sources:
- type: Http
- type: Http # See Http below for more details
url: https://google.com
labels:
labels: # Additional labels/tags for the metrics
# It is also possible to access env variables anywhere
# in the config
system: prod
home: ${HOME}
```
A more advanced configuration sample can be found in the `pollect.[json|yml]` file.

# Metric names
Expand Down Expand Up @@ -100,13 +99,14 @@ The following parameters are available for all sources:

## Http response time `Http`

Measures the http response time
Measures the http response time in milliseconds

| Param | Desc |
|------------|--------------------------------------------------|
| url | Url to the web service |
| timeout | Timeout in seconds (default 15) |
| statusCode | The expected status code (default any non error) |
| Param | Desc |
|------------|-------------------------------------------------------------------------------------------|
| url | Url to the web service. Can be a list of strings as well (the url will be added as label) |
| timeout | Timeout in seconds (default 15) |
| statusCode | The expected status code (default any non error) |
| proxy | Http proxy which should be used (defaults to none) |

## Disk usage `DiskUsage`

Expand Down Expand Up @@ -338,6 +338,40 @@ Collects download statistics from apple
}
```

## Http Ingress source `HttpIngress`
Requires the `gevent` package.
This source starts a simple HTTP web server to which you can post metrics.
It's intended for cases where you want to push metrics to pollect instead of using the default pull probes.

```yml
- type: HttpIngress
name: Ingress
port: 9005 # Listener port
metrics: # You can define multiple metrics
sample_metric: # Name of the metric
type: counter # Optional, gauge by default, counter will cause the value to increment by X every time
labels: # Labels for this metric
- host
```

You can update the metrics using a simple http json post:
```bash
curl -X POST http://pollect:9005 \
-H 'Content-Type: application/json' \
--data-binary @- << EOF
{
"metrics": {
"sample_metric": {
"value": 124,
"labels": {
"host": "my-hostname"
}
}
}
}
EOF
```

# Writers

A writer represents the destination where the collected data is written to.
Expand All @@ -351,11 +385,23 @@ Prints the collected data to the stdout
Exports the data via a Prometheus endpoint. The port can be configured using the
`port` setting:

```yaml
writer:
type: Prometheus
port: 9001
```
"writer": {
"type": "Prometheus",
"port": 9001
}

### Https support

Pollect has a custom prometheus exporter which supports https.
This requires the `gevent` package.

```yaml
writer:
type: PrometheusSsl
port: 8000
key: key.key
cert: cert.pem
```

# Multithreading
Expand Down
9 changes: 8 additions & 1 deletion pollect/Pollect.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,21 @@ def load_config(config: str):

def main():
parser = argparse.ArgumentParser()
parser.add_argument('--version', dest='version', action='store_true',
help='Prints the current version')
parser.add_argument('-d', '--debug', dest='debug', action='store_true')
parser.add_argument('-c', '--config', dest='config', default='config',
help='Configuration file which should be read. If no file extension is given '
'both (yml and json) will be checked.')
parser.add_argument('-r', '--dry-run', dest='dry_run', action='store_true',
help='Prints the probed data to the stdout instead of sending it to the writer')
help='Prints the probed data to stdout instead of sending it to the writer')
args = parser.parse_args()

if args.version:
from pollect import __version__
print(f'Pollect {__version__}')
return

if args.debug:
Log.set_debug()

Expand Down
2 changes: 1 addition & 1 deletion pollect/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
name = "pollect"
__version__ = '1.1.9'
36 changes: 21 additions & 15 deletions pollect/core/Core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def create_executors(self) -> List[Executor]:

class Executor(Log):
"""
Executes probes
Executes a collection of probes.
"""

config: Dict[str, any]
Expand Down Expand Up @@ -124,21 +124,31 @@ def execute(self):
partial_write = self.writer.supports_partial_write()
futures = []

# Probe the actual data
for source in self._sources:
assert isinstance(source, Source)
future = self.thread_pool.submit(self._probe_and_write if partial_write else self._probe, source)
futures.append(future)

if partial_write:
# Data has already been written to the exporter
return

# Wait and merge the results
data = []
for future in futures:
# noinspection PyTypeChecker
self._merge(future.result(), data)
self._write(data)
self._write(data, self)

def _probe_and_write(self, source: Source):
"""
Probes a single source and writes the data to the writer
:param source: Source
"""
value_sets = self._probe(source)
data = []
self._merge(value_sets, data)
self._write(data, source)

def _probe(self, source: Source) -> Optional[List[ValueSet]]:
"""
Expand All @@ -160,16 +170,6 @@ def _probe(self, source: Source) -> Optional[List[ValueSet]]:
self.log.error(f'Error while probing using source {source}: {e}')
return None

def _probe_and_write(self, source: Source):
"""
Probes a single source and writes the data to the writer
:param source: Source
"""
value_sets = self._probe(source)
data = []
self._merge(value_sets, data)
self._write(data)

def _merge(self, value_sets: List[ValueSet], results: List[ValueSet]):
"""
Merges the given value sets
Expand All @@ -185,13 +185,19 @@ def _merge(self, value_sets: List[ValueSet], results: List[ValueSet]):
value_set.name = self.collection_name
results.append(value_set)

def _write(self, value_sets: List[ValueSet]):
def _write(self, value_sets: List[ValueSet], source_ref: object):
"""
Writes the given value sets using the current exporter
:param value_sets: Value sets
:param source_ref: Reference object which collected the data.
This is used to detect if a metric has been removed
"""
if len(value_sets) == 0:
return

# Write the data
self.log.info('Writing data...')
try:
self.writer.write(value_sets)
self.writer.write(value_sets, source_ref)
except Exception as e:
self.log.error(f'Could not write data: {e}')
2 changes: 1 addition & 1 deletion pollect/core/ExecutionScheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def run(self):
schedule.run_all()
while self._active:
schedule.run_pending()
time.sleep(10)
time.sleep(1)

def stop(self):
"""
Expand Down
10 changes: 8 additions & 2 deletions pollect/core/Helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ def accept(include, exclude, value):
return True


def get_url(url, timeout: int = 5, expected_status: Optional[int] = None):
def get_url(url, timeout: int = 5, expected_status: Optional[int] = None, proxy: Optional[str] = None):
try:
with request.urlopen(url, timeout=timeout) as url:
req = request.Request(url)
if proxy is not None:
req.set_proxy(proxy, 'http')
req.set_proxy(proxy, 'https')
req.timeout = timeout

with request.urlopen(req) as url:
return url.read()
except HTTPError as e:
if expected_status == e.status:
Expand Down
3 changes: 3 additions & 0 deletions pollect/core/config/ConfigContainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ def values(self):
for value in self._data.values():
yield self._resolve(value)

def items(self):
# Return the (key, value) view of the underlying data exactly as stored.
# NOTE(review): values() yields self._resolve(value) for each entry, while this
# method hands out the stored values untouched - confirm callers expect that.
return self._data.items()

def get(self, key: str, default: any = None, required: bool = False,
ignore_missing_env: Optional[str] = None) -> Optional[any]:
if key not in self._data:
Expand Down
18 changes: 18 additions & 0 deletions pollect/libs/Utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from typing import TypeVar, Dict

T = TypeVar('T')
K = TypeVar('K')


def put_if_absent(dict_obj: Dict[K, T], key: K, value: T) -> T:
    """
    Returns the value stored under the key, inserting the given value first if the key is missing.
    An existing entry is never overwritten, even if its value is falsy or None.

    :param dict_obj: Dictionary which is looked up and, if required, modified in place
    :param key: Key to look up
    :param value: Value which is stored (and returned) if the key is not present yet
    :return: The value which is associated with the key after the call
    """
    # setdefault performs the "lookup or insert" in a single step
    return dict_obj.setdefault(key, value)
6 changes: 6 additions & 0 deletions pollect/libs/sma/SmaModbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,25 @@ class SmaModbus(Log):
Communicates via modbus to SMA PV invertes
"""
_unit_id: int = -1
_is_connected: bool = False

def __init__(self, host: str, port: int = 502):
super().__init__()
self._client = ModbusTcpClient(host, port)

def is_connected(self) -> bool:
return self._is_connected

def connect(self):
self._client.connect()
# Ask for unit ID
reply = self._client.read_holding_registers(42109, 4, unit=1)
self._unit_id = reply.registers[3]
self._is_connected = True

def close(self):
self._client.close()
self._is_connected = False

def read(self, reg: Register) -> ValueWithUnit:
value = u32(self._client.read_holding_registers(reg.id, 2, unit=self._unit_id))
Expand Down
Loading

0 comments on commit 4323641

Please sign in to comment.