Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Convert Linearizer tests from inlineCallbacks to async
Browse files Browse the repository at this point in the history
Signed-off-by: Sean Quah <[email protected]>
  • Loading branch information
Sean Quah committed Apr 1, 2022
1 parent 45ce571 commit 7842add
Showing 1 changed file with 62 additions and 80 deletions.
142 changes: 62 additions & 80 deletions tests/util/test_linearizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
from twisted.internet.base import ReactorBase
from twisted.internet.defer import CancelledError, Deferred

from synapse.logging.context import LoggingContext, current_context
from synapse.util import Clock
from synapse.logging.context import (
LoggingContext,
current_context,
make_deferred_yieldable,
)
from synapse.util.async_helpers import Linearizer

from tests import unittest
Expand Down Expand Up @@ -67,27 +70,24 @@ def _pump(self) -> None:
while reactor.getDelayedCalls():
reactor.runUntilCurrent()

@defer.inlineCallbacks
def test_linearizer(self):
"""Tests that a task is queued up behind an earlier task."""
linearizer = Linearizer()

key = object()

d1 = linearizer.queue(key)
cm1 = yield d1
_, acquired_d1, unblock1 = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)

d2 = linearizer.queue(key)
self.assertFalse(d2.called)
_, acquired_d2, unblock2 = self._start_task(linearizer, key)
self.assertFalse(acquired_d2.called)

# Once the first task is done, the second task can continue.
with cm1:
self.assertFalse(d2.called)
unblock1()
self.assertTrue(acquired_d2.called)

with (yield d2):
pass
unblock2()

@defer.inlineCallbacks
def test_linearizer_is_queued(self):
"""Tests `Linearizer.is_queued`.
Expand All @@ -97,31 +97,26 @@ def test_linearizer_is_queued(self):

key = object()

d1 = linearizer.queue(key)
cm1 = yield d1
_, acquired_d1, unblock1 = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)

# Since the first task acquires the lock immediately, "is_queued" should return
# false.
self.assertFalse(linearizer.is_queued(key))

d2 = linearizer.queue(key)
self.assertFalse(d2.called)
_, acquired_d2, unblock2 = self._start_task(linearizer, key)
self.assertFalse(acquired_d2.called)

# Now the second task is queued up behind the first.
self.assertTrue(linearizer.is_queued(key))

with cm1:
self.assertFalse(d2.called)

# cm1 still not done, so d2 still queued.
self.assertTrue(linearizer.is_queued(key))
unblock1()

# And now the second task acquires the lock and nothing is in the queue again.
self.assertTrue(acquired_d2.called)
self.assertFalse(linearizer.is_queued(key))

with (yield d2):
self.assertFalse(linearizer.is_queued(key))

unblock2()
self.assertFalse(linearizer.is_queued(key))

def test_lots_of_queued_things(self):
Expand All @@ -130,106 +125,93 @@ def test_lots_of_queued_things(self):
The stack should *not* explode when the fast thing completes.
"""
linearizer = Linearizer()
key = ""

@defer.inlineCallbacks
def func(i, sleep=False):
async def func(i, wait_for=None) -> None:
with LoggingContext("func(%s)" % i) as lc:
with (yield linearizer.queue("")):
with (await linearizer.queue(key)):
self.assertEqual(current_context(), lc)
if sleep:
yield Clock(reactor).sleep(0)
if wait_for:
await make_deferred_yieldable(wait_for)

self.assertEqual(current_context(), lc)

func(0, sleep=True)
_, _, unblock = self._start_task(linearizer, key)
for i in range(1, 100):
func(i)
defer.ensureDeferred(func(i))

return func(1000)
d = defer.ensureDeferred(func(1000))
unblock()
self.successResultOf(d)

@defer.inlineCallbacks
def test_multiple_entries(self):
"""Tests a `Linearizer` with a concurrency above 1."""
limiter = Linearizer(max_count=3)

key = object()

d1 = limiter.queue(key)
cm1 = yield d1
_, acquired_d1, unblock1 = self._start_task(limiter, key)
self.assertTrue(acquired_d1.called)

d2 = limiter.queue(key)
cm2 = yield d2
_, acquired_d2, unblock2 = self._start_task(limiter, key)
self.assertTrue(acquired_d2.called)

d3 = limiter.queue(key)
cm3 = yield d3
_, acquired_d3, unblock3 = self._start_task(limiter, key)
self.assertTrue(acquired_d3.called)

# These next two tasks have to wait.
d4 = limiter.queue(key)
self.assertFalse(d4.called)
_, acquired_d4, unblock4 = self._start_task(limiter, key)
self.assertFalse(acquired_d4.called)

d5 = limiter.queue(key)
self.assertFalse(d5.called)
_, acquired_d5, unblock5 = self._start_task(limiter, key)
self.assertFalse(acquired_d5.called)

# Once the first task completes, the fourth task can continue.
with cm1:
self.assertFalse(d4.called)
self.assertFalse(d5.called)

cm4 = yield d4
self.assertFalse(d5.called)
unblock1()
self.assertTrue(acquired_d4.called)
self.assertFalse(acquired_d5.called)

# Once the third task completes, the fifth task can continue.
with cm3:
self.assertFalse(d5.called)

cm5 = yield d5
unblock3()
self.assertTrue(acquired_d5.called)

# Make all tasks finish.
with cm2:
pass

with cm4:
pass

with cm5:
pass
unblock2()
unblock4()
unblock5()

# The next task shouldn't have to wait.
d6 = limiter.queue(key)
with (yield d6):
pass
_, acquired_d6, unblock6 = self._start_task(limiter, key)
self.assertTrue(acquired_d6)
unblock6()

@defer.inlineCallbacks
def test_cancellation(self):
"""Tests cancellation while waiting for a `Linearizer`."""
linearizer = Linearizer()

key = object()

d1 = linearizer.queue(key)
cm1 = yield d1
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
self.assertTrue(acquired_d1.called)

# Create a second task, waiting for the first task.
d2 = linearizer.queue(key)
self.assertFalse(d2.called)
d2, acquired_d2, _ = self._start_task(linearizer, key)
self.assertFalse(acquired_d2.called)

# Create a third task, waiting for the second task.
d3 = linearizer.queue(key)
self.assertFalse(d3.called)
d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
self.assertFalse(acquired_d3.called)

# Cancel the waiting second task.
d2.cancel()

with cm1:
pass
unblock1()
self.successResultOf(d1)

self.assertTrue(d2.called)
try:
yield d2
self.fail("Expected d2 to raise CancelledError")
except CancelledError:
pass
self.failureResultOf(d2, CancelledError)

# The third task should continue running.
with (yield d3):
pass
self.assertTrue(acquired_d3.called)
unblock3()
self.successResultOf(d3)

0 comments on commit 7842add

Please sign in to comment.