Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Extend StreamChangeCache to support multiple entities per stream ID #7303

Merged
merged 6 commits into from
Apr 22, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add support for multiple entities per stream id
richvdh committed Apr 17, 2020

Verified

This commit was signed with the committer’s verified signature.
snyk-bot Snyk bot
commit fa7bb6b911d496a461cf03a9e106f830018d6081
82 changes: 44 additions & 38 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@
# limitations under the License.

import logging
from typing import Collection, Dict, Iterable, Mapping, Optional, Set
from typing import Dict, Iterable, List, Mapping, Optional, Set

from six import integer_types

@@ -48,8 +48,8 @@ def __init__(
self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {} # type: Dict[EntityType, int]

# map from stream id to the entity which changed at that stream id.
self._cache = SortedDict() # type: SortedDict[int, EntityType]
# map from stream id to the a set of entities which changed at that stream id.
self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]]

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
@@ -92,16 +92,9 @@ def get_entities_changed(
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
changed_entities = {
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
}

result = changed_entities.intersection(entities)

changed_entities = self.get_all_entities_changed(stream_pos)
if changed_entities is not None:
result = set(changed_entities).intersection(entities)
self.metrics.inc_hits()
else:
result = set(entities)
@@ -115,7 +108,7 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
assert type(stream_pos) is int

if not self._cache:
# If we have no cache, nothing can have changed.
# If the cache is empty, nothing can have changed.
return False

if stream_pos >= self._earliest_known_stream_pos:
@@ -125,42 +118,55 @@ def has_any_entity_changed(self, stream_pos: int) -> bool:
self.metrics.inc_misses()
return True

def get_all_entities_changed(
self, stream_pos: int
) -> Optional[Collection[EntityType]]:
"""Returns all entites that have had new things since the given
def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
"""Returns all entities that have had new things since the given
position. If the position is too old it will return None.

Returns the entities in the order that they were changed.
"""
assert type(stream_pos) is int

if stream_pos >= self._earliest_known_stream_pos:
return [
self._cache[k]
for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
]
else:
if stream_pos < self._earliest_known_stream_pos:
return None

changed_entities = [] # type: List[EntityType]

for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
changed_entities.extend(self._cache[k])
return changed_entities

def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
"""Informs the cache that the entity has been changed at the given
position.
"""
assert type(stream_pos) is int

if stream_pos > self._earliest_known_stream_pos:
old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
stream_pos = max(stream_pos, old_pos)
self._cache.pop(old_pos, None)
self._cache[stream_pos] = entity
self._entity_to_key[entity] = stream_pos

while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(
k, self._earliest_known_stream_pos
)
self._entity_to_key.pop(r, None)
if stream_pos <= self._earliest_known_stream_pos:
return

old_pos = self._entity_to_key.get(entity, None)
if old_pos is not None:
if old_pos >= stream_pos:
# nothing to do
return
e = self._cache[old_pos]
e.remove(entity)
if not e:
# cache at this point is now empty
del self._cache[old_pos]

e1 = self._cache.get(stream_pos)
if e1 is None:
e1 = self._cache[stream_pos] = set()
e1.add(entity)
self._entity_to_key[entity] = stream_pos

# if the cache is too big, remove entries
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
for entity in r:
del self._entity_to_key[entity]

def get_max_pos_of_last_change(self, entity: EntityType) -> int:
"""Returns an upper bound of the stream id of the last change to an
58 changes: 50 additions & 8 deletions tests/util/test_stream_change_cache.py
Original file line number Diff line number Diff line change
@@ -28,18 +28,26 @@ def test_has_entity_changed(self):
cache.entity_has_changed("[email protected]", 6)
cache.entity_has_changed("[email protected]", 7)

# also test multiple things changing on the same stream ID
cache.entity_has_changed("[email protected]", 8)
cache.entity_has_changed("[email protected]", 8)

# If it's been changed after that stream position, return True
self.assertTrue(cache.has_entity_changed("[email protected]", 4))
self.assertTrue(cache.has_entity_changed("[email protected]", 4))
self.assertTrue(cache.has_entity_changed("[email protected]", 4))
self.assertTrue(cache.has_entity_changed("[email protected]", 4))

# If it's been changed at that stream position, return False
self.assertFalse(cache.has_entity_changed("[email protected]", 6))
self.assertFalse(cache.has_entity_changed("[email protected]", 8))

# If there's no changes after that stream position, return False
self.assertFalse(cache.has_entity_changed("[email protected]", 7))
self.assertFalse(cache.has_entity_changed("[email protected]", 9))

# If the entity does not exist, return False.
self.assertFalse(cache.has_entity_changed("[email protected]", 7))
self.assertFalse(cache.has_entity_changed("[email protected]", 9))

# If we request before the stream cache's earliest known position,
# return True, whether it's a known entity or not.
@@ -89,18 +97,52 @@ def test_get_all_entities_changed(self):

cache.entity_has_changed("[email protected]", 2)
cache.entity_has_changed("[email protected]", 3)
cache.entity_has_changed("[email protected]", 3)
cache.entity_has_changed("[email protected]", 4)

self.assertEqual(
cache.get_all_entities_changed(1),
["[email protected]", "[email protected]", "[email protected]"],
)
self.assertEqual(
cache.get_all_entities_changed(2), ["[email protected]", "[email protected]"]
)
r = cache.get_all_entities_changed(1)

# either of these are valid
ok1 = [
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
]
ok2 = [
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
]
self.assertTrue(r == ok1 or r == ok2)

r = cache.get_all_entities_changed(2)
self.assertTrue(r == ok1[1:] or r == ok2[1:])

self.assertEqual(cache.get_all_entities_changed(3), ["[email protected]"])
self.assertEqual(cache.get_all_entities_changed(0), None)

# ... later, things gest more updates
cache.entity_has_changed("[email protected]", 5)
cache.entity_has_changed("[email protected]", 5)
cache.entity_has_changed("[email protected]", 6)

ok1 = [
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
]
ok2 = [
"[email protected]",
"[email protected]",
"[email protected]",
"[email protected]",
]
r = cache.get_all_entities_changed(3)
self.assertTrue(r == ok1 or r == ok2)

def test_has_any_entity_changed(self):
"""
StreamChangeCache.has_any_entity_changed will return True if any