Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add types to async_helpers #8260

Merged
merged 4 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ files =
synapse/http/federation/well_known_resolver.py,
synapse/http/server.py,
synapse/http/site.py,
synapse/logging/,
synapse/logging,
synapse/metrics,
synapse/module_api,
synapse/notifier.py,
Expand All @@ -54,6 +54,7 @@ files =
synapse/storage/util,
synapse/streams,
synapse/types.py,
synapse/util/async_helpers.py,
synapse/util/caches/descriptors.py,
synapse/util/caches/stream_change_cache.py,
synapse/util/metrics.py,
Expand Down
45 changes: 25 additions & 20 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import collections
import logging
from contextlib import contextmanager
from typing import Dict, Sequence, Set, Union
from typing import Dict, Set

import attr
from typing_extensions import ContextManager
Expand Down Expand Up @@ -188,13 +188,21 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
).addErrback(unwrapFirstError)


@attr.s(slots=True)
class _LinearizerEntry:
    # The number of things executing.
    count = attr.ib(type=int)
    # Deferreds for the things blocked from executing.
    deferreds = attr.ib(type=collections.OrderedDict)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not awaitables. Deferreds are Awaitables too, right?

Or is this because the rest of the file refers to them as deferreds?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it is actually a Deferred, not just an Awaitable (and since this is internal to the functionality of the Linearizer it seems better to be specific).

new_defer = make_deferred_yieldable(defer.Deferred())



class Linearizer:
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few things happen at a time on a given resource.

Example:

with (yield limiter.queue("test_key")):
with await limiter.queue("test_key"):
# do some work.

"""
Expand All @@ -216,13 +224,8 @@ def __init__(self, name=None, max_count=1, clock=None):
self._clock = clock
self.max_count = max_count

# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and
# the second element is an OrderedDict, where the keys are deferreds for the
# things blocked from executing.
self.key_to_defer = (
{}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
# key_to_defer is a map from the key to a _LinearizerEntry.
self.key_to_defer = {} # type: Dict[str, _LinearizerEntry]

def is_queued(self, key) -> bool:
"""Checks whether there is a process queued up waiting
Expand All @@ -234,25 +237,27 @@ def is_queued(self, key) -> bool:

# There are waiting deferreds only if the OrderedDict of deferreds is
# non-empty.
return bool(entry[1])
return bool(entry.deferreds)

def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
# propagated inside inlineCallbacks until Twisted 18.7)
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
entry = self.key_to_defer.setdefault(
key, _LinearizerEntry(0, collections.OrderedDict())
)

# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
# When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
if entry.count >= self.max_count:
res = self._await_lock(key)
else:
logger.debug(
"Acquired uncontended linearizer lock %r for key %r", self.name, key
)
entry[0] += 1
entry.count += 1
res = defer.succeed(None)

# once we successfully get the lock, we need to return a context manager which
Expand All @@ -267,15 +272,15 @@ def _ctx_manager(_):

# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
entry.count -= 1

if entry[1]:
(next_def, _) = entry[1].popitem(last=False)
if entry.deferreds:
(next_def, _) = entry.deferreds.popitem(last=False)

# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:
elif entry.count == 0:
# We were the last thing for this key: remove it from the
# map.
del self.key_to_defer[key]
Expand All @@ -298,11 +303,11 @@ def _await_lock(self, key):
logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)

new_defer = make_deferred_yieldable(defer.Deferred())
entry[1][new_defer] = 1
entry.deferreds[new_defer] = 1

def cb(_r):
logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
entry.count += 1

# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
Expand Down Expand Up @@ -331,7 +336,7 @@ def eb(e):
)

# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
del entry.deferreds[new_defer]
return e

new_defer.addCallbacks(cb, eb)
Expand Down