Skip to content

Commit

Permalink
Dynamically update the ddm_quota of tape rses (#728)
Browse files Browse the repository at this point in the history
Dynamically update the dm_weight of tape rses as well
-  Keep setting ddm_quota for disk rses until we deprecate it
- Add docs link, make the lowest weight non-zero and make the RSE expressions consistent

---------

Co-authored-by: Hasan Ozturk <[email protected]>
  • Loading branch information
haozturk and Hasan Ozturk authored Apr 10, 2024
1 parent e7287c9 commit d38e167
Showing 1 changed file with 111 additions and 83 deletions.
194 changes: 111 additions & 83 deletions docker/rucio_client/scripts/updateDDMQuota
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,38 @@

import math
from rucio.client import Client
from tabulate import tabulate
client = Client()

# ddm_quota are weight given to RSEs based on the amount of free space
# This is calulated as static use - rucio use
# The rule evaluation algorithm uses a weighted random selection of RSEs based on this value

# NOTE: This probably needs to be reviewed after an assement of age of dynamic data at different sites
# and if this can be used to normalise that

# See the following link for documentation and please update it if you change the logic
# https://cmsdmops.docs.cern.ch/Operators/ManageDMWeight/

DRY_RUN = False
VERBOSE = False
MAKE_QUADRATIC = False
MAX_DDM_QUOTA = 1000

STATIC_WEIGHT = 0.2
FREE_WEIGHT = 0.5
EXPIRED_WEIGHT = 0.3

# Adding ddm_quota attribute to all disk RSEs
# T3s do not have the "static" usage set, they are quasi-static
RSE_EXPRESSION = "rse_type=DISK&cms_type=real&tier<3&tier>0"
rses = [rse["rse"] for rse in client.list_rses(rse_expression=RSE_EXPRESSION)]
ddm_quotas = {}
overridden_ddm_quotas = {}


def get_stats():
values = list(ddm_quotas.values())
print(" ====== STATISTICS ====== ")

sorted_items = sorted(ddm_quotas.items(), key=lambda x: x[1])
for item in sorted_items:
print(item)

print("")

# Calculate the mean
mean = sum(values) / len(values)
print("Mean: ", mean)

# Calculate the standard deviation
variance = sum((x - mean) ** 2 for x in values) / len(values)
std_dev = math.sqrt(variance)
print("Std: ", std_dev)


def get_sum_of_all_rse_statics():
MAX_DM_WEIGHT = 100
MIN_DM_WEIGHT = 1
stats = {}

def print_results():
headers = ['RSE', 'PLEDGE (PB)', 'FREE SPACE (PB)', "RELATIVE FREE (%)", "DM WEIGHT COEFFICIENT", "DM_WEIGHT"]
table_disk = []
table_tape = []
for rse, values in stats.items():
pledge = values["pledge"] / 1e15
free = values["free"] / 1e15
relative_free = (free/pledge) * 100
dm_weight_coefficient = values["dm_weight_coefficient"]
dm_weight = values["dm_weight"]
if "Tape" in rse:
table_tape.append([rse,pledge,free,relative_free,dm_weight_coefficient, dm_weight])
else:
table_disk.append([rse,pledge,free,relative_free,dm_weight_coefficient, dm_weight])

print(tabulate(table_disk, headers=headers))
print(tabulate(table_tape, headers=headers))


def get_sum_of_all_rse_statics(rse_expression):
rses = [rse["rse"] for rse in client.list_rses(rse_expression=rse_expression)]
result = 0
for rse in rses:
static, _, _ = get_rse_info(rse)
Expand All @@ -72,16 +56,17 @@ def get_rse_info(rse):
print("{} is not a subset of {}".format(required_fields, relevant_info.keys()))
return 0, 0, 0

# ddm_quota is set proportional to percentage of (dynamic + free) space
# Apparently, python integers do not overflow, https://docs.python.org/3/library/exceptions.html#OverflowError

static, rucio, expired = relevant_info["static"], relevant_info["rucio"], relevant_info["expired"]
return static, rucio, expired


def calculate_ddm_quotas():
def calculate_dm_weights(rse_expression, static_weight, free_weight, expired_weight, make_quadratic):

total_static = get_sum_of_all_rse_statics()
total_static = get_sum_of_all_rse_statics(rse_expression)
rses = [rse["rse"] for rse in client.list_rses(rse_expression=rse_expression)]
dm_weights = {}

for rse in rses:
static, rucio, expired = get_rse_info(rse)
Expand All @@ -90,52 +75,95 @@ def calculate_ddm_quotas():
if static == 0:
continue # Skip if static is 0

free = static - rucio
ddm_quota = STATIC_WEIGHT * (static / total_static) + FREE_WEIGHT * \
(free / static) + EXPIRED_WEIGHT * (expired / static)
if MAKE_QUADRATIC:
ddm_quota = ddm_quota ** 2

# Override ddm_quota for operational purposes
# Control dm_weight for specially configured rses
rse_attributes = client.list_rse_attributes(rse)
if "override_ddm_quota" in rse_attributes:
overridden_ddm_quotas[rse] = rse_attributes["override_ddm_quota"]
continue

ddm_quotas[rse] = ddm_quota


def normalize_ddm_quotas():
weights = [value for value in ddm_quotas.values()]
for rse, weight in ddm_quotas.items():
ddm_quotas[rse] = int(((weight - min(weights)) / (max(weights) - min(weights))) * MAX_DDM_QUOTA)
free = static - rucio
dm_weight = static_weight * (static / total_static) + free_weight * \
(free / static) + expired_weight * (expired / static)
dm_weight_coefficient = 1
if "dm_weight_coefficient" in rse_attributes:
dm_weight_coefficient = float(rse_attributes["dm_weight_coefficient"])
dm_weight *= dm_weight_coefficient
if make_quadratic:
dm_weight = dm_weight ** 2

stats[rse] = {
"pledge": static,
"free": free,
"dm_weight_coefficient" : dm_weight_coefficient,
}

dm_weights[rse] = dm_weight
return dm_weights


def normalize_dm_weights(dm_weights):
weights = [value for value in dm_weights.values()]
min_positive_weight = min([n for n in weights if n > 0])
for rse, weight in dm_weights.items():

# Don't take 0 weights into normalization, since they are manually overriden
if weight == 0:
dm_weight = weight
else:
# This normalization sets the lowest dm_weight to 0
dm_weight = int(((weight - min_positive_weight) / (max(weights) - min_positive_weight)) * MAX_DM_WEIGHT)
# Make the lowest dm_weight non-zero
if dm_weight == 0:
dm_weight = MIN_DM_WEIGHT
#dm_weight = int((weight/sum(weights)) * MAX_DM_WEIGHT)
dm_weights[rse] = dm_weight
stats[rse]["dm_weight"] = dm_weight

return dm_weights

def set_ddm_quotas():
# Set automatically calculated ddm quotas
for rse, ddm_quota in ddm_quotas.items():
if DRY_RUN:
print("DRY-RUN: Set ddm_quota for {} to {}".format(rse, ddm_quota))
else:
client.add_rse_attribute(rse, "ddm_quota", ddm_quota)
print("Set ddm_quota for {} to {}".format(rse, ddm_quota))

# Set overriden ddm_quotas
for rse, ddm_quota in overridden_ddm_quotas.items():
def set_dm_weights(dm_weights):
# Set automatically calculated dm weights
for rse, dm_weight in dm_weights.items():
if DRY_RUN:
print("DRY-RUN: Override ddm_quota for {} to {}".format(rse, ddm_quota))
print("DRY-RUN: Set dm_weight for {} to {}".format(rse, dm_weight))
else:
client.add_rse_attribute(rse, "ddm_quota", ddm_quota)
print("Override ddm_quota for {} to {}".format(rse, ddm_quota))

client.add_rse_attribute(rse, "dm_weight", dm_weight)
# Keep setting ddm_quota for disk rses until we deprecate it
if "Tape" not in rse:
client.add_rse_attribute(rse, "ddm_quota", dm_weight)
print("Set dm_weight for {} to {}".format(rse, dm_weight))

def run(rse_expression, static_weight, free_weight, expired_weight, make_quadratic):

# Calculate dm_weights for disk rses
# T3s do not have the "static" usage set, they are quasi-static
dm_weights = calculate_dm_weights(rse_expression = rse_expression,
static_weight = static_weight,
free_weight = free_weight,
expired_weight = expired_weight,
make_quadratic = make_quadratic)
normalized_dm_weights = normalize_dm_weights(dm_weights)
set_dm_weights(normalized_dm_weights)

def main():

calculate_ddm_quotas()
normalize_ddm_quotas()
set_ddm_quotas()
if VERBOSE:
get_stats()
# Calculate dm_weights for disk rses
# T3s do not have the "static" usage set, they are quasi-static

run(rse_expression = "rse_type=DISK&cms_type=real&tier<3&tier>0",
static_weight = 0.2,
free_weight = 0.5,
expired_weight = 0.3,
make_quadratic = True)


# Calculate dm_weights for tape rses
run(rse_expression = "rse_type=TAPE&cms_type=real&wmcore_output_tape=True",
static_weight = 0.5,
free_weight = 0.5,
expired_weight = 0,
make_quadratic = False)

print_results()



if __name__ == "__main__":
Expand Down

0 comments on commit d38e167

Please sign in to comment.