-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Module to update site lists for WMAgents #12123
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
#!/usr/bin/env python | ||
""" | ||
File : SiteListUpdater | ||
Author : Valentin Kuznetsov <vkuznet AT gmail dot com> | ||
Description: module to update site lists within a WMAgent | ||
""" | ||
|
||
# system modules | ||
import os | ||
import json | ||
import shutil | ||
import logging | ||
import threading | ||
|
||
# WMCore modules | ||
from Utils.CertTools import ckey, cert | ||
from Utils.Timers import timeFunction | ||
from WMCore.Agent.Harness import Harness | ||
from WMCore.DAOFactory import DAOFactory | ||
from WMCore.Services.pycurl_manager import getdata | ||
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue | ||
from WMCore.WMException import WMException | ||
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper | ||
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread | ||
|
||
|
||
class SiteListUpdaterPoller(BaseWorkerThread): | ||
def __init__(self, config):
    """
    Initialize SiteListUpdaterPoller object.

    :param config: a Configuration object with the component configuration; this
        constructor reads config.SiteListUpdater.wmstatsUrl,
        config.WorkQueueManager.couchurl and config.WorkQueueManager.dbname
    """
    BaseWorkerThread.__init__(self)
    # current_thread() is the non-deprecated spelling of currentThread()
    myThread = threading.current_thread()
    self.logger = myThread.logger

    # base URL of the WMStats server used to look up the site lists of requests
    self.wmstatsUrl = getattr(config.SiteListUpdater, "wmstatsUrl")

    # provide access to WMBS in local WMAgent
    self.daoFactory = DAOFactory(package="WMCore.WMBS",
                                 logger=myThread.logger,
                                 dbinterface=myThread.dbi)
    # DB function to retrieve active workflows
    self.listActiveWflows = self.daoFactory(classname="Workflow.GetUnfinishedWorkflows")

    # local WorkQueue service; read the settings from the ``config`` argument,
    # since no ``self.config`` attribute is assigned by this constructor
    self.localCouchUrl = config.WorkQueueManager.couchurl
    self.localWQ = WorkQueue(self.localCouchUrl,
                             config.WorkQueueManager.dbname)
|
||
def getActiveWorkflows(self):
    """
    Collect the workflows that are still active within this WMAgent.

    :return: dict mapping each workflow name to its pickle file name
    """
    # every record carries the workflow name under "name" and, under "spec",
    # the pkl file name stored in the WMBS table (wmbs_workflow.spec)
    activeRecords = self.listActiveWflows.execute()
    return {record['name']: record['spec'] for record in activeRecords}
|
||
def getRequestSpecs(self, requests):
    """
    Return the WMStats records (site lists) for the provided request names.

    Only requests in 'running-open' or 'acquired' status are queried.

    :param requests: iterable of workflow request names
    :return wsList: list of workflow records obtained from wmstats server, each record has the following structure
        {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []}
    """
    # Review note: the site lists are taken from WMStats rather than ReqMgr2, and the
    # reviewers asked to use the WMStats Server wrapper module instead of raw getdata calls.

    # build the lookup set once so that each membership test below is O(1)
    wanted = set(requests)

    # one query per request status; the mask limits each record to the site lists
    states = ['running-open', 'acquired']
    urls = [
        "{}/data/filtered_requests?RequestStatus={}&mask=SiteWhitelist&mask=SiteBlacklist".format(self.wmstatsUrl, state)
        for state in states
    ]
    response = getdata(urls, ckey(), cert())

    wsList = []
    for resp in response:
        data = json.loads(resp['data'])
        # data['result'] is a list of records, each of the form
        # {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []};
        # keep only the records of workflows we were asked about
        wsList.extend(rdict for rdict in data['result'] if rdict['RequestName'] in wanted)
    return wsList
|
||
@timeFunction
def algorithm(self, parameters=None):
    """
    Synchronize the site lists of active workflows with the upstream values.

    Perform the following logic:
    - obtain list of current active workflows from the agent
    - request their site lists from the upstream WMStats server
    - for each workflow whose white or black list differs, update the local
      WorkQueue first, then the spec in local CouchDB and the pickle spec file

    A failure while updating one workflow is logged and the loop moves on to
    the next workflow.

    :param parameters: not used by this method
    :return: none
    """
    # get list of active workflows from the agent, the returned dict
    # is composed by workflow names and associated pickle file (data comes from WMBS)
    wmaSpecs = self.getActiveWorkflows()
    if not wmaSpecs:
        # nothing is active, so there is nothing to compare against upstream
        return

    # obtain workflow records from wmstats server
    wsList = self.getRequestSpecs(wmaSpecs.keys())

    for rdict in wsList:
        wflow = rdict['RequestName']
        siteWhiteList = rdict['SiteWhitelist']
        siteBlackList = rdict['SiteBlacklist']

        # load the workload from the pkl file recorded in WMBS for this workflow
        pklFileName = wmaSpecs[wflow]
        wHelper = WMWorkloadHelper()
        wHelper.load(pklFileName)

        # compare the site lists of the pickled spec with the upstream ones;
        # order does not matter, hence the comparison as sets
        wmaWhiteList = wHelper.getSiteWhitelist()
        wmaBlackList = wHelper.getSiteBlacklist()
        whiteListChanged = set(wmaWhiteList) != set(siteWhiteList)
        blackListChanged = set(wmaBlackList) != set(siteBlackList)
        if not (whiteListChanged or blackListChanged):
            continue

        # Review note: reviewers suggested splitting this record into several log lines.
        self.logger.info(f"Updating {wflow}: siteWhiteList {wmaWhiteList} => {siteWhiteList} and siteBlackList {wmaBlackList} => {siteBlackList}")

        try:
            # update local WorkQueue first
            self.localWQ.updateSiteLists(wflow, siteWhiteList, siteBlackList)
        except Exception as ex:
            msg = f"Unexpected exception while updating site lists of {wflow} in local WorkQueue. Details:\n{str(ex)}"
            self.logger.exception(msg)
            continue

        # update workload only if we updated local WorkQueue
        if whiteListChanged:
            wHelper.setSiteWhitelist(siteWhiteList)
        if blackListChanged:
            wHelper.setSiteBlacklist(siteBlackList)

        try:
            # persist the spec in local CouchDB
            self.logger.info(f"Updating {self.localCouchUrl} with new site lists for {wflow}")
            wHelper.saveCouchUrl(self.localCouchUrl)

            # write the new pickle next to the original and swap only when it is non-empty.
            # Review note: maintainers suggested saving the workload straight to its
            # expected location instead of going through a temporary file.
            newPklFileName = pklFileName.split('.pkl')[0] + '_new.pkl'
            wHelper.save(newPklFileName)
            if os.path.getsize(newPklFileName) > 0:
                self.logger.info(f"Updated {pklFileName}")
                shutil.move(newPklFileName, pklFileName)
            else:
                self.logger.warning(f"New pickle file {newPklFileName} is empty, keeping {pklFileName}")
        except Exception as ex:
            msg = f"Unexpected exception while persisting the updated spec of {wflow}. Details:\n{str(ex)}"
            self.logger.exception(msg)
            continue
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -237,6 +237,33 @@ def cancelWorkflow(self, wf): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return self.updateElements(*elements, Status='CancelRequested') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def updateSiteLists(self, wf, siteWhiteList=None, siteBlackList=None):
    """
    Update site list parameters in the workqueue elements matching a given
    workflow, and in the workflow spec stored in this workqueue database.

    A list left as None is not touched; when both are None nothing is done.

    :param wf: workflow name
    :param siteWhiteList: new site white list (optional)
    :param siteBlackList: new site black list (optional)
    :return: None
    """
    # only push the lists that were actually provided, so that an omitted
    # list is not overwritten with None
    # NOTE(review): keyword names kept as in the original; confirm they match the
    # WorkQueue element field names (request records use SiteWhitelist/SiteBlacklist)
    newLists = {}
    if siteWhiteList is not None:
        newLists['SiteWhiteList'] = siteWhiteList
    if siteBlackList is not None:
        newLists['SiteBlackList'] = siteBlackList
    if not newLists:
        return

    # No status filter is applied to the returned rows: every element found for
    # the workflow is updated.
    # Review note: reviewers suggested restricting this to Available (and perhaps
    # Acquired) elements, possibly through a cheaper view - TODO once agreed.
    data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus',
                            {'startkey': [wf], 'endkey': [wf, {}],
                             'reduce': False})
    elementsToUpdate = [x['id'] for x in data.get('rows', [])]
    if elementsToUpdate:
        self.updateElements(*elementsToUpdate, **newLists)

    # Update the spec, if it exists. Per the review discussion this is the local
    # workqueue CouchDB when called from a WMAgent.
    if self.db.documentExists(wf):
        wmspec = WMWorkloadHelper()
        wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf))
        if siteWhiteList is not None:
            wmspec.setSiteWhitelist(siteWhiteList)
        if siteBlackList is not None:
            wmspec.setSiteBlacklist(siteBlackList)
        dummy_values = {'name': wmspec.name()}
        wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values)
    return
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
def updatePriority(self, wf, priority): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"""Update priority of a workflow, this implies | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
updating the spec and the priority of the Available elements""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -673,6 +673,15 @@ def removeTask(self, taskName): | |
self.data.tasks.tasklist.remove(taskName) | ||
return | ||
|
||
def getSiteWhitelist(self):
    """
    Provide the site white list stored in the workload data.

    :return: the SiteWhiteList attribute of the data, or an empty list
        when the attribute is missing or empty
    """
    # TODO: add unit tests (requested in review)
    return getattr(self.data, 'SiteWhiteList', None) or []
|
||
def setSiteWhitelist(self, siteWhitelist): | ||
""" | ||
_setSiteWhitelist_ | ||
|
@@ -689,6 +698,15 @@ def setSiteWhitelist(self, siteWhitelist): | |
|
||
return | ||
|
||
def getSiteBlacklist(self):
    """
    Provide the site black list stored in the workload data.

    :return: the SiteBlackList attribute of the data, or an empty list
        when the attribute is missing or empty
    """
    # TODO: add unit tests (requested in review)
    return getattr(self.data, 'SiteBlackList', None) or []
|
||
def setSiteBlacklist(self, siteBlacklist): | ||
""" | ||
_setSiteBlacklist_ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to keep things more consistent across components, perhaps naming this as
SiteListPoller
would be better.