Skip to content

Commit

Permalink
Refactor and optimize get_breaches code
Browse files Browse the repository at this point in the history
  • Loading branch information
pgrunewald committed Apr 9, 2024
1 parent 2e7043d commit 23d72f3
Showing 1 changed file with 36 additions and 52 deletions.
88 changes: 36 additions & 52 deletions plone/app/linkintegrity/browser/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from zope.i18n import translate

import logging
import warnings


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,17 +51,26 @@ def get_breaches(self, items=None):
items = [self.context]
catalog = getToolByName(self.context, "portal_catalog")
results = []
uids_to_ignore = []
uids_to_ignore = set()
uids_visited = set()
self.breach_count = {}
brains_to_delete = []
path2obj = dict()
path2brains = dict()

# build various helper structures
for obj in items:
obj_path = "/".join(obj.getPhysicalPath())
brains_to_delete = catalog(path={"query": obj_path})
path2obj[obj_path] = obj
path2brains[obj_path] = brains_to_delete = catalog(path={"query": obj_path})
# add the current items uid and all its children's uids to the
# list of uids that are ignored
uids_to_ignore.extend([i.UID for i in brains_to_delete])
uids_to_ignore.update([i.UID for i in brains_to_delete])

# determine breaches
for obj_path, obj in path2obj.items():
brains_to_delete = path2brains[obj_path]
# prepare the collection of breaches for current item
get_breaches_for_item = None
for brain_to_delete in brains_to_delete:
try:
obj_to_delete = brain_to_delete.getObject() # noqa
Expand All @@ -69,22 +79,20 @@ def get_breaches(self, items=None):
"No object found for %s! Skipping", brain_to_delete
)
continue
if get_breaches_for_item is None:
get_breaches_for_item = self.get_breaches_for_item(obj)
for breach in get_breaches_for_item:
add_breach = False
# look into potential breach
breach = self.check_object(obj=obj_to_delete, excluded_paths=set(path2obj.keys()))
if breach:
for source in breach["sources"]:
# Only add the breach if one the sources is not in the
# list of items that are to be deleted.
if (
source["uid"] not in uids_to_ignore
and source["uid"] not in uids_visited
):
add_breach = True
results.append(breach)
uids_visited.add(source["uid"])
break
if add_breach:
results.append(breach)

if IFolder.providedBy(obj):
count = len(catalog(path={"query": obj_path}))
count_dirs = len(catalog(path={"query": obj_path}, is_folderish=True))
Expand All @@ -94,19 +102,6 @@ def get_breaches(self, items=None):
if count:
self.breach_count[obj_path] = [count, count_dirs, count_public]

# Cleanup: Some breaches where added before it was known
# that their source will be deleted too.
for result in results:
for source in result["sources"]:
if source["uid"] in uids_to_ignore:
# Drop sources that are also being deleted
result["sources"].remove(source)
if not result["sources"]:
# Remove the breach is there are no more sources
# This check is necessary since there can be multiple
# sources for a breach
results.remove(result)

# De-duplicate targets * sources
uid_target = {}
uid_sources = defaultdict(list)
Expand All @@ -129,35 +124,24 @@ def get_breaches_for_item(self, obj=None):
Breaches coming from the children of a folder are ignored by default.
"""
if obj is None:
obj = self.context
results = []
catalog = getToolByName(obj, "portal_catalog")
obj_path = "/".join(obj.getPhysicalPath())

breaches = self.check_object(obj)
if breaches:
results.append(breaches)

if IFolder.providedBy(obj):
brains = catalog(path={"query": obj_path})
for brain in brains:
try:
child = brain.getObject()
except (AttributeError, KeyError):
continue
if child == obj:
continue
breaches = self.check_object(obj=child, excluded_path=obj_path)
if breaches:
results.append(breaches)
self.breaches = results
return results

def check_object(self, obj, excluded_path=None):
# BBB: No direct usage is known, but keep this for backwards compatibility.
# Sooner or later, we should use only one method.
warnings.warn("""Using `get_breaches_for_item` is deprecated. Use `get_breaches`
instead.""", DeprecationWarning)
if obj is not None:
obj = [obj]
return self.get_breaches(obj)

def check_object(self, obj, excluded_path=None, excluded_paths=None):
"""Check one object for breaches.
Breaches originating from excluded_path are ignored.
Breaches originating from excluded_paths are ignored.
"""
# BBB: Support old and new parameters likewise
if excluded_paths is None:
excluded_paths = set()
if excluded_path:
excluded_paths.add(excluded_path)

breaches = {}
direct_links = getIncomingLinks(obj, from_attribute=None)
has_breaches = False
Expand All @@ -166,7 +150,7 @@ def check_object(self, obj, excluded_path=None):
if not source_path:
# link is broken
continue
if excluded_path and source_path.startswith(excluded_path):
if any([source_path.startswith(excluded_path) for excluded_path in excluded_paths]):
# source is in excluded_path
continue
source = direct_link.from_object
Expand Down

0 comments on commit 23d72f3

Please sign in to comment.