Skip to content

Commit

Permalink
Add new module to manage site list updates
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznet committed Oct 2, 2024
1 parent 4924193 commit c3e0a2b
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 0 deletions.
143 changes: 143 additions & 0 deletions src/python/WMComponent/WorkflowUpdater/SiteListUpdater.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#!/usr/bin/env python
"""
File : SiteListUpdater
Author : Valentin Kuznetsov <vkuznet AT gmail dot com>
Description: provides module to manage update of site lists
"""

# system modules
import os
import json
import shutil
import threading

# WMCore modules
from Utils.Timers import timeFunction
from Utils.CertTools import ckey, cert
from WMCore.Services.pycurl_manager import getdata
from WMCore.WMSpec.WMWorkload import WMWorkloadHelper
from WMCore.Services.WorkQueue.WorkQueue import WorkQueue
from WMCore.WorkerThreads.BaseWorkerThread import BaseWorkerThread
from WMCore.DAOFactory import DAOFactory


class SiteListUpdater(BaseWorkerThread):
    """
    Worker thread which compares the site white/black lists of the workflows
    active in the local agent against the ones reported by ReqMgr2 and, when
    they differ, pushes the new lists to the local WorkQueue and to the
    workflow pickle spec file.
    """

    def __init__(self, config):
        """
        Initialize SiteListUpdater object
        :param config: a Configuration object with the component configuration
        """
        BaseWorkerThread.__init__(self)
        myThread = threading.current_thread()
        self.logger = myThread.logger

        # the reqmgr2Url should point to the ReqMgr2 data services, i.e. /reqmgr2 end-point
        self.reqmgrUrl = getattr(config.SiteListUpdater, "reqmgr2Url")

        # provide access to WMBS in local WMAgent
        self.daoFactory = DAOFactory(package="WMCore.WMBS",
                                     logger=myThread.logger,
                                     dbinterface=myThread.dbi)
        # DB function to retrieve active workflows
        self.listActiveWflows = self.daoFactory(classname="Workflow.GetUnfinishedWorkflows")

        # local WorkQueue service; read from the config argument since this
        # constructor never assigns self.config
        self.localCouchUrl = config.WorkQueueManager.couchurl
        self.localWQ = WorkQueue(self.localCouchUrl,
                                 config.WorkQueueManager.dbname)

    def getActiveWorkflows(self):
        """
        Provide the workflows reported as unfinished by the local agent (WMBS)
        :return: dictionary of workflow name -> pickle spec file name
        """
        # each record carries the workflow name and the "spec" column of the
        # WMBS table (wmbs_workflow.spec), which is the pkl file name
        return {wflowSpec['name']: wflowSpec['spec']
                for wflowSpec in self.listActiveWflows.execute()}

    # backward-compatible alias for the originally misspelled method name
    getActiveWorkflowss = getActiveWorkflows

    def getRequestSpecs(self, requests):
        """
        Return requests specs for provided list of request names
        :param requests: iterable of workflow requests names
        :return specs: dictionary of specs obtained from upstream ReqMgr2 server,
            built by merging every dict found under the "result" key of each response
        """
        urls = ["{}/data/request?name={}".format(self.reqmgrUrl, name) for name in requests]
        if not urls:
            # nothing to ask for, avoid calling the upstream service at all
            return {}
        specs = {}
        for resp in getdata(urls, ckey(), cert()):
            data = json.loads(resp['data'])
            for rdict in data['result']:
                # NOTE(review): rdict is expected to be keyed by workflow name
                # (algorithm() relies on it) -- confirm against ReqMgr2 API
                specs.update(rdict)
        return specs

    @staticmethod
    def _sameSites(left, right):
        """
        Compare two site lists ignoring the order of their entries
        :param left: first list of site names (None is treated as empty)
        :param right: second list of site names (None is treated as empty)
        :return: True if both lists contain the same sites
        """
        return set(left or []) == set(right or [])

    def _updateWorkflow(self, wflow, pklFileName, siteWhiteList, siteBlackList):
        """
        Bring site lists of a single workflow in sync with the provided ones
        :param wflow: workflow name
        :param pklFileName: name of the workflow pickle spec file
        :param siteWhiteList: site white list received from ReqMgr2
        :param siteBlackList: site black list received from ReqMgr2
        :return: True if the workflow was updated, False if it was already in sync
        """
        # create wrapper helper and load pickle file
        wHelper = WMWorkloadHelper()
        wHelper.load(pklFileName)

        # extract from pickle spec both white and black site lists and compare them
        # to the ones we received from upstream service (ReqMgr2)
        wmaWhiteList = wHelper.getSiteWhitelist()
        wmaBlackList = wHelper.getSiteBlacklist()
        if self._sameSites(wmaWhiteList, siteWhiteList) and self._sameSites(wmaBlackList, siteBlackList):
            return False

        self.logger.info("Updating %s: siteWhiteList %s => %s and siteBlackList %s => %s",
                         wflow, wmaWhiteList, siteWhiteList, wmaBlackList, siteBlackList)
        # update local WorkQueue first
        self.localWQ.updateSiteLists(wflow, siteWhiteList, siteBlackList)

        # update workload only if we updated local WorkQueue
        # update site white/black lists together
        wHelper.setSiteWhitelist(siteWhiteList)
        wHelper.setSiteBlacklist(siteBlackList)

        # persist the spec in local CouchDB
        self.logger.info("Updating %s with new site lists for %s", self.localCouchUrl, wflow)
        wHelper.saveCouchUrl(self.localCouchUrl)

        # save the spec into a temporary file next to the original one, so the
        # original pickle is replaced only once the new one is fully written
        newPklFileName = os.path.splitext(pklFileName)[0] + '_new.pkl'
        wHelper.save(newPklFileName)

        # if new pickle file is saved we can swap it with original one
        if os.path.getsize(newPklFileName) > 0:
            shutil.move(newPklFileName, pklFileName)
            self.logger.info("Updated %s", pklFileName)
        else:
            self.logger.error("New spec file %s is empty, keeping original %s",
                              newPklFileName, pklFileName)
            os.remove(newPklFileName)
        return True

    @timeFunction
    def algorithm(self, parameters=None):
        """
        Executed in every polling cycle. The actual logic of the component is:
        - obtain list of current active workflows from the agent
        - requests their specs from upstream ReqMgr2 server
        - update site lists of all workflows
        - push new specs to the agent local WorkQueue and update pickle spec file
        :return: none
        """
        # get active workflows from the agent, the returned dict is composed by
        # workflow names and associated pickle file (data comes from WMBS)
        wmaSpecs = self.getActiveWorkflows()
        if not wmaSpecs:
            self.logger.info("No active workflows found, nothing to update")
            return

        # obtain workflow specs from upstream ReqMgr2 server, the returned dict
        # is composed by workflow names and JSON dict representing the workflow
        specs = self.getRequestSpecs(list(wmaSpecs))

        # iterate over workflow items and update local WorkQueue and pickle files if
        # either site white or black lists are different
        for wflow, rdict in specs.items():
            pklFileName = wmaSpecs.get(wflow)
            if pklFileName is None:
                # upstream returned something the agent did not ask for
                self.logger.warning("Workflow %s is not known to the agent, skipping", wflow)
                continue
            self._updateWorkflow(wflow, pklFileName,
                                 rdict['SiteWhitelist'], rdict['SiteBlacklist'])

25 changes: 25 additions & 0 deletions src/python/WMCore/Services/WorkQueue/WorkQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,31 @@ def cancelWorkflow(self, wf):
elements = [x['id'] for x in data.get('rows', []) if x['key'][1] not in nonCancelableElements]
return self.updateElements(*elements, Status='CancelRequested')

def updateSiteLists(self, wf, siteWhiteList, siteBlackList):
    """
    Update site lists of a workflow, both in its WorkQueue elements and
    in the workflow spec stored in the WorkQueue database (if present)
    :param wf: workflow name
    :param siteWhiteList: new site white list
    :param siteBlackList: new site black list
    :return: None
    """
    # Fetch the elements of this workflow; the key range [wf] .. [wf, {}] is
    # not restricted to a particular status
    data = self.db.loadView('WorkQueue', 'elementsDetailByWorkflowAndStatus',
                            {'startkey': [wf], 'endkey': [wf, {}],
                             'reduce': False})
    elementsToUpdate = [row['id'] for row in data.get('rows', [])]
    if elementsToUpdate:
        # NOTE(review): keyword names are used as element field names; ReqMgr2
        # spells them 'SiteWhitelist'/'SiteBlacklist' -- confirm which spelling
        # the WorkQueue element schema expects
        self.updateElements(*elementsToUpdate, SiteWhiteList=siteWhiteList, SiteBlackList=siteBlackList)
    # Update the spec, if it exists
    if self.db.documentExists(wf):
        wmspec = WMWorkloadHelper()
        wmspec.load(self.hostWithAuth + "/%s/%s/spec" % (self.db.name, wf))
        # the workload helper setters are spelled with a lowercase "list"
        wmspec.setSiteWhitelist(siteWhiteList)
        wmspec.setSiteBlacklist(siteBlackList)
        dummy_values = {'name': wmspec.name()}
        wmspec.saveCouch(self.hostWithAuth, self.db.name, dummy_values)
    return

def updatePriority(self, wf, priority):
"""Update priority of a workflow, this implies
updating the spec and the priority of the Available elements"""
Expand Down
18 changes: 18 additions & 0 deletions src/python/WMCore/WMSpec/WMWorkload.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,15 @@ def removeTask(self, taskName):
self.data.tasks.tasklist.remove(taskName)
return

def getSiteWhitelist(self):
    """
    Provide the site white list kept in the workload data
    :return: value of the "SiteWhiteList" data attribute, or an empty
        list when the attribute is missing or evaluates to False
    """
    stored = getattr(self.data, 'SiteWhiteList', None)
    return stored if stored else []

def setSiteWhitelist(self, siteWhitelist):
"""
_setSiteWhitelist_
Expand All @@ -689,6 +698,15 @@ def setSiteWhitelist(self, siteWhitelist):

return

def getSiteBlacklist(self):
    """
    Provide the site black list kept in the workload data
    :return: value of the "SiteBlackList" data attribute, or an empty
        list when the attribute is missing or evaluates to False
    """
    stored = getattr(self.data, 'SiteBlackList', None)
    return stored if stored else []

def setSiteBlacklist(self, siteBlacklist):
"""
_setSiteBlacklist_
Expand Down

0 comments on commit c3e0a2b

Please sign in to comment.