Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memory trace for DataCacheUpdate CP thread #11907

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions src/python/WMCore/REST/Format.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import gzip
from builtins import str, bytes, object
from memory_profiler import profile

from Utils.PythonVersion import PY3
from Utils.Utilities import encodeUnicodeToBytes, encodeUnicodeToBytesConditional
Expand Down Expand Up @@ -552,6 +553,7 @@ def _etag_match(status, etagval, match, nomatch):
if nomatch and ("*" in nomatch or etagval in nomatch):
raise cherrypy.HTTPRedirect([], 304)

@profile
def _etag_tail(head, tail, etag):
"""Generator which first returns anything in `head`, then `tail`.
Sets ETag header at the end to value of `etag` if it's defined and
Expand All @@ -566,6 +568,7 @@ def _etag_tail(head, tail, etag):
if etagval:
cherrypy.response.headers["ETag"] = etagval

@profile
def stream_maybe_etag(size_limit, etag, reply):
"""Maybe generate ETag header for the response, and handle If-Match
and If-None-Match request headers. Consumes the reply until at most
Expand All @@ -589,6 +592,8 @@ def stream_maybe_etag(size_limit, etag, reply):
req = cherrypy.request
res = cherrypy.response
match = [str(x) for x in (req.headers.elements('If-Match') or [])]
# FIXME TODO: this apparently increases the wmstatsserver memory
# footprint by half a gigabyte
nomatch = [str(x) for x in (req.headers.elements('If-None-Match') or [])]

# If ETag is already set, match conditions and output without buffering.
Expand All @@ -606,6 +611,7 @@ def stream_maybe_etag(size_limit, etag, reply):
# clients including browsers will ignore them.
size = 0
result = []
### FIXME TODO: this block apparently leaks memory
for chunk in reply:
result.append(chunk)
size += len(chunk)
Expand Down
5 changes: 5 additions & 0 deletions src/python/WMCore/REST/Server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import cherrypy
import inspect
from memory_profiler import profile
import os
import re
import signal
Expand Down Expand Up @@ -728,6 +729,7 @@ def metrics(self):
return encodeUnicodeToBytes(metrics)

@expose
@profile
def default(self, *args, **kwargs):
"""The HTTP request handler.

Expand Down Expand Up @@ -760,6 +762,7 @@ def default(self, *args, **kwargs):

default._cp_config = {'response.stream': True}

@profile
def _call(self, param):
"""The real HTTP request handler.

Expand Down Expand Up @@ -859,6 +862,8 @@ def _call(self, param):
# Format the response.
response.headers['X-REST-Status'] = 100
response.headers['Content-Type'] = format
# FIXME TODO: this apparently increases the wmstatsserver memory
# footprint by half a gigabyte
etagger = apiobj.get('etagger', None) or SHA1ETag()
reply = stream_compress(fmthandler(obj, etagger),
apiobj.get('compression', self.compression),
Expand Down
5 changes: 5 additions & 0 deletions src/python/WMCore/Services/WMStats/WMStatsReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from future.utils import viewitems

import logging
from memory_profiler import profile

from Utils.IteratorTools import nestedDictUpdate, grouper
from WMCore.Database.CMSCouch import CouchServer
from WMCore.Lexicon import splitCouchServiceURL, sanitizeURL
Expand Down Expand Up @@ -93,6 +95,7 @@ def setDefaultStaleOptions(self, options):
options.update(self.defaultStale)
return options

@profile
def getLatestJobInfoByRequests(self, requestNames):
jobInfoByRequestAndAgent = {}

Expand All @@ -101,6 +104,7 @@ def getLatestJobInfoByRequests(self, requestNames):
jobInfoByRequestAndAgent = self._getLatestJobInfo(requestAndAgentKey)
return jobInfoByRequestAndAgent

@profile
def _updateRequestInfoWithJobInfo(self, requestInfo):
if requestInfo:
jobInfoByRequestAndAgent = self.getLatestJobInfoByRequests(list(requestInfo))
Expand Down Expand Up @@ -304,6 +308,7 @@ def getT0ActiveData(self, jobInfoFlag=False):

return self.getRequestByStatus(T0_ACTIVE_STATUS, jobInfoFlag)

@profile
def getRequestByStatus(self, statusList, jobInfoFlag=False, limit=None, skip=None,
legacyFormat=False):

Expand Down
3 changes: 2 additions & 1 deletion src/python/WMCore/WMStats/CherryPyThreads/DataCacheUpdate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import (division, print_function)

import time
from memory_profiler import profile
from WMCore.REST.CherryPyPeriodicTask import CherryPyPeriodicTask
from WMCore.WMStats.DataStructs.DataCache import DataCache
from WMCore.Services.WMStats.WMStatsReader import WMStatsReader
Expand All @@ -10,7 +11,6 @@ class DataCacheUpdate(CherryPyPeriodicTask):

def __init__(self, rest, config):
    """Set up the periodic task.

    Reads the optional boolean ``getJobInfo`` attribute from *config*
    (defaulting to False when absent) before delegating the rest of the
    initialization to the parent periodic-task class.
    """
    # Missing config attribute means "do not fetch job info".
    self.getJobInfo = getattr(config, "getJobInfo", False)
    super(DataCacheUpdate, self).__init__(config)

def setConcurrentTasks(self, config):
Expand All @@ -19,6 +19,7 @@ def setConcurrentTasks(self, config):
"""
self.concurrentTasks = [{'func': self.gatherActiveDataStats, 'duration': 300}]

@profile
def gatherActiveDataStats(self, config):
"""
gather active data statistics
Expand Down
3 changes: 3 additions & 0 deletions src/python/WMCore/WMStats/DataStructs/DataCache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from future.utils import viewitems

import time

from Utils.Utilities import getSize
from WMCore.ReqMgr.DataStructs.Request import RequestInfo, protectedLFNs

class DataCache(object):
Expand All @@ -22,6 +24,7 @@ def setDuration(sec):
@staticmethod
def getlatestJobData():
    """Return the cached active-request data, or an empty dict if the
    cache has not been populated yet."""
    cached = DataCache._lastedActiveDataFromAgent
    if not cached:
        return {}
    # NOTE(review): per-call size computation and print is debugging
    # instrumentation — presumably temporary; confirm before merge.
    print(f"Size of DataCache: {getSize(cached)}")
    return cached["data"]
Expand Down
2 changes: 2 additions & 0 deletions src/python/WMCore/WMStats/Service/ActiveRequestJobInfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Just wait for the server cache to be updated
"""
from __future__ import (division, print_function)
from memory_profiler import profile
from WMCore.REST.Server import RESTEntity, restcall, rows
from WMCore.REST.Tools import tools
from WMCore.REST.Error import DataCacheEmpty
Expand All @@ -25,6 +26,7 @@ def validate(self, apiobj, method, api, param, safe):

@restcall(formats=[('text/plain', PrettyJSONFormat()), ('application/json', JSONFormat())])
@tools.expires(secs=-1)
@profile()
def get(self):
# This assumes DataCache is periodically updated.
# If the data is not updated, check the DataCacheUpdate logs.
Expand Down