-
Notifications
You must be signed in to change notification settings - Fork 1
/
influxdb_logger.py
78 lines (60 loc) · 2.36 KB
/
influxdb_logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import logging
import logging.handlers
import os
import time
class InfluxdbLogger:
    """Append data points to a rotating log file in InfluxDB line protocol.

    Each call to ``emit`` writes one line of the form
    ``measurement,tag=value,... field=value,... timestamp_ns`` to the file at
    ``path`` through the ``"monitor.data"`` logger. File rotation is handled
    by ``logging.handlers.RotatingFileHandler`` using ``MAX_SIZE_BYTES`` and
    ``MAX_BACKUP_FILES``.
    """

    # the influxdb measurement we're outputting (used when emit() gets none)
    MEASUREMENT = "bmnode"

    # how big should the log file get before rotating?
    MAX_SIZE_BYTES = 1024 * 1024 * 10  # 10 megabytes

    # how many backup files to keep after rotating?
    # must be at least 1 to enable rotation
    MAX_BACKUP_FILES = 1

    def __init__(self, path: str) -> None:
        """Remember the (absolute) log file path; nothing is opened yet."""
        self.path = os.path.abspath(path)

    @property
    def datalog(self) -> logging.Logger:
        """Return the ``"monitor.data"`` logger, configuring it on first use.

        On first access the destination directory is created if needed and a
        rotating file handler for ``self.path`` is attached to the logger.
        The configured logger is cached on the instance.
        """
        if not hasattr(self, "_datalog"):
            # make sure our destination exists
            os.makedirs(os.path.dirname(self.path), exist_ok=True)

            # grab the data logger
            datalog = logging.getLogger("monitor.data")

            # ignore any parent loggers -- these lines get written to file ONLY
            datalog.propagate = False

            # log at INFO
            datalog.setLevel(logging.INFO)

            # "monitor.data" is shared by the whole process, so a second
            # instance pointing at the same file must not attach a second
            # handler -- that would write every line twice
            already_attached = any(
                isinstance(existing, logging.handlers.RotatingFileHandler)
                and existing.baseFilename == self.path
                for existing in datalog.handlers
            )
            if not already_attached:
                # rotate the files every once in a while to allow files to close
                handler = logging.handlers.RotatingFileHandler(
                    self.path,
                    maxBytes=self.MAX_SIZE_BYTES,
                    backupCount=self.MAX_BACKUP_FILES,
                )
                datalog.addHandler(handler)

            # cache the configured logger for later calls
            self._datalog = datalog
        return self._datalog

    @property
    def hostname(self) -> str:
        """Return the stripped contents of ``/etc/hostname`` (read once, cached).

        Raises:
            OSError: if ``/etc/hostname`` cannot be opened or read.
        """
        if not hasattr(self, "_hostname"):
            # use a context manager so the file handle is closed promptly
            with open("/etc/hostname") as hostname_file:
                self._hostname = hostname_file.read().strip()
        return self._hostname

    @classmethod
    def d2str(cls, d) -> str:
        """Convert a dictionary of key/value pairs to ``k1=v1,k2=v2``.

        Spaces in keys are replaced with underscores; values are formatted
        with ``str()`` and are not otherwise escaped or quoted.
        """
        pairs = [f"{k.replace(' ', '_')}={v}" for k, v in d.items()]
        return ",".join(pairs)

    def emit(self, fields, tags, measurement=None) -> None:
        """Log the given fields and tags as one line of InfluxDB line protocol.

        Args:
            fields: mapping of field name to value.
            tags: mapping of tag name to value; may be empty or ``None``.
            measurement: measurement name; falls back to ``MEASUREMENT`` when
                omitted or empty.
        """
        # grab timestamp
        ts = time.time_ns()

        # what's the measurement
        measurement = measurement if measurement else self.MEASUREMENT

        # the tag set is optional in line protocol; without tags there must be
        # no comma after the measurement name
        series = f"{measurement},{self.d2str(tags)}" if tags else measurement

        # output the log;
        # https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/
        self.datalog.info("%s %s %d", series, self.d2str(fields), ts)