Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge commit 'be16ee59a' into anoa/dinsic_release_1_21_x
Browse files Browse the repository at this point in the history
* commit 'be16ee59a':
  Add type hints to more handlers (#8244)
  Remove obsolete order field in `send_new_transaction` (#8245)
  Split fetching device keys and signatures into two transactions (#8233)
  • Loading branch information
anoadragon453 committed Oct 20, 2020
2 parents 255860b + be16ee5 commit 412a215
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 151 deletions.
1 change: 1 addition & 0 deletions changelog.d/8233.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor queries for device keys and cross-signatures.
1 change: 1 addition & 0 deletions changelog.d/8244.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add type hints to pagination, initial sync and events handlers.
1 change: 1 addition & 0 deletions changelog.d/8245.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove obsolete `order` field from federation send queues.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ files =
synapse/handlers/auth.py,
synapse/handlers/cas_handler.py,
synapse/handlers/directory.py,
synapse/handlers/events.py,
synapse/handlers/federation.py,
synapse/handlers/identity.py,
synapse/handlers/initial_sync.py,
synapse/handlers/message.py,
synapse/handlers/oidc_handler.py,
synapse/handlers/pagination.py,
synapse/handlers/presence.py,
synapse/handlers/room.py,
synapse/handlers/room_member.py,
Expand Down
7 changes: 1 addition & 6 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ def __init__(self, hs: "synapse.server.HomeServer"):
),
)

self._order = 1

self._is_processing = False
self._last_poked_id = -1

Expand Down Expand Up @@ -272,9 +270,6 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.

order = self._order
self._order += 1

destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
Expand All @@ -286,7 +281,7 @@ def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
sent_pdus_destination_dist_count.inc()

for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)
self._get_per_destination_queue(destination).send_pdu(pdu)

async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Expand Down
17 changes: 8 additions & 9 deletions synapse/federation/sender/per_destination_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ def __init__(
self._destination = destination
self.transmission_loop_running = False

# a list of tuples of (pending pdu, order)
self._pending_pdus = [] # type: List[Tuple[EventBase, int]]
# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]

# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
Expand Down Expand Up @@ -132,14 +132,13 @@ def pending_edu_count(self) -> int:
+ len(self._pending_edus_keyed)
)

def send_pdu(self, pdu: EventBase, order: int) -> None:
def send_pdu(self, pdu: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary
Args:
pdu: pdu to send
order
"""
self._pending_pdus.append((pdu, order))
self._pending_pdus.append(pdu)
self.attempt_new_transaction()

def send_presence(self, states: Iterable[UserPresenceState]) -> None:
Expand Down Expand Up @@ -185,7 +184,7 @@ def attempt_new_transaction(self) -> None:
returns immediately. Otherwise kicks off the process of sending a
transaction in the background.
"""
# list of (pending_pdu, deferred, order)

if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing.
Expand All @@ -210,7 +209,7 @@ def attempt_new_transaction(self) -> None:
)

async def _transaction_transmission_loop(self) -> None:
pending_pdus = [] # type: List[Tuple[EventBase, int]]
pending_pdus = [] # type: List[EventBase]
try:
self.transmission_loop_running = True

Expand Down Expand Up @@ -373,13 +372,13 @@ async def _transaction_transmission_loop(self) -> None:
"TX [%s] Failed to send transaction: %s", self._destination, e
)

for p, _ in pending_pdus:
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p, _ in pending_pdus:
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
Expand Down
26 changes: 13 additions & 13 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, List, Tuple
from typing import TYPE_CHECKING, List

from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
Expand Down Expand Up @@ -53,11 +53,17 @@ def __init__(self, hs: "synapse.server.HomeServer"):

@measure_func("_send_new_transaction")
async def send_new_transaction(
self,
destination: str,
pending_pdus: List[Tuple[EventBase, int]],
pending_edus: List[Edu],
):
self, destination: str, pdus: List[EventBase], edus: List[Edu],
) -> bool:
"""
Args:
destination: The destination to send to (e.g. 'example.org')
pdus: In-order list of PDUs to send
edus: List of EDUs to send
Returns:
True iff the transaction was successful
"""

# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
Expand All @@ -67,20 +73,14 @@ async def send_new_transaction(
span_contexts = []
keep_destination = whitelisted_homeserver(destination)

for edu in pending_edus:
for edu in edus:
context = edu.get_context()
if context:
span_contexts.append(extract_text_map(json_decoder.decode(context)))
if keep_destination:
edu.strip_context()

with start_active_span_follows_from("send_transaction", span_contexts):

# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus

success = True

logger.debug("TX [%s] _attempt_new_transaction", destination)
Expand Down
49 changes: 26 additions & 23 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,30 @@

import logging
import random
from typing import TYPE_CHECKING, Iterable, List, Optional

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.types import UserID
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, UserID
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

if TYPE_CHECKING:
from synapse.server import HomeServer


logger = logging.getLogger(__name__)


class EventStreamHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super(EventStreamHandler, self).__init__(hs)

# Count of active streams per user
self._streams_per_user = {}
# Grace timers per user to delay the "stopped" signal
self._stop_timer_per_user = {}

self.distributor = hs.get_distributor()
self.distributor.declare("started_user_eventstream")
self.distributor.declare("stopped_user_eventstream")
Expand All @@ -52,14 +53,14 @@ def __init__(self, hs):
@log_function
async def get_stream(
self,
auth_user_id,
pagin_config,
timeout=0,
as_client_event=True,
affect_presence=True,
room_id=None,
is_guest=False,
):
auth_user_id: str,
pagin_config: PaginationConfig,
timeout: int = 0,
as_client_event: bool = True,
affect_presence: bool = True,
room_id: Optional[str] = None,
is_guest: bool = False,
) -> JsonDict:
"""Fetches the events stream for a given user.
"""

Expand Down Expand Up @@ -98,7 +99,7 @@ async def get_stream(

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
to_add = [] # type: List[JsonDict]
for event in events:
if not isinstance(event, EventBase):
continue
Expand All @@ -110,7 +111,7 @@ async def get_stream(
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
event.room_id
)
) # type: Iterable[str]
else:
users = [event.state_key]

Expand Down Expand Up @@ -144,20 +145,22 @@ async def get_stream(


class EventHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super(EventHandler, self).__init__(hs)
self.storage = hs.get_storage()

async def get_event(self, user, room_id, event_id):
async def get_event(
self, user: UserID, room_id: Optional[str], event_id: str
) -> Optional[EventBase]:
"""Retrieve a single specified event.
Args:
user (synapse.types.UserID): The user requesting the event
room_id (str|None): The expected room id. We'll return None if the
user: The user requesting the event
room_id: The expected room id. We'll return None if the
event's room does not match.
event_id (str): The event ID to obtain.
event_id: The event ID to obtain.
Returns:
dict: An event, or None if there is no event matching this ID.
An event, or None if there is no event matching this ID.
Raises:
SynapseError if there was a problem retrieving this event, or
AuthError if the user does not have the rights to inspect this
Expand Down
Loading

0 comments on commit 412a215

Please sign in to comment.