-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
make notification of signatures work with workers #6254
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks generally ok, though I think it should probably be a separate stream. I don't know if a separate stream id is required though. @erikjohnston do you have any thoughts on how this stuff is meant to work?
Yeah, you probably want it to be a separate stream, but can share stream ID generator. ( Adding a stream is a matter of creating a new stream, similar to https://github.com/matrix-org/synapse/blob/master/synapse/replication/tcp/streams/federation.py, and adding it to the stream map https://github.com/matrix-org/synapse/blob/master/synapse/replication/tcp/streams/__init__.py. Then in the worker stores that use the stream add code to |
result["device_lists"] = self._device_list_id_gen.get_current_token() | ||
result["user_signature"] = result[ | ||
"device_lists" | ||
] = self._device_list_id_gen.get_current_token() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is kind of ugly, but it's what black came up with. 🤷♂️
Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.
this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a couple of nits
result["device_lists"] = self._device_list_id_gen.get_current_token() | ||
result["user_signature"] = result[ | ||
"device_lists" | ||
] = self._device_list_id_gen.get_current_token() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, I could set result["user_signature"] and result["device_lists"] as separate statements.
this, please. use a local temp var for the token and copy it to both fields. and add a comment to say they share a stream id.
@@ -42,14 +42,19 @@ def __init__(self, db_conn, hs): | |||
|
|||
def stream_positions(self): | |||
result = super(SlavedDeviceStore, self).stream_positions() | |||
result["device_lists"] = self._device_list_id_gen.get_current_token() | |||
result["user_signature"] = result[ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know the existing code isn't good for this, but I'd rather we used the symbolic constants (UserSignatureStream.NAME
in this case) for the stream names, which makes it much easier to find the places the streams are used.
return result | ||
|
||
def process_replication_rows(self, stream_name, token, rows): | ||
if stream_name == "device_lists": | ||
self._device_list_id_gen.advance(token) | ||
for row in rows: | ||
self._invalidate_caches_for_devices(token, row.user_id, row.destination) | ||
elif stream_name == "user_signature": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
elif stream_name == "user_signature": | |
elif stream_name == UserSignatureStream.NAME: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
…kers_notify * commit '3b4216f96': clean up code a bit make user signatures a separate stream add changelog make notification of signatures work with workers
The UNION query is kind of ugly, but shows one way of fixing the issue. Basically, the function needs to return the rows from the user signature stream in addition to the device lists stream.
Alternatively, I could use a new stream ID generator instead of using the device stream ID generator, which would involve more code. Though I suspect that it would be the better option.