-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Module to update site lists for WMAgents #12123
base: master
Are you sure you want to change the base?
Conversation
Jenkins results:
|
self.url = url | ||
self.logger = logger | ||
|
||
def getActiveRequests(self, wflowSpecs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be using a SQL statement, similar to what you have implemented for pileup updates in WorkflowUpdater.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we'll put this class to be used in WorkflowUpdaterPoller.py
(as I think it will be), then the WorkflowUpdaterPoller.py
code does this SQL statement I didn't want to do it again as we already got wflowSpecs in here: https://github.com/dmwm/WMCore/blob/master/src/python/WMComponent/WorkflowUpdater/WorkflowUpdaterPoller.py#L286
] | ||
return requests, wmaSpecs | ||
|
||
def getRequestSpecs(self, requests): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though we are going to need it to fetch the workflow spec itself, I think we will have to implement a call to the wmstatserver
service and retrieve the site lists with a filter+mask call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? if we fetch the whole spec it contains site lists, why do we need to get it again from wmstatserver? I just don't understand the reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not to get it again, but to get it from WMStats instead of ReqMgr2.
For WMStats, we can retrieve data in bulk.
wmaSpec = wmaSpecs[key] | ||
workloadHelper = WMWorkloadHelper() | ||
workloadHelper.load(wmaSpec) | ||
taskIterator = workloadHelper.taskIterator() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need for iterating over the object ourselves. You can simply call the relevant WMWorkload object method, e.g.:
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/WMSpec/WMWorkload.py#L676
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we do need to iterate to check if sitelist needs to be changed, otherwise we only do assignment without knowing if it is required. But I don't mind to make the assignments. The reason I structured the code to identify which task must be updated and therefore when update happens we should push back the task to whatever place it should be.
Can one of the admins verify this patch? |
Jenkins results:
|
Jenkins results:
|
Jenkins results:
|
Jenkins results:
|
Jenkins results:
|
e9a371c
to
c3e0a2b
Compare
Jenkins results:
|
9236b4b
to
7c5dce9
Compare
Jenkins results:
|
7c5dce9
to
98c3804
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks good to me. I left just tow comments inline
'reduce': False}) | ||
elementsToUpdate = [x['id'] for x in data.get('rows', [])] | ||
if elementsToUpdate: | ||
self.updateElements(*elementsToUpdate, SiteWhiteList=siteWhiteList, SiteBlackList=siteBlackList) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note here, not a request for changing anything. When it comes to the global Workqueue we are about to attempt to create a method to update the workqueue elements per workflow :
WMCore/src/python/WMCore/Services/WorkQueue/WorkQueue.py
Lines 259 to 291 in 1f78a64
def updateElementsByWorkflow(self, wf, updateParams, status=None): | |
""" | |
Update all available WorkQueue elements of a given workflow with a set | |
of arguments provided through the `updateParams` dictionary | |
:param wf: The workflow name | |
:param updateParams: A dictionary with parameters to be updated | |
:param status: A list of allowed WorkQueue elements statuses to be considered for updating | |
Default: None - do not filter by status | |
:return: No value, raises exceptions from internal methods in case of errors. | |
""" | |
# Fetch the whole view with Workqueue elements per given workflow | |
data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus', | |
{'startkey': [wf], 'endkey': [wf, {}], | |
'reduce': False}) | |
# Fetch only a list of WorkQueue element Ids && Filter them by allowed status | |
if status: | |
elementsToUpdate = [x['id'] for x in data.get('rows', []) if x['value']['Status'] in status] | |
else: | |
elementsToUpdate = [x['id'] for x in data.get('rows', [])] | |
# Update all WorkQueue elements with the parameters provided in a single push | |
if elementsToUpdate: | |
self.updateElements(*elementsToUpdate, **updateParams) | |
# Update the spec, if it exists | |
if self.db.documentExists(wf): | |
wmspec = WMWorkloadHelper() | |
wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf)) | |
wmspec.updateWorkloadArgs(updateParams) | |
dummy_values = {'name': wmspec.name()} | |
wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values) | |
return |
And yet another method to update the full specs with all wmspec parameters at a single push. For the later the following methods was developed, but still not merged in the master branch:
WMCore/src/python/WMCore/WMSpec/WMWorkload.py
Lines 78 to 121 in 1f78a64
def updateWorkloadArgs(self, reqArgs): | |
""" | |
Method to take a dictionary of arguments of the type: | |
{reqArg1: value, | |
reqArg2: value, | |
...} | |
and update the workload by a predefined map of reqArg to setter methods. | |
:param reqArgs: A Dictionary of request arguments to be updated | |
:return: Nothing, Raises an error of type WMWorkloadException if | |
fails to apply the proper setter method | |
""" | |
# NOTE: So far we support only a single argument setter methods, like | |
# setSiteWhitelist or setPriority. This may change in the future, | |
# but it will require a change in the logic of how we validate and | |
# call the proper setter methods bellow. | |
# populate the current instance settersMap | |
self.settersMap['RequestPriority'] = setterTuple('RequestPriority', self.setPriority, inspect.signature(self.setPriority)) | |
self.settersMap['SiteBlacklist'] = setterTuple('SiteBlacklist', self.setSiteBlacklist, inspect.signature(self.setSiteBlacklist)) | |
self.settersMap['SiteWhitelist'] = setterTuple('SiteWhitelist', self.setSiteWhitelist, inspect.signature(self.setSiteWhitelist)) | |
# First validate if we can properly call the setter function given the reqArgs passed. | |
for reqArg, argValue in reqArgs.items(): | |
if not self.settersMap.get(reqArg, None): | |
msg = f"Unsupported or missing setter method for updating reqArg: {reqArg}." | |
raise WMWorkloadException(msg) | |
try: | |
self.settersMap[reqArg].setterSignature.bind(argValue) | |
except TypeError as ex: | |
msg = f"Setter's method signature does not match the method calls we currently support: Error: req{str(ex)}" | |
raise WMWorkloadException(msg) from None | |
# Now go through the reqArg again and call every setter method according to the map | |
for reqArg, argValue in reqArgs.items(): | |
try: | |
self.settersMap[reqArg].setterFunc(argValue) | |
except Exception as ex: | |
currFrame = inspect.currentframe() | |
argsInfo = inspect.getargvalues(currFrame) | |
argVals = {arg: argsInfo.locals.get(arg) for arg in argsInfo.args} | |
msg = f"Failure while calling setter method {self.settersMap[reqArg].setterFunc.__name__} " | |
msg += f"With arguments: {argVals}" | |
msg += f"Full exception string: {str(ex)}" | |
raise WMWorkloadException(msg) from None |
I know the scale here while working with the local queue is far from what we see in the global workqueue, but maybe once we converge on the other PR you may consider using the this same method here as well. If not anything else it at least
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, ,noted
Jenkins results:
|
601f325
to
358c344
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valentin, please find my comments along the code.
In addition to those, I would suggest to use wmstatsserver
instead of ReqMgr2, such that we can drastically reduce the amount of HTTP calls made during each cycle.
An example url would be: https://cmsweb.cern.ch/wmstatsserver/data/filtered_requests?RequestStatus=running-open&mask=SiteWhitelist&mask=SiteBlacklist
and I would suggest to repeat it for RequestStatus=acquired
as well.
For that, I think we can simply use this wrapper method:
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/Services/WMStatsServer/WMStatsServer.py#L92
instead of writing the pycurl calls from scratch.
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread | ||
|
||
|
||
class SiteListUpdater(Harness): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you are basically creating a new component called SiteListUpdater.
Instead, pleas remove this code and add a worker thread (SiteListUpdatePoller) to the component manager thread:
https://github.com/dmwm/WMCore/blob/master/src/python/WMComponent/WorkflowUpdater/WorkflowUpdater.py
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, moved SiteListUpdatePoller into WorkflowUpdater.
self.logger = myThread.logger | ||
|
||
# the reqmgr2Url should be points to ReqMgr2 data services, i.e. /reqmgr2 end-point | ||
self.reqmgrUrl = getattr(config.SiteListUpdater, "reqmgr2Url") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As briefly discussed in different moments, I think we should rely on WMStats cached data to reduce the amount of calls by orders of magnitude. I will give you further details by the end of this review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, switching to wmstats then
def getActiveWorkflows(self): | ||
""" | ||
Provide list of active requests in a system (from assigned to running) | ||
obtained the agent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"in"?
And the line below seems incorrect to me, as this method seem to be returning the workflow name and its spec path (not a dict)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, adjusted
] | ||
return requests, wmaSpecs | ||
|
||
def getRequestSpecs(self, requests): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not to get it again, but to get it from WMStats instead of ReqMgr2.
For WMStats, we can retrieve data in bulk.
@@ -237,6 +237,31 @@ def cancelWorkflow(self, wf): | |||
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements] | |||
return self.updateElements(*elements, Status='CancelRequested') | |||
|
|||
def updateSiteLists(self, wf, siteWhiteList, siteBlackList): | |||
""" | |||
Update site lists of a workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please improve this docstring? Please refer to https://github.com/dmwm/WMCore/blob/master/CONTRIBUTING.rst#project-docstrings-best-practices
In addition, I think we should make both sitewhitelist and siteblacklist optional, as a workflow could be updating only one of those lists.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure what to improve as docstring complaint with contributing requirements, i.e. it provides description of input parameters and what is returned by the function. If further improvements in docstring itself is required please be specific.
Meanwhile, I converted site/black lists to be optional parameters.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short description of the method can likely be better, e.g. "Update site list parameters in elements matching a given workflow and a list of element statuses.
In addition, parameters are supposed to mention the data type, as written in the document I linked above.
except Exception as ex: | ||
msg = f"Caught unexpected exception in SiteListUpdater. Details:\n{str(ex)}" | ||
logging.exception(msg) | ||
raise LocalWorkqueueUpdateException(msg) from None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we raise this exception, we end up breaking the cycle. Instead, we should record the error but proceed with the other workflows in the pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, switched to continue
|
||
# update workload only if we updated local WorkQueue | ||
# update site white/black lists together | ||
wHelper.setWhitelist(siteWhiteList) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, we should only call this if there was an actual change to these lists. The if statement above does not distinguish which of the 2 lists was changed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
|
||
# save back pickle file | ||
newPklFileName = pklFileName.split('.pkl')[0] + '_new.pkl' | ||
wHelper.save(newPklFileName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest to simply save the workload changes in its expected location, hence no need for additional file and checks that can add new failure scenarios.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code is correct, the check is required as newPklFileName
may not be written, e.g. disk is full, in fact we need even better check to see that the size of newPklFileName
is comparable with old one to make sure that file content is written.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valentin, we use this module for 10+ years. This code is very noisy and there is no need at all for all these concerns and checks. If the node runs out of filesystem storage, everything else would be broken, including SQL/NoSQL databases. Keep it simple please.
except Exception as ex: | ||
msg = f"Caught unexpected exception in SiteListUpdater. Details:\n{str(ex)}" | ||
logging.exception(msg) | ||
raise SpecFileUpdateException(msg) from None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the comment made for the other exception. We cannot break the whole cycle, instead, the component needs to keep processing further workflows and simply skip the one where we had problems updating the list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, switched to continue
358c344
to
f5cbee0
Compare
test this please |
2 similar comments
test this please |
test this please |
It looks like tests ran with your last trigger, as we can see here: but Jenkins failed to publish those results back to the PR. Maybe @d-ylee and/or @khurtado are doing something that might have cause this(?) |
test this please |
@amaltaro , I give up with awaiting Jenkins post to this PR. I looked at https://cmssdt.cern.ch/dmwm-jenkins/job/DMWM-WMCore-PR-test/15310/ and style wise everything is fine. The reported unit tests failure do not seem to be relevant here either. Therefore I'm asking for your review again to speed up things. |
test this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valentin, changes are coming to a final shape. However, please look into my comments along the code.
@@ -0,0 +1,164 @@ | |||
#!/usr/bin/env python | |||
""" | |||
File : SiteListUpdater |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to keep things more consistent across components, perhaps naming this as SiteListPoller
would be better.
for state in states: | ||
url = "{}/data/filtered_requests?RequestStatus={}&mask=SiteWhitelist&mask=SiteBlacklist".format(self.wmstatsUrl, state) | ||
urls.append(url) | ||
response = getdata(urls, ckey(), cert()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use the wrapper module for interacting with the WMStats Server. There is this API that reaches the endpoint you need:
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/Services/WMStatsServer/WMStatsServer.py#L92
wmaWhiteList = wHelper.getSiteWhiteList() | ||
wmaBlackList = wHelper.getSiteBlackList() | ||
if set(wmaWhiteList) != set(siteWhiteList) or set(wmaBlackList) != set(siteBlackList): | ||
self.logger.info(f"Updating {wflow}: siteWhiteList {wmaWhiteList} => {siteWhiteList} and siteBlackList {wmaBlackList} => {siteBlackList}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest breaking this log record into multiple lines. Something like:
Updating {wflow}:
SiteWhiteList {wmaWhiteList} => {siteWhiteList}
SiteBlackList {wmaBlackList} => {siteBlackList}
# update local WorkQueue first | ||
self.localWQ.updateSiteLists(wflow, siteWhiteList, siteBlackList) | ||
except Exception as ex: | ||
msg = f"Caught unexpected exception in SiteListUpdater. Details:\n{str(ex)}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are executing one method here, so I would suggest to make the error message less generic. E.g.: "Unexpected exception while updating elements in local workqueue....
|
||
# save back pickle file | ||
newPklFileName = pklFileName.split('.pkl')[0] + '_new.pkl' | ||
wHelper.save(newPklFileName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valentin, we use this module for 10+ years. This code is very noisy and there is no need at all for all these concerns and checks. If the node runs out of filesystem storage, everything else would be broken, including SQL/NoSQL databases. Keep it simple please.
@@ -237,6 +237,31 @@ def cancelWorkflow(self, wf): | |||
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements] | |||
return self.updateElements(*elements, Status='CancelRequested') | |||
|
|||
def updateSiteLists(self, wf, siteWhiteList, siteBlackList): | |||
""" | |||
Update site lists of a workflow |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Short description of the method can likely be better, e.g. "Update site list parameters in elements matching a given workflow and a list of element statuses.
In addition, parameters are supposed to mention the data type, as written in the document I linked above.
data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus', | ||
{'startkey': [wf], 'endkey': [wf, {}], | ||
'reduce': False}) | ||
elementsToUpdate = [x['id'] for x in data.get('rows', [])] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To filter it in the query, I think we would have to pass in a tuple with the (wflow_name, status). However, I actually think we should use a cheaper view like:
https://cmsweb.cern.ch/couchdb/workqueue/_design/WorkQueue/_view/jobStatusByRequest?reduce=false
then iterate over the rows and compare the Status
field with what we want.
@khurtado did you hard-wire any tests to this specific PR? I am surprised to see that most of the other PRs are returning with jenkins report, but this one. Looking into the log that was running this morning for this PR: I see specifically:
These tests have been persistently failing for 5 days now. Can you please look into this? |
Fixes #12039
Status
In development
Description
Provide site list updates for WMAgents with the following logic:
- obtain list of current active workflows from the agent using WMBS database
- we extract workflow name and pickle file name (the spec)
- requests their JSON specs from upstream ReqMgr2 server
- check and if necessary update site lists of all active workflows
- push new specs to the agent local WorkQueue and update spec pickle file
Please note: This code will be used with polling cycle method, i.e. it will run as separate thread within WorkflowUpdater component. As such, it can have potentially impact on:
Is it backward compatible (if not, which system it affects?)
YES
Related PRs
External dependencies / deployment changes