From d38e1671bc329903447dee68930e964a48fc4f17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Hasan=20=C3=96zt=C3=BCrk?= <32415369+haozturk@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:10:55 +0200 Subject: [PATCH] Dynamically update the ddm_quota of tape rses (#728) Dynamically update the dm_weight of tape rses as well - Keep setting ddm_quota for disk rses until we deprecate it - Add docs link, make the lowest weight non-zero and make the RSE expressions consistent --------- Co-authored-by: Hasan Ozturk --- docker/rucio_client/scripts/updateDDMQuota | 194 ++++++++++++--------- 1 file changed, 111 insertions(+), 83 deletions(-) diff --git a/docker/rucio_client/scripts/updateDDMQuota b/docker/rucio_client/scripts/updateDDMQuota index b4dfdfbc..05e20f57 100755 --- a/docker/rucio_client/scripts/updateDDMQuota +++ b/docker/rucio_client/scripts/updateDDMQuota @@ -2,54 +2,38 @@ import math from rucio.client import Client +from tabulate import tabulate client = Client() -# ddm_quota are weight given to RSEs based on the amount of free space -# This is calulated as static use - rucio use -# The rule evaluation algorithm uses a weighted random selection of RSEs based on this value - -# NOTE: This probably needs to be reviewed after an assement of age of dynamic data at different sites -# and if this can be used to normalise that - +# See the following link for documentation and please update it if you change the logic +# https://cmsdmops.docs.cern.ch/Operators/ManageDMWeight/ DRY_RUN = False -VERBOSE = False -MAKE_QUADRATIC = False -MAX_DDM_QUOTA = 1000 - -STATIC_WEIGHT = 0.2 -FREE_WEIGHT = 0.5 -EXPIRED_WEIGHT = 0.3 - -# Adding ddm_quota attribute to all disk RSEs -# T3s do not have the "static" usage set, they are quasi-static -RSE_EXPRESSION = "rse_type=DISK&cms_type=real&tier<3&tier>0" -rses = [rse["rse"] for rse in client.list_rses(rse_expression=RSE_EXPRESSION)] -ddm_quotas = {} -overridden_ddm_quotas = {} - - -def get_stats(): - values = list(ddm_quotas.values()) - print(" ====== STATISTICS ====== ") - - sorted_items = sorted(ddm_quotas.items(), key=lambda x: x[1]) - for item in sorted_items: - print(item) - - print("") - - # Calculate the mean - mean = sum(values) / len(values) - print("Mean: ", mean) - - # Calculate the standard deviation - variance = sum((x - mean) ** 2 for x in values) / len(values) - std_dev = math.sqrt(variance) - print("Std: ", std_dev) - - -def get_sum_of_all_rse_statics(): +MAX_DM_WEIGHT = 100 +MIN_DM_WEIGHT = 1 +stats = {} + +def print_results(): + headers = ['RSE', 'PLEDGE (PB)', 'FREE SPACE (PB)', "RELATIVE FREE (%)", "DM WEIGHT COEFFICIENT", "DM_WEIGHT"] + table_disk = [] + table_tape = [] + for rse, values in stats.items(): + pledge = values["pledge"] / 1e15 + free = values["free"] / 1e15 + relative_free = (free/pledge) * 100 + dm_weight_coefficient = values["dm_weight_coefficient"] + dm_weight = values["dm_weight"] + if "Tape" in rse: + table_tape.append([rse,pledge,free,relative_free,dm_weight_coefficient, dm_weight]) + else: + table_disk.append([rse,pledge,free,relative_free,dm_weight_coefficient, dm_weight]) + + print(tabulate(table_disk, headers=headers)) + print(tabulate(table_tape, headers=headers)) + + +def get_sum_of_all_rse_statics(rse_expression): + rses = [rse["rse"] for rse in client.list_rses(rse_expression=rse_expression)] result = 0 for rse in rses: static, _, _ = get_rse_info(rse) @@ -72,16 +56,17 @@ def get_rse_info(rse): print("{} is not a subset of {}".format(required_fields, relevant_info.keys())) return 0, 0, 0 - # ddm_quota is set proportional to percentage of (dynamic + free) space # Apparently, python integers do not overflow, https://docs.python.org/3/library/exceptions.html#OverflowError static, rucio, expired = relevant_info["static"], relevant_info["rucio"], relevant_info["expired"] return static, rucio, expired -def calculate_ddm_quotas(): +def calculate_dm_weights(rse_expression, static_weight, free_weight, expired_weight, make_quadratic): - total_static = get_sum_of_all_rse_statics() + total_static = get_sum_of_all_rse_statics(rse_expression) + rses = [rse["rse"] for rse in client.list_rses(rse_expression=rse_expression)] + dm_weights = {} for rse in rses: static, rucio, expired = get_rse_info(rse) @@ -90,52 +75,95 @@ def calculate_ddm_quotas(): if static == 0: continue # Skip if static is 0 - free = static - rucio - ddm_quota = STATIC_WEIGHT * (static / total_static) + FREE_WEIGHT * \ - (free / static) + EXPIRED_WEIGHT * (expired / static) - if MAKE_QUADRATIC: - ddm_quota = ddm_quota ** 2 - - # Override ddm_quota for operational purposes + # Control dm_weight for specially configured rses rse_attributes = client.list_rse_attributes(rse) - if "override_ddm_quota" in rse_attributes: - overridden_ddm_quotas[rse] = rse_attributes["override_ddm_quota"] - continue - ddm_quotas[rse] = ddm_quota - - -def normalize_ddm_quotas(): - weights = [value for value in ddm_quotas.values()] - for rse, weight in ddm_quotas.items(): - ddm_quotas[rse] = int(((weight - min(weights)) / (max(weights) - min(weights))) * MAX_DDM_QUOTA) + free = static - rucio + dm_weight = static_weight * (static / total_static) + free_weight * \ + (free / static) + expired_weight * (expired / static) + dm_weight_coefficient = 1 + if "dm_weight_coefficient" in rse_attributes: + dm_weight_coefficient = float(rse_attributes["dm_weight_coefficient"]) + dm_weight *= dm_weight_coefficient + if make_quadratic: + dm_weight = dm_weight ** 2 + + stats[rse] = { + "pledge": static, + "free": free, + "dm_weight_coefficient" : dm_weight_coefficient, + } + + dm_weights[rse] = dm_weight + return dm_weights + + +def normalize_dm_weights(dm_weights): + weights = [value for value in dm_weights.values()] + min_positive_weight = min([n for n in weights if n > 0]) + for rse, weight in dm_weights.items(): + + # Don't take 0 weights into normalization, since they are manually overriden + if weight == 0: + dm_weight = weight + else: + # This normalization sets the lowest dm_weight to 0 + dm_weight = int(((weight - min_positive_weight) / (max(weights) - min_positive_weight)) * MAX_DM_WEIGHT) + # Make the lowest dm_weight non-zero + if dm_weight == 0: + dm_weight = MIN_DM_WEIGHT + #dm_weight = int((weight/sum(weights)) * MAX_DM_WEIGHT) + dm_weights[rse] = dm_weight + stats[rse]["dm_weight"] = dm_weight + return dm_weights -def set_ddm_quotas(): - # Set automatically calculated ddm quotas - for rse, ddm_quota in ddm_quotas.items(): - if DRY_RUN: - print("DRY-RUN: Set ddm_quota for {} to {}".format(rse, ddm_quota)) - else: - client.add_rse_attribute(rse, "ddm_quota", ddm_quota) - print("Set ddm_quota for {} to {}".format(rse, ddm_quota)) - # Set overriden ddm_quotas - for rse, ddm_quota in overridden_ddm_quotas.items(): +def set_dm_weights(dm_weights): + # Set automatically calculated dm weights + for rse, dm_weight in dm_weights.items(): if DRY_RUN: - print("DRY-RUN: Override ddm_quota for {} to {}".format(rse, ddm_quota)) + print("DRY-RUN: Set dm_weight for {} to {}".format(rse, dm_weight)) else: - client.add_rse_attribute(rse, "ddm_quota", ddm_quota) - print("Override ddm_quota for {} to {}".format(rse, ddm_quota)) - + client.add_rse_attribute(rse, "dm_weight", dm_weight) + # Keep setting ddm_quota for disk rses until we deprecate it + if "Tape" not in rse: + client.add_rse_attribute(rse, "ddm_quota", dm_weight) + print("Set dm_weight for {} to {}".format(rse, dm_weight)) + +def run(rse_expression, static_weight, free_weight, expired_weight, make_quadratic): + + # Calculate dm_weights for disk rses + # T3s do not have the "static" usage set, they are quasi-static + dm_weights = calculate_dm_weights(rse_expression = rse_expression, + static_weight = static_weight, + free_weight = free_weight, + expired_weight = expired_weight, + make_quadratic = make_quadratic) + normalized_dm_weights = normalize_dm_weights(dm_weights) + set_dm_weights(normalized_dm_weights) def main(): - calculate_ddm_quotas() - normalize_ddm_quotas() - set_ddm_quotas() - if VERBOSE: - get_stats() + # Calculate dm_weights for disk rses + # T3s do not have the "static" usage set, they are quasi-static + + run(rse_expression = "rse_type=DISK&cms_type=real&tier<3&tier>0", + static_weight = 0.2, + free_weight = 0.5, + expired_weight = 0.3, + make_quadratic = True) + + + # Calculate dm_weights for tape rses + run(rse_expression = "rse_type=TAPE&cms_type=real&wmcore_output_tape=True", + static_weight = 0.5, + free_weight = 0.5, + expired_weight = 0, + make_quadratic = False) + + print_results() + if __name__ == "__main__":