Skip to content

Commit

Permalink
refactor: minor bugfix, and better naming
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Dec 10, 2024
1 parent 5394072 commit 53819ea
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 45 deletions.
5 changes: 2 additions & 3 deletions src/DIRAC/Core/Utilities/DictCache.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def __init__(self, soft_ttl: int, hard_ttl: int, *, max_workers: int = 10, max_i
self.futures: dict[str, Future] = {}
self.pool = ThreadPoolExecutor(max_workers=max_workers)

def get(self, key: str, populate_func: Callable[[], Any]):
def get(self, key: str, populate_func: Callable[[], Any]) -> dict:
"""Retrieve a value from the cache, populating it if necessary.
This method first checks the soft cache for the key. If not found,
Expand Down Expand Up @@ -328,7 +328,7 @@ def get(self, key: str, populate_func: Callable[[], Any]):
return result
# It is critical that ``future`` is waited for outside of the lock as
# _work aquires the lock before filling the caches. This also means
# we can gaurentee that the future has not yet been removed from the
# we can guarantee that the future has not yet been removed from the
# futures dict.
future = self.futures[key]
wait([future])
Expand All @@ -353,4 +353,3 @@ def _work(self, key: str, populate_func: Callable[[], Any]) -> None:
self.futures.pop(key)
self.hard_cache[key] = result
self.soft_cache[key] = result

87 changes: 45 additions & 42 deletions src/DIRAC/WorkloadManagementSystem/Client/Limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,72 +45,76 @@ def getNegativeCond(self):
orCond = self.condCache.get("GLOBAL")
if orCond:
return orCond
negCond = {}
negativeCondition = {}

# Run Limit
result = self.__opsHelper.getSections(self.__runningLimitSection)
sites = []
if result["OK"]:
sites = result["Value"]
for siteName in sites:
if not result["OK"]:
self.log.error("Issue getting running conditions", result["Message"])
sites_with_running_limits = []
else:
sites_with_running_limits = result["Value"]
self.log.verbose(f"Found running conditions for {len(sites_with_running_limits)} sites")

for siteName in sites_with_running_limits:
result = self.__getRunningCondition(siteName)
if not result["OK"]:
continue
data = result["Value"]
if data:
negCond[siteName] = data
self.log.error("Issue getting running conditions", result["Message"])
running_condition = {}
else:
running_condition = result["Value"]
if running_condition:
negativeCondition[siteName] = running_condition

# Delay limit
result = self.__opsHelper.getSections(self.__matchingDelaySection)
sites = []
if result["OK"]:
sites = result["Value"]
for siteName in sites:
result = self.__getDelayCondition(siteName)
if self.__opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
result = self.__opsHelper.getSections(self.__matchingDelaySection)
if not result["OK"]:
continue
data = result["Value"]
if not data:
continue
if siteName in negCond:
negCond[siteName] = self.__mergeCond(negCond[siteName], data)
self.log.error("Issue getting delay conditions", result["Message"])
sites_with_matching_delay = []
else:
negCond[siteName] = data
sites_with_matching_delay = result["Value"]
self.log.verbose(f"Found delay conditions for {len(sites_with_matching_delay)} sites")

for siteName in sites_with_matching_delay:
delay_condition = self.__getDelayCondition(siteName)
if siteName in negativeCondition:
negativeCondition[siteName] = self.__mergeCond(negativeCondition[siteName], delay_condition)
else:
negativeCondition[siteName] = delay_condition

orCond = []
for siteName in negCond:
negCond[siteName]["Site"] = siteName
orCond.append(negCond[siteName])
for siteName in negativeCondition:
negativeCondition[siteName]["Site"] = siteName
orCond.append(negativeCondition[siteName])
self.condCache.add("GLOBAL", 10, orCond)
return orCond

def getNegativeCondForSite(self, siteName, gridCE=None):
"""Generate a negative query based on the limits set on the site"""
# Check if Limits are imposed onto the site
negativeCond = {}
if self.__opsHelper.getValue("JobScheduling/CheckJobLimits", True):
result = self.__getRunningCondition(siteName)
if not result["OK"]:
self.log.error("Issue getting running conditions", result["Message"])
negativeCond = {}
else:
negativeCond = result["Value"]
self.log.verbose(
"Negative conditions for site", f"{siteName} after checking limits are: {str(negativeCond)}"
)
self.log.verbose(
"Negative conditions for site", f"{siteName} after checking limits are: {str(negativeCond)}"
)

if gridCE:
result = self.__getRunningCondition(siteName, gridCE)
if not result["OK"]:
self.log.error("Issue getting running conditions", result["Message"])
else:
negativeCondCE = result["Value"]
negativeCond = self.__mergeCond(negativeCond, negativeCondCE)
negativeCond = self.__mergeCond(negativeCond, result["Value"])

if self.__opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
result = self.__getDelayCondition(siteName)
if result["OK"]:
delayCond = result["Value"]
self.log.verbose(
"Negative conditions for site", f"{siteName} after delay checking are: {str(delayCond)}"
)
negativeCond = self.__mergeCond(negativeCond, delayCond)
delayCond = self.__getDelayCondition(siteName)
self.log.verbose("Negative conditions for site", f"{siteName} after delay checking are: {str(delayCond)}")
negativeCond = self.__mergeCond(negativeCond, delayCond)

if negativeCond:
self.log.info("Negative conditions for site", f"{siteName} are: {str(negativeCond)}")
Expand Down Expand Up @@ -230,14 +234,14 @@ def updateDelayCounters(self, siteName, jid):
def __getDelayCondition(self, siteName):
"""Get extra conditions allowing matching delay"""
if siteName not in self.delayMem:
return S_OK({})
return {}
lastRun = self.delayMem[siteName].getKeys()
negCond = {}
for attName, attValue in lastRun:
if attName not in negCond:
negCond[attName] = []
negCond[attName].append(attValue)
return S_OK(negCond)
return negCond

def _countsByJobType(self, siteName, attName):
result = self.jobDB.getCounters(
Expand All @@ -247,6 +251,5 @@ def _countsByJobType(self, siteName, attName):
)
if not result["OK"]:
return result
data = result["Value"]
data = {k[0][attName]: k[1] for k in data}
data = {k[0][attName]: k[1] for k in result["Value"]}
return data

0 comments on commit 53819ea

Please sign in to comment.