Refactor stats handling for improved accuracy and performance
dkmstr committed Nov 5, 2024
1 parent de54abc commit 9ee2bd9
Showing 5 changed files with 52 additions and 35 deletions.
38 changes: 13 additions & 25 deletions server/src/uds/REST/methods/system.py
@@ -54,10 +54,9 @@
cache = Cache('StatsDispatcher')

# Enclosed methods under /stats path
POINTS = 70
SINCE = 90 # Days, if higher values used, ensure mysql/mariadb has a bigger sort buffer
USE_MAX = True
CACHE_TIME = 60 * 60 # 1 hour
SINCE: typing.Final[int] = 14 # Days, if higher values used, ensure mysql/mariadb has a bigger sort buffer
USE_MAX: typing.Final[bool] = True
CACHE_TIME: typing.Final[int] = 60 * 60 # 1 hour


def get_servicepools_counters(
@@ -68,9 +67,10 @@ def get_servicepools_counters(
val: list[dict[str, typing.Any]] = []
try:
cache_key = (
(servicepool and str(servicepool.id) or 'all') + str(counter_type) + str(POINTS) + str(since_days)
(servicepool and str(servicepool.id) or 'all') + str(counter_type) + str(since_days) + str(since_days)
)
to = sql_now()
# Get now but with 0 minutes and 0 seconds
to = sql_now().replace(minute=0, second=0, microsecond=0)
since: datetime.datetime = to - datetime.timedelta(days=since_days)

cached_value: typing.Optional[bytes] = cache.get(cache_key)
@@ -82,12 +82,12 @@
us = servicepool

stats = counters.enumerate_accumulated_counters(
interval_type=models.StatsCountersAccum.IntervalType.DAY,
interval_type=models.StatsCountersAccum.IntervalType.HOUR,
counter_type=counter_type,
owner_type=types.stats.CounterOwnerType.SERVICEPOOL,
owner_id=us.id if us.id != -1 else None,
since=since,
points=since_days, # One point per hour
points=since_days*24, # One point per hour
)
val = [
{
@@ -97,22 +97,6 @@
for x in stats
]

# val = [
# {
# 'stamp': x[0],
# 'value': int(x[1]),
# }
# for x in counters.enumerate_counters(
# us,
# counter_type,
# since=since,
# to=to,
# max_intervals=POINTS,
# use_max=USE_MAX,
# all=us.id == -1,
# )
# ]

# logger.debug('val: %s', val)
if len(val) >= 2:
cache.put(
@@ -121,7 +105,11 @@
CACHE_TIME * 2,
)
else:
val = [{'stamp': since, 'value': 0}, {'stamp': to, 'value': 0}]
# Generate as many points as needed with 0 value
val = [
{'stamp': since + datetime.timedelta(hours=i), 'value': 0}
for i in range(since_days * 24)
]
else:
val = pickle.loads(
codecs.decode(cached_value, 'zip')
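Taken together, the system.py changes switch the service-pool statistics from daily to hourly points over a fixed window (SINCE = 14 days, so since_days * 24 = 336 points), and the no-data fallback now emits a zero-valued point for every hour instead of just the two endpoints. A minimal, self-contained sketch of that window arithmetic (datetime.now() stands in for the sql_now() helper, and hourly_zero_series is an illustrative name, not part of the patch):

import datetime

SINCE_DAYS = 14  # mirrors the new SINCE constant


def hourly_zero_series(since_days: int = SINCE_DAYS) -> list[dict]:
    """Build the zero-valued fallback series, one point per hour."""
    # Truncate "now" to the hour, as the patched code does with sql_now()
    to = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    since = to - datetime.timedelta(days=since_days)
    return [
        {'stamp': since + datetime.timedelta(hours=i), 'value': 0}
        for i in range(since_days * 24)
    ]


print(len(hourly_zero_series()))  # 336 points: 14 days * 24 hours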
41 changes: 34 additions & 7 deletions server/src/uds/core/managers/stats.py
@@ -176,26 +176,39 @@ def get_accumulated_counters(
owner_type: typing.Optional[types.stats.CounterOwnerType] = None,
owner_id: typing.Optional[int] = None,
since: typing.Optional[typing.Union[datetime.datetime, int]] = None,
to: typing.Optional[typing.Union[datetime.datetime, int]] = None,
points: typing.Optional[int] = None,
) -> typing.Generator[types.stats.AccumStat, None, None]:
if to is None:
to = sql_now()
elif isinstance(to, int):
to = datetime.datetime.fromtimestamp(to)

if since is None:
if points is None:
points = 100 # If since is not specified, we need at least points, get a default
since = sql_now() - datetime.timedelta(seconds=interval_type.seconds() * points)
since = to - datetime.timedelta(seconds=interval_type.seconds() * points)
elif isinstance(since, int):
since = datetime.datetime.fromtimestamp(since)

# If points has any value, ensure since..to is points long
if points is not None:
# Ensure since is at least points long before to
if (to - since).total_seconds() < interval_type.seconds() * points:
since = to - datetime.timedelta(seconds=interval_type.seconds() * points)

if isinstance(since, datetime.datetime):
since = int(since.timestamp())
since = int(since.replace(minute=0, second=0, microsecond=0).timestamp())
to = int(to.replace(minute=0, second=0, microsecond=0).timestamp())

# Filter from since to now, get at most points
query = StatsCountersAccum.objects.filter(
interval_type=interval_type,
counter_type=counter_type,
stamp__gte=since,
owner_id=owner_id if owner_id is not None else -1,
).order_by('stamp')
if owner_type is not None:
query = query.filter(owner_type=owner_type)
if owner_id is not None:
query = query.filter(owner_id=owner_id)
# If points is None, we get all data
query = query[:points]

@@ -210,6 +223,7 @@ def get_accumulated_counters(
yield last
stamp += interval_type.seconds()
last.stamp = stamp

# The record to be emitted is the current one, but replace the record stamp with the current stamp
# The record is for sure the first one previous to stamp (we have emitted the last record until reaching this one)
last = types.stats.AccumStat(
@@ -219,10 +233,23 @@
rec.v_max,
rec.v_min,
)
# Emit the record for this interval
yield last
stamp += interval_type.seconds()

# Complete the series with zero values until 'to' is reached
last = types.stats.AccumStat(
stamp,
0,
0,
0,
0,
)

while stamp < to:
yield last
stamp += interval_type.seconds()
last.stamp = stamp

def perform_counters_maintenance(self) -> None:
"""
Removes all counters older than the configured max keep time for stat information from the database.
@@ -256,7 +283,7 @@ def add_event(
stamp: if not None, this will be used as the date for the counter, else the current date/time will be used
kwargs: Additional fields for the event. This will be stored as fld1, fld2, fld3, fld4
Note: to see fields equivalency, check _FLDS_EQUIV
Returns:
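The reworked generator in stats.py now produces a gapless series: between stored accumulated records it keeps re-emitting the previous record with an advanced stamp, and after the last record it pads with zero values until `to` is reached. A simplified, standalone sketch of that padding behaviour (plain (stamp, value) tuples stand in for the StatsCountersAccum rows; fill_series is an illustrative name only):

import typing


def fill_series(
    records: list[tuple[int, int]],  # (stamp, value) pairs, sorted by stamp
    since: int,
    to: int,
    step: int = 3600,  # one-hour buckets, like IntervalType.HOUR
) -> typing.Iterator[tuple[int, int]]:
    stamp = since
    last_value = 0
    idx = 0
    while stamp < to:
        if idx < len(records) and records[idx][0] <= stamp:
            # A stored record falls in this bucket: emit it and remember its value
            last_value = records[idx][1]
            idx += 1
        elif idx >= len(records):
            # Past the last stored record: pad the tail with zeros, as the patch does
            last_value = 0
        # Gaps between records simply repeat the previous value with a new stamp
        yield (stamp, last_value)
        stamp += step


# Two stored hours followed by a gap: the series is still emitted hour by hour
print(list(fill_series([(0, 5), (3600, 7)], since=0, to=5 * 3600)))
# -> [(0, 5), (3600, 7), (7200, 0), (10800, 0), (14400, 0)]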
4 changes: 2 additions & 2 deletions server/src/uds/core/util/config.py
@@ -577,9 +577,9 @@ class GlobalConfig:
# Statistics accumulation frequency, in seconds
STATS_ACCUM_FREQUENCY: Config.Value = Config.section(Config.SectionType.GLOBAL).value(
'statsAccumFrequency',
'14400',
'3600',
type=Config.FieldType.NUMERIC,
help=_('Frequency of stats collection in seconds. Default is 4 hours (14400 seconds)'),
help=_('Frequency of stats collection in seconds. Default is 1 hour (3600 seconds)'),
)
# Statistics accumulation chunk size, in days
STATS_ACCUM_MAX_CHUNK_TIME = Config.section(Config.SectionType.GLOBAL).value(
2 changes: 2 additions & 0 deletions server/src/uds/core/util/stats/counters.py
@@ -221,6 +221,7 @@ def enumerate_accumulated_counters(
owner_type: typing.Optional[types.stats.CounterOwnerType] = None,
owner_id: typing.Optional[int] = None,
since: typing.Optional[typing.Union[datetime.datetime, int]] = None,
to: typing.Optional[datetime.datetime] = None,
points: typing.Optional[int] = None,
*,
infer_owner_type_from: typing.Optional[CounterClass] = None,
@@ -234,5 +235,6 @@
owner_type=owner_type,
owner_id=owner_id,
since=since,
to=to,
points=points,
)
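With the added `to` parameter, enumerate_accumulated_counters can be bounded on both ends; combined with `points` it yields a fixed-length hourly window, which is how the REST method above uses it. A hedged usage sketch (assumes a configured Django/UDS environment; import paths follow the usual uds package layout, datetime.now() stands in for sql_now(), and hourly_servicepool_series is an illustrative wrapper, not part of the patch):

import datetime

from uds import models
from uds.core import types
from uds.core.util.stats import counters


def hourly_servicepool_series(counter_type, since_days: int = 14):
    """Sketch: hourly accumulated series for all service pools over `since_days` days."""
    # The real code uses sql_now(); datetime.now() stands in here
    to = datetime.datetime.now().replace(minute=0, second=0, microsecond=0)
    return list(
        counters.enumerate_accumulated_counters(
            interval_type=models.StatsCountersAccum.IntervalType.HOUR,
            counter_type=counter_type,
            owner_type=types.stats.CounterOwnerType.SERVICEPOOL,
            owner_id=None,  # None selects all service pools
            since=to - datetime.timedelta(days=since_days),
            to=to,
            points=since_days * 24,  # one point per hour
        )
    )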
2 changes: 1 addition & 1 deletion server/src/uds/workers/stats_collector.py
@@ -159,7 +159,7 @@ class StatsAccumulator(Job):
This is done by:
* For HOUR, DAY
"""
frecuency = 3600 # Executed every 4 hours
frecuency = 3600 # Executed every hour
frecuency_cfg = (
config.GlobalConfig.STATS_ACCUM_FREQUENCY
)
