Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix threadsafety in ThreadedMemoryReactorClock (#8497)
Browse files Browse the repository at this point in the history
This could, very occasionally, cause:

```
tests.test_visibility.FilterEventsForServerTestCase.test_large_room
===============================================================================
[ERROR]
Traceback (most recent call last):
  File "/src/tests/rest/media/v1/test_media_storage.py", line 86, in test_ensure_media_is_in_local_cache
    self.wait_on_thread(x)
  File "/src/tests/unittest.py", line 296, in wait_on_thread
    self.reactor.advance(0.01)
  File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 826, in advance
    self._sortCalls()
  File "/src/.tox/py35/lib/python3.5/site-packages/twisted/internet/task.py", line 787, in _sortCalls
    self.calls.sort(key=lambda a: a.getTime())
builtins.ValueError: list modified during sort

tests.rest.media.v1.test_media_storage.MediaStorageTests.test_ensure_media_is_in_local_cache
```
  • Loading branch information
richvdh authored Oct 9, 2020
1 parent ca2db5d commit 9789b1f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
1 change: 1 addition & 0 deletions changelog.d/8497.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a threadsafety bug in unit tests.
36 changes: 32 additions & 4 deletions tests/server.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
import logging
from collections import deque
from io import SEEK_END, BytesIO
from typing import Callable

import attr
from typing_extensions import Deque
from zope.interface import implementer

from twisted.internet import address, threads, udp
Expand Down Expand Up @@ -251,6 +254,7 @@ def __init__(self):
self._tcp_callbacks = {}
self._udp = []
lookups = self.lookups = {}
self._thread_callbacks = deque() # type: Deque[Callable[[], None]]()

@implementer(IResolverSimple)
class FakeResolver:
Expand All @@ -272,10 +276,10 @@ def callFromThread(self, callback, *args, **kwargs):
"""
Make the callback fire in the next reactor iteration.
"""
d = Deferred()
d.addCallback(lambda x: callback(*args, **kwargs))
self.callLater(0, d.callback, True)
return d
cb = lambda: callback(*args, **kwargs)
# it's not safe to call callLater() here, so we append the callback to a
# separate queue.
self._thread_callbacks.append(cb)

def getThreadPool(self):
return self.threadpool
Expand Down Expand Up @@ -303,6 +307,30 @@ def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):

return conn

def advance(self, amount):
# first advance our reactor's time, and run any "callLater" callbacks that
# makes ready
super().advance(amount)

# now run any "callFromThread" callbacks
while True:
try:
callback = self._thread_callbacks.popleft()
except IndexError:
break
callback()

# check for more "callLater" callbacks added by the thread callback
# This isn't required in a regular reactor, but it ends up meaning that
# our database queries can complete in a single call to `advance` [1] which
# simplifies tests.
#
# [1]: we replace the threadpool backing the db connection pool with a
# mock ThreadPool which doesn't really use threads; but we still use
# reactor.callFromThread to feed results back from the db functions to the
# main thread.
super().advance(0)


class ThreadPool:
"""
Expand Down

0 comments on commit 9789b1f

Please sign in to comment.