From 23d72f341ee00d17cc32b00bb65c666b249d569f Mon Sep 17 00:00:00 2001 From: Paul Grunewald Date: Tue, 9 Apr 2024 23:38:21 +0200 Subject: [PATCH] Refactor and optimize get_breaches code --- plone/app/linkintegrity/browser/info.py | 88 ++++++++++--------------- 1 file changed, 36 insertions(+), 52 deletions(-) diff --git a/plone/app/linkintegrity/browser/info.py b/plone/app/linkintegrity/browser/info.py index dda6e5a..8952661 100644 --- a/plone/app/linkintegrity/browser/info.py +++ b/plone/app/linkintegrity/browser/info.py @@ -14,6 +14,7 @@ from zope.i18n import translate import logging +import warnings logger = logging.getLogger(__name__) @@ -50,17 +51,26 @@ def get_breaches(self, items=None): items = [self.context] catalog = getToolByName(self.context, "portal_catalog") results = [] - uids_to_ignore = [] + uids_to_ignore = set() uids_visited = set() self.breach_count = {} + brains_to_delete = [] + path2obj = dict() + path2brains = dict() + + # build various helper structures for obj in items: obj_path = "/".join(obj.getPhysicalPath()) - brains_to_delete = catalog(path={"query": obj_path}) + path2obj[obj_path] = obj + path2brains[obj_path] = brains_to_delete = catalog(path={"query": obj_path}) # add the current items uid and all its children's uids to the # list of uids that are ignored - uids_to_ignore.extend([i.UID for i in brains_to_delete]) + uids_to_ignore.update([i.UID for i in brains_to_delete]) + + # determine breaches + for obj_path, obj in path2obj.items(): + brains_to_delete = path2brains[obj_path] # prepare the collection of breaches for current item - get_breaches_for_item = None for brain_to_delete in brains_to_delete: try: obj_to_delete = brain_to_delete.getObject() # noqa @@ -69,10 +79,9 @@ def get_breaches(self, items=None): "No object found for %s! Skipping", brain_to_delete ) continue - if get_breaches_for_item is None: - get_breaches_for_item = self.get_breaches_for_item(obj) - for breach in get_breaches_for_item: - add_breach = False + # look into potential breach + breach = self.check_object(obj=obj_to_delete, excluded_paths=set(path2obj.keys())) + if breach: for source in breach["sources"]: # Only add the breach if one the sources is not in the # list of items that are to be deleted. @@ -80,11 +89,10 @@ def get_breaches(self, items=None): source["uid"] not in uids_to_ignore and source["uid"] not in uids_visited ): - add_breach = True + results.append(breach) uids_visited.add(source["uid"]) break - if add_breach: - results.append(breach) + if IFolder.providedBy(obj): count = len(catalog(path={"query": obj_path})) count_dirs = len(catalog(path={"query": obj_path}, is_folderish=True)) @@ -94,19 +102,6 @@ def get_breaches(self, items=None): if count: self.breach_count[obj_path] = [count, count_dirs, count_public] - # Cleanup: Some breaches where added before it was known - # that their source will be deleted too. - for result in results: - for source in result["sources"]: - if source["uid"] in uids_to_ignore: - # Drop sources that are also being deleted - result["sources"].remove(source) - if not result["sources"]: - # Remove the breach is there are no more sources - # This check is necessary since there can be multiple - # sources for a breach - results.remove(result) - # De-duplicate targets * sources uid_target = {} uid_sources = defaultdict(list) @@ -129,35 +124,24 @@ def get_breaches_for_item(self, obj=None): Breaches coming from the children of a folder are ignored by default. """ - if obj is None: - obj = self.context - results = [] - catalog = getToolByName(obj, "portal_catalog") - obj_path = "/".join(obj.getPhysicalPath()) - - breaches = self.check_object(obj) - if breaches: - results.append(breaches) - - if IFolder.providedBy(obj): - brains = catalog(path={"query": obj_path}) - for brain in brains: - try: - child = brain.getObject() - except (AttributeError, KeyError): - continue - if child == obj: - continue - breaches = self.check_object(obj=child, excluded_path=obj_path) - if breaches: - results.append(breaches) - self.breaches = results - return results - - def check_object(self, obj, excluded_path=None): + # BBB: No direct usage is known, but keep this for backwards compatibility. + # Sooner or later, we should use only one method. + warnings.warn("""Using `get_breaches_for_item` is deprecated. Use `get_breaches` + instead.""", DeprecationWarning) + if obj is not None: + obj = [obj] + return self.get_breaches(obj) + + def check_object(self, obj, excluded_path=None, excluded_paths=None): """Check one object for breaches. - Breaches originating from excluded_path are ignored. + Breaches originating from excluded_paths are ignored. """ + # BBB: Support old and new parameters likewise + if excluded_paths is None: + excluded_paths = set() + if excluded_path: + excluded_paths.add(excluded_path) + breaches = {} direct_links = getIncomingLinks(obj, from_attribute=None) has_breaches = False @@ -166,7 +150,7 @@ def check_object(self, obj, excluded_path=None): if not source_path: # link is broken continue - if excluded_path and source_path.startswith(excluded_path): + if any([source_path.startswith(excluded_path) for excluded_path in excluded_paths]): # source is in excluded_path continue source = direct_link.from_object