Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Module to update site lists for WMAgents #12123

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions src/python/WMComponent/WorkflowUpdater/SiteListUpdater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#!/usr/bin/env python
"""
File : SiteListUpdater
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to keep things more consistent across components, perhaps naming this as SiteListPoller would be better.

Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: module to update site lists within a WMAgent
"""

# system modules
import os
import json
import shutil
import logging
import threading

# WMCore modules
from Utils.CertTools import ckey, cert
from Utils.Timers import timeFunction
from WMCore.Agent.Harness import Harness
from WMCore.DAOFactory import DAOFactory
from WMCore.Services.pycurl_manager import getdata
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.WMException import WMException
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread


class SiteListUpdaterPoller(BaseWorkerThread):
def __init__(self, config):
    """
    Initialize SiteListUpdaterPoller object

    Sets up the logger, the WMStats server URL, the WMBS DAO used to list
    unfinished workflows and the client of the agent local WorkQueue.

    :param config: a Configuration object with the component configuration; it is
        read for SiteListUpdater.wmstatsUrl, WorkQueueManager.couchurl and
        WorkQueueManager.dbname
    """
    BaseWorkerThread.__init__(self)
    # stored explicitly so that the WorkQueueManager settings below can be read
    # through self.config
    self.config = config
    myThread = threading.current_thread()
    self.logger = myThread.logger

    # wmstatsUrl should point to the WMStats server; it is used as the base of the
    # "<wmstatsUrl>/data/filtered_requests" queries issued by this poller
    # NOTE(review): this poller is registered by the WorkflowUpdater component;
    # confirm that a SiteListUpdater section exists in the agent configuration
    self.wmstatsUrl = config.SiteListUpdater.wmstatsUrl

    # provide access to WMBS in local WMAgent
    self.daoFactory = DAOFactory(package="WMCore.WMBS",
                                 logger=myThread.logger,
                                 dbinterface=myThread.dbi)
    # DB function to retrieve active workflows
    self.listActiveWflows = self.daoFactory(classname="Workflow.GetUnfinishedWorkflows")

    # local WorkQueue service
    self.localCouchUrl = self.config.WorkQueueManager.couchurl
    self.localWQ = WorkQueue(self.localCouchUrl,
                             self.config.WorkQueueManager.dbname)

def getActiveWorkflows(self):
    """
    Provide the active (unfinished) workflows known to this WMAgent

    :return: dict mapping each workflow name to the "spec" value of its WMBS
        record (the "spec" in wmbs_workflow.spec is the pkl file name)
    """
    # every record returned by the DAO is read for its 'name' and 'spec' keys
    return {wflowSpec['name']: wflowSpec['spec']
            for wflowSpec in self.listActiveWflows.execute()}

def getRequestSpecs(self, requests):
    """
    Return the WMStats records of the provided workflows

    Queries the WMStats server for the requests in 'running-open' and 'acquired'
    status, asking only for their site white/black lists, and keeps the records
    whose RequestName is among `requests`.

    :param requests: iterable of workflow request names
    :return wsList: list of workflow records obtained from wmstats server, each record has the following structure
        {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []}
    """
    # NOTE(review): site lists are fetched from WMStats rather than ReqMgr2 because
    # WMStats can serve them in bulk with a filter+mask call.
    # NOTE(review): reviewers asked to reach this endpoint through the wrapper in
    # WMCore/Services/WMStatsServer/WMStatsServer.py instead of a raw getdata call.
    states = ['running-open', 'acquired']
    urls = ["{}/data/filtered_requests?RequestStatus={}&mask=SiteWhitelist&mask=SiteBlacklist".format(self.wmstatsUrl, state)
            for state in states]
    response = getdata(urls, ckey(), cert())

    # build the lookup set once; `requests` may be a list or a dict keys view
    wanted = set(requests)
    wsList = []
    for resp in response:
        data = json.loads(resp['data'])
        # data['result'] is a list of records, each one of the form
        # {"RequestName": "bla", "SiteWhitelist":[], "SiteBlacklist": []}
        for rdict in data['result']:
            # keep only the workflows we were asked about
            if rdict['RequestName'] in wanted:
                wsList.append(rdict)
    return wsList

@timeFunction
def algorithm(self, parameters=None):
    """
    Perform the following logic:
    - obtain list of current active workflows from the agent
    - request their site lists from the upstream WMStats server
    - for every workflow whose site white or black list differs from the local spec,
      update the local WorkQueue first, then the spec in local CouchDB and finally
      the pickle spec file

    A failure while updating one workflow is logged and the loop moves on to the
    next workflow.

    :param parameters: not used by this method
    :return: none
    """
    # get list of active workflows from the agent, the returned dict
    # is composed by workflow names and associated pickle file (data comes from WMBS)
    wmaSpecs = self.getActiveWorkflows()

    # obtain workflow records from wmstats server
    wsList = self.getRequestSpecs(wmaSpecs.keys())

    # iterate over workflow items and update local WorkQueue and pickle files if
    # either site white or black lists are different
    for rdict in wsList:
        wflow = rdict['RequestName']
        siteWhiteList = rdict['SiteWhitelist']
        siteBlackList = rdict['SiteBlacklist']

        # get the name of pkl file from wma spec
        pklFileName = wmaSpecs[wflow]

        # create wrapper helper and load pickle file
        wHelper = WMWorkloadHelper()
        wHelper.load(pklFileName)

        # extract from pickle spec both white and black site lists and compare them
        # to the ones received from the upstream service (order is irrelevant)
        wmaWhiteList = wHelper.getSiteWhitelist()
        wmaBlackList = wHelper.getSiteBlacklist()
        whiteListChanged = set(wmaWhiteList) != set(siteWhiteList)
        blackListChanged = set(wmaBlackList) != set(siteBlackList)
        if not (whiteListChanged or blackListChanged):
            continue

        # NOTE(review): a reviewer suggested breaking this record into multiple lines
        # (one for the workflow, one per site list)
        self.logger.info(f"Updating {wflow}: siteWhiteList {wmaWhiteList} => {siteWhiteList} and siteBlackList {wmaBlackList} => {siteBlackList}")

        try:
            # update local WorkQueue first
            self.localWQ.updateSiteLists(wflow, siteWhiteList, siteBlackList)
        except Exception as ex:
            msg = f"Unexpected exception while updating elements in local workqueue for {wflow}. Details:\n{str(ex)}"
            self.logger.exception(msg)
            continue

        # update workload only if we updated local WorkQueue
        if whiteListChanged:
            wHelper.setSiteWhitelist(siteWhiteList)
        if blackListChanged:
            wHelper.setSiteBlacklist(siteBlackList)

        try:
            # persist the spec in local CouchDB
            # NOTE(review): confirm that saveCouchUrl accepts the bare couch URL
            self.logger.info(f"Updating {self.localCouchUrl} with new site lists for {wflow}")
            wHelper.saveCouchUrl(self.localCouchUrl)

            # NOTE(review): a reviewer prefers saving straight to pklFileName; the
            # author keeps a temporary file so that a failed write (e.g. full disk)
            # does not clobber the current spec.
            # splitext only strips the trailing extension, so a ".pkl" occurring
            # earlier in the path does not truncate the file name
            root, ext = os.path.splitext(pklFileName)
            newPklFileName = root + '_new' + ext
            wHelper.save(newPklFileName)

            # if new pickle file is saved we can swap it with original one
            if os.path.getsize(newPklFileName) > 0:
                shutil.move(newPklFileName, pklFileName)
                self.logger.info(f"Updated {pklFileName}")
            else:
                self.logger.warning(f"New spec file {newPklFileName} is empty, keeping {pklFileName}")
        except Exception as ex:
            msg = f"Unexpected exception while persisting the updated spec of {wflow}. Details:\n{str(ex)}"
            self.logger.exception(msg)
            continue
4 changes: 4 additions & 0 deletions src/python/WMComponent/WorkflowUpdater/WorkflowUpdater.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,7 @@ def preInitialization(self):
myThread = threading.currentThread()
myThread.workerThreadManager.addWorker(WorkflowUpdaterPoller(self.config),
pollInterval)

myThread = threading.currentThread()
myThread.workerThreadManager.addWorker(SiteListUpdaterPoller(self.config),
pollInterval)
27 changes: 27 additions & 0 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,33 @@ def cancelWorkflow(self, wf):
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements]
return self.updateElements(*elements, Status='CancelRequested')

def updateSiteLists(self, wf, siteWhiteList=None, siteBlackList=None):
    """
    Update the site list parameters in the WorkQueue elements matching a given
    workflow and in the workflow spec stored in this WorkQueue database, if the
    spec document exists.

    :param wf: str, workflow name
    :param siteWhiteList: list of str, new site white list (optional; None leaves it untouched)
    :param siteBlackList: list of str, new site black list (optional; None leaves it untouched)
    :return: None
    """
    # NOTE(review): per the author, in a WMAgent the WorkQueueManager configuration
    # points this service to the local workqueue CouchDB.
    # only push the lists that were actually provided, a workflow may be updating
    # just one of them
    # NOTE(review): the element keys are spelled SiteWhiteList/SiteBlackList here,
    # while WMStats records use SiteWhitelist/SiteBlacklist; confirm the spelling
    # expected by the WorkQueue elements.
    updateParams = {}
    if siteWhiteList is not None:
        updateParams['SiteWhiteList'] = siteWhiteList
    if siteBlackList is not None:
        updateParams['SiteBlackList'] = siteBlackList
    if not updateParams:
        return

    # Fetch all the elements of the workflow, regardless of their status
    # NOTE(review): reviewers asked to restrict this to elements in Available (and
    # perhaps Acquired) status, e.g. by comparing x['value']['Status'], or by using
    # the cheaper jobStatusByRequest view; further thought is needed.
    # NOTE(review): a generic updateElementsByWorkflow(wf, updateParams, status=None)
    # and a WMWorkloadHelper.updateWorkloadArgs(reqArgs) are being prepared in
    # another pull request; this method may delegate to them once they are merged.
    data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus',
                            {'startkey': [wf], 'endkey': [wf, {}],
                             'reduce': False})
    elementsToUpdate = [x['id'] for x in data.get('rows', [])]
    if elementsToUpdate:
        self.updateElements(*elementsToUpdate, **updateParams)

    # Update the spec, if it exists
    if self.db.documentExists(wf):
        wmspec = WMWorkloadHelper()
        # load the spec from this workqueue CouchDB, update it and save it back
        wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf))
        if siteWhiteList is not None:
            wmspec.setSiteWhitelist(siteWhiteList)
        if siteBlackList is not None:
            wmspec.setSiteBlacklist(siteBlackList)
        dummy_values = {'name': wmspec.name()}
        wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values)
    return

def updatePriority(self, wf, priority):
"""Update priority of a workflow, this implies
updating the spec and the priority of the Available elements"""
Expand Down
18 changes: 18 additions & 0 deletions src/python/WMCore/WMSpec/WMWorkload.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,15 @@ def removeTask(self, taskName):
self.data.tasks.tasklist.remove(taskName)
return

def getSiteWhitelist(self):
    """
    Get the site white list from our data
    :return: site white list, or an empty list when the attribute is missing or empty
    """
    # NOTE(review): unit tests were requested for this getter.
    # NOTE(review): confirm that setSiteWhitelist stores the list under
    # self.data.SiteWhiteList; otherwise this getter always returns [].
    return getattr(self.data, 'SiteWhiteList', None) or []

def setSiteWhitelist(self, siteWhitelist):
"""
_setSiteWhitelist_
Expand All @@ -689,6 +698,15 @@ def setSiteWhitelist(self, siteWhitelist):

return

def getSiteBlacklist(self):
    """
    Get the site black list from our data
    :return: site black list, or an empty list when the attribute is missing or empty
    """
    # NOTE(review): unit tests were requested for this getter.
    # NOTE(review): confirm that setSiteBlacklist stores the list under
    # self.data.SiteBlackList; otherwise this getter always returns [].
    return getattr(self.data, 'SiteBlackList', None) or []

def setSiteBlacklist(self, siteBlacklist):
"""
_setSiteBlacklist_
Expand Down
31 changes: 31 additions & 0 deletions test/python/WMCore_t/WMSpec_t/WMWorkload_t.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,37 @@ def testDbsUrl(self):
self.assertEqual(url, "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader")
return

def testGetSiteWhitelist(self):
    """
    Tests getSiteWhitelist and getSiteBlacklist functionality of the workload.
    """
    testWorkload = WMWorkloadHelper(WMWorkload("TestWorkload"))

    procTestTask = testWorkload.newTask("ProcessingTask")
    procTestTaskCMSSW = procTestTask.makeStep("cmsRun1")
    procTestTaskCMSSW.setStepType("CMSSW")

    procTestTask.addInputDataset(name="/PrimaryDataset/ProcessedDataset/DATATIER",
                                 primary="PrimaryDataset",
                                 processed="ProcessedDataset",
                                 tier="DATATIER",
                                 block_whitelist=["Block1", "Block2"],
                                 block_blacklist=["Block3"],
                                 run_whitelist=[1, 2],
                                 run_blacklist=[3])

    newSiteWhiteList = ["T1_US_FNAL", "T0_CH_CERN"]
    newSiteBlackList = ["T1_DE_KIT"]
    testWorkload.setSiteWhitelist(newSiteWhiteList)
    testWorkload.setSiteBlacklist(newSiteBlackList)

    # the getters under test are defined on the workload helper, so read the lists
    # back from the workload and use the matching getter for each list
    siteWhiteList = testWorkload.getSiteWhitelist()
    siteBlackList = testWorkload.getSiteBlacklist()
    # compare as sets: the order of the sites is not relevant
    self.assertEqual(set(newSiteWhiteList), set(siteWhiteList),
                     "Error: Site white list mismatch")
    self.assertEqual(set(newSiteBlackList), set(siteBlackList),
                     "Error: Site black list mismatch")

def testWhiteBlacklists(self):
"""
_testWhiteBlacklists_
Expand Down