From 37b5fa0372a986a5cd784becc73d4dcc4c0dfeea Mon Sep 17 00:00:00 2001 From: Janne Mareike Koschinski Date: Wed, 12 Oct 2022 13:17:52 +0200 Subject: [PATCH] poc --- src/client.ts | 269 +++++++++++++++++++++++++------ src/models/event-timeline-set.ts | 6 + src/models/room.ts | 40 ++++- src/models/thread.ts | 200 ++++++++++------------- src/timeline-window.ts | 13 +- 5 files changed, 353 insertions(+), 175 deletions(-) diff --git a/src/client.ts b/src/client.ts index 923ff80d640..477dba1cd2e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -5355,6 +5355,12 @@ export class MatrixClient extends TypedEventEmitter { + if (!Thread.hasServerSideSupport) { + throw new Error("could not get thread timeline: no serverside support"); + } + + if (!this.supportsExperimentalThreads()) { + throw new Error("could not get thread timeline: no client support"); + } + + const path = utils.encodeUri( + "/rooms/$roomId/context/$eventId", { + $roomId: timelineSet.room.roomId, + $eventId: eventId, + }, + ); + + const params: Record = { + limit: "0", + }; + if (this.clientOpts.lazyLoadMembers) { + params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER); + } + + // TODO: we should implement a backoff (as per scrollback()) to deal more nicely with HTTP errors. + const res = await this.http.authedRequest(undefined, Method.Get, path, params); + const mapper = this.getEventMapper(); + const event = mapper(res.event); + + if (!timelineSet.canContain(event)) { + return undefined; + } + + if (Thread.hasServerSideFwdPaginationSupport) { + if (!timelineSet.thread) { + throw new Error("could not get thread timeline: not a thread timeline"); + } + + const thread = timelineSet.thread; + const resOlder = await this.fetchRelations( + timelineSet.room.roomId, + thread.id, + THREAD_RELATION_TYPE.name, + null, + { dir: Direction.Backward, from: res.start }, + ); + const resNewer = await this.fetchRelations( + timelineSet.room.roomId, + thread.id, + THREAD_RELATION_TYPE.name, + null, + { dir: Direction.Forward, from: res.end }, + ); + const events = [ + // Order events from most recent to oldest (reverse-chronological). + // We start with the last event, since that's the point at which we have known state. + // events_after is already backwards; events_before is forwards. + ...resNewer.chunk.reverse().map(mapper), + event, + ...resOlder.chunk.map(mapper), + ]; + await timelineSet.thread?.fetchEditsWhereNeeded(...events); + + // Here we handle non-thread timelines only, but still process any thread events to populate thread summaries. + let timeline = timelineSet.getTimelineForEvent(event.getId()); + if (timeline) { + timeline.getState(EventTimeline.BACKWARDS).setUnknownStateEvents(res.state.map(mapper)); + } else { + timeline = timelineSet.addTimeline(); + timeline.initialiseState(res.state.map(mapper)); + } + + timelineSet.addEventsToTimeline(events, true, timeline, resNewer.next_batch); + if (!resOlder.next_batch) { + timelineSet.addEventsToTimeline([mapper(resOlder.original_event)], true, timeline, null); + } + timeline.setPaginationToken(resOlder.next_batch ?? null, Direction.Backward); + timeline.setPaginationToken(resNewer.next_batch ?? null, Direction.Forward); + this.processBeaconEvents(timelineSet.room, events); + + // There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring + // timeline) - so check the room's index again. On the other hand, there's no guarantee the event ended up + // anywhere, if it was later redacted, so we just return the timeline we first thought of. + return timelineSet.getTimelineForEvent(eventId) + ?? timeline; + } else { + // Where the event is a thread reply (not a root) and running in MSC-enabled mode the Thread timeline only + // functions contiguously, so we have to jump through some hoops to get our target event in it. + // XXX: workaround for https://github.com/vector-im/element-meta/issues/150 + + const thread = timelineSet.thread; + + const resOlder = await this.fetchRelations( + timelineSet.room.roomId, + thread.id, + THREAD_RELATION_TYPE.name, + null, + { dir: Direction.Backward, from: res.start }, + ); + let eventsNewer = []; + let nextBatch: Optional = res.end; + while (nextBatch) { + const resNewer = await this.fetchRelations( + timelineSet.room.roomId, + thread.id, + THREAD_RELATION_TYPE.name, + null, + { dir: Direction.Forward, from: nextBatch }, + ); + nextBatch = resNewer.next_batch ?? null; + eventsNewer.push(...resNewer.chunk); + } + const events = [ + // Order events from most recent to oldest (reverse-chronological). + // We start with the last event, since that's the point at which we have known state. + // events_after is already backwards; events_before is forwards. + ...eventsNewer.reverse().map(mapper), + event, + ...resOlder.chunk.map(mapper), + ]; + + await timelineSet.thread?.fetchEditsWhereNeeded(...events); + + // Here we handle non-thread timelines only, but still process any thread events to populate thread summaries. + let timeline = timelineSet.getLiveTimeline(); + timeline.getState(EventTimeline.BACKWARDS).setUnknownStateEvents(res.state.map(mapper)); + + timelineSet.addEventsToTimeline(events, true, timeline, null); + if (!resOlder.next_batch) { + timelineSet.addEventsToTimeline([mapper(resOlder.original_event)], true, timeline, null); + } + timeline.setPaginationToken(resOlder.next_batch ?? null, Direction.Backward); + timeline.setPaginationToken(null, Direction.Forward); + this.processBeaconEvents(timelineSet.room, events); + + return timeline; + } + } + /** * Get an EventTimeline for the latest events in the room. This will just * call `/messages` to get the latest message in the room, then use @@ -5466,28 +5578,44 @@ export class MatrixClient extends TypedEventEmitter = { + dir: 'b', + }; + if (this.clientOpts.lazyLoadMembers) { + params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER); + } + + const res = await this.http.authedRequest(undefined, Method.Get, messagesPath, params); + event = res.chunk?.[0]; } - const event = res.chunk?.[0]; if (!event) { - throw new Error("No message returned from /messages when trying to construct getLatestTimeline"); + throw new Error("No message returned when trying to construct getLatestTimeline"); } return this.getEventTimeline(timelineSet, event.event_id); @@ -5625,7 +5753,8 @@ export class MatrixClient extends TypedEventEmitter { const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet); const room = this.getRoom(eventTimeline.getRoomId()); - const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline; + const isThreadListTimeline = eventTimeline.getTimelineSet().isThreadTimeline; + const isThreadTimeline = (eventTimeline.getTimelineSet().thread); // TODO: we should implement a backoff (as per scrollback()) to deal more // nicely with HTTP errors. @@ -5696,11 +5825,15 @@ export class MatrixClient extends TypedEventEmitter { + const mapper = this.getEventMapper(); + const matrixEvents = res.chunk.map(mapper); + eventTimeline.getTimelineSet().thread?.fetchEditsWhereNeeded(...matrixEvents); + + const newToken = res.next_batch; + + const timelineSet = eventTimeline.getTimelineSet(); + timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, newToken ?? null); + if (!newToken && backwards) { + timelineSet.addEventsToTimeline([mapper(res.original_event)], true, eventTimeline, null); + } + this.processBeaconEvents(timelineSet.room, matrixEvents); + + // if we've hit the end of the timeline, we need to stop trying to + // paginate. We need to keep the 'forwards' token though, to make sure + // we can recover from gappy syncs. + if (backwards && !newToken) { + eventTimeline.setPaginationToken(null, dir); + } + return Boolean(newToken); + }).finally(() => { + eventTimeline.paginationRequests[dir] = null; + }); + eventTimeline.paginationRequests[dir] = promise; } else { if (!room) { throw new Error("Unknown room " + eventTimeline.getRoomId()); @@ -5756,7 +5926,12 @@ export class MatrixClient extends TypedEventEmitter it.isRelation(THREAD_RELATION_TYPE.name)), + false); const atEnd = res.end === undefined || res.end === res.start; diff --git a/src/models/event-timeline-set.ts b/src/models/event-timeline-set.ts index 55bd5615153..9674f1b7ebf 100644 --- a/src/models/event-timeline-set.ts +++ b/src/models/event-timeline-set.ts @@ -374,6 +374,12 @@ export class EventTimelineSet extends TypedEventEmitter { /** * @experimental */ - private threads = new Map(); + public threads = new Map(); public lastThread: Thread; /** @@ -1724,7 +1724,7 @@ export class Room extends ReadReceipt { let latestMyThreadsRootEvent: MatrixEvent; const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); for (const rootEvent of threadRoots) { - this.threadsTimelineSets[0].addLiveEvent(rootEvent, { + this.threadsTimelineSets[0]?.addLiveEvent(rootEvent, { duplicateStrategy: DuplicateStrategy.Ignore, fromCache: false, roomState, @@ -1733,7 +1733,7 @@ export class Room extends ReadReceipt { const threadRelationship = rootEvent .getServerAggregatedRelation(THREAD_RELATION_TYPE.name); if (threadRelationship.current_user_participated) { - this.threadsTimelineSets[1].addLiveEvent(rootEvent, { + this.threadsTimelineSets[1]?.addLiveEvent(rootEvent, { duplicateStrategy: DuplicateStrategy.Ignore, fromCache: false, roomState, @@ -1790,13 +1790,19 @@ export class Room extends ReadReceipt { private onThreadNewReply(thread: Thread): void { const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); - for (const timelineSet of this.threadsTimelineSets) { - timelineSet.removeEvent(thread.id); - timelineSet.addLiveEvent(thread.rootEvent, { + if (thread.length) { + this.threadsTimelineSets?.[0]?.addLiveEvent(thread.rootEvent, { duplicateStrategy: DuplicateStrategy.Replace, fromCache: false, roomState, }); + if (thread.hasCurrentUserParticipated) { + this.threadsTimelineSets?.[1]?.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState, + }); + } } } @@ -1924,7 +1930,6 @@ export class Room extends ReadReceipt { } const thread = new Thread(threadId, rootEvent, { - initialEvents: events, room: this, client: this.client, }); @@ -1946,7 +1951,11 @@ export class Room extends ReadReceipt { this.threadsTimelineSets.forEach(timelineSet => { if (thread.rootEvent) { if (Thread.hasServerSideSupport) { - timelineSet.addLiveEvent(thread.rootEvent); + timelineSet.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); } else { timelineSet.addEventToTimeline( thread.rootEvent, @@ -1960,6 +1969,21 @@ export class Room extends ReadReceipt { this.emit(ThreadEvent.New, thread, toStartOfTimeline); + if (thread.length) { + this.threadsTimelineSets?.[0]?.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); + if (thread.hasCurrentUserParticipated) { + this.threadsTimelineSets?.[1]?.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); + } + } + return thread; } diff --git a/src/models/thread.ts b/src/models/thread.ts index e0840eb65da..611623d03e5 100644 --- a/src/models/thread.ts +++ b/src/models/thread.ts @@ -16,11 +16,10 @@ limitations under the License. import { Optional } from "matrix-events-sdk"; -import { MatrixClient, MatrixEventEvent, RelationType, RoomEvent } from "../matrix"; +import { DuplicateStrategy, MatrixClient, MatrixEventEvent, RelationType, RoomEvent } from "../matrix"; import { TypedReEmitter } from "../ReEmitter"; -import { IRelationsRequestOpts } from "../@types/requests"; import { IThreadBundledRelationship, MatrixEvent } from "./event"; -import { Direction, EventTimeline } from "./event-timeline"; +import { EventTimeline } from "./event-timeline"; import { EventTimelineSet, EventTimelineSetHandlerMap } from './event-timeline-set'; import { Room } from './room'; import { RoomState } from "./room-state"; @@ -46,7 +45,6 @@ export type EventHandlerMap = { } & EventTimelineSetHandlerMap; interface IThreadOpts { - initialEvents?: MatrixEvent[]; room: Room; client: MatrixClient; } @@ -90,8 +88,6 @@ export class Thread extends ReadReceipt { public readonly room: Room; public readonly client: MatrixClient; - public initialEventsFetched = !Thread.hasServerSideSupport; - constructor( public readonly id: string, public rootEvent: MatrixEvent | undefined, @@ -123,14 +119,9 @@ export class Thread extends ReadReceipt { this.room.on(RoomEvent.LocalEchoUpdated, this.onEcho); this.timelineSet.on(RoomEvent.Timeline, this.onEcho); - if (opts.initialEvents) { - this.addEvents(opts.initialEvents, false); - } // even if this thread is thought to be originating from this client, we initialise it as we may be in a // gappy sync and a thread around this event may already exist. this.initialiseThread(); - - this.rootEvent?.setThread(this); } private async fetchRootEvent(): Promise { @@ -147,7 +138,8 @@ export class Thread extends ReadReceipt { // The root event might be not be visible to the person requesting it. // If it wasn't fetched successfully the thread will work in "limited" mode and won't // benefit from all the APIs a homeserver can provide to enhance the thread experience - this.rootEvent?.setThread(this); + await this.fetchEditsWhereNeeded(this.rootEvent); + this.setEventMetadata(this.rootEvent); this.emit(ThreadEvent.Update, this); } @@ -181,47 +173,27 @@ export class Thread extends ReadReceipt { event.getId() !== this.id && // the root event isn't counted in the length so ignore this redaction !redaction.status // only respect it when it succeeds ) { - this.replyCount--; + this.replyCount = Math.max(0, this.replyCount - 1); this.emit(ThreadEvent.Update, this); } }; - private onRedaction = (event: MatrixEvent) => { - if (event.threadRootId !== this.id) return; // ignore redactions for other timelines - const events = [...this.timelineSet.getLiveTimeline().getEvents()].reverse(); - this.lastEvent = events.find(e => ( - !e.isRedacted() && - e.isRelation(THREAD_RELATION_TYPE.name) - )) ?? this.rootEvent; + private onRedaction = async (event: MatrixEvent) => { + if (event.threadRootId !== this.id) return; // ignore echoes for other timelines + if (this.lastEvent === event) return; + if (!event.isRelation(THREAD_RELATION_TYPE.name)) return; + + await this.initialiseThread(); this.emit(ThreadEvent.Update, this); }; - private onEcho = (event: MatrixEvent) => { + private onEcho = async (event: MatrixEvent) => { if (event.threadRootId !== this.id) return; // ignore echoes for other timelines if (this.lastEvent === event) return; if (!event.isRelation(THREAD_RELATION_TYPE.name)) return; - // There is a risk that the `localTimestamp` approximation will not be accurate - // when threads are used over federation. That could result in the reply - // count value drifting away from the value returned by the server - const isThreadReply = event.isRelation(THREAD_RELATION_TYPE.name); - if (!this.lastEvent || this.lastEvent.isRedacted() || (isThreadReply - && (event.getId() !== this.lastEvent.getId()) - && (event.localTimestamp > this.lastEvent.localTimestamp)) - ) { - this.lastEvent = event; - if (this.lastEvent.getId() !== this.id) { - // This counting only works when server side support is enabled as we started the counting - // from the value returned within the bundled relationship - if (Thread.hasServerSideSupport) { - this.replyCount++; - } - - this.emit(ThreadEvent.NewReply, this, event); - } - } - - this.emit(ThreadEvent.Update, this); + await this.initialiseThread(); + this.emit(ThreadEvent.NewReply, this, event); }; public get roomState(): RoomState { @@ -244,7 +216,7 @@ export class Thread extends ReadReceipt { public addEvents(events: MatrixEvent[], toStartOfTimeline: boolean): void { events.forEach(ev => this.addEvent(ev, toStartOfTimeline, false)); - this.emit(ThreadEvent.Update, this); + this.initialiseThread(); } /** @@ -273,7 +245,6 @@ export class Thread extends ReadReceipt { this.client.decryptEventIfNeeded(event, {}); } else if (!toStartOfTimeline && - this.initialEventsFetched && event.localTimestamp > this.lastReply()?.localTimestamp ) { this.fetchEditsWhereNeeded(event); @@ -292,7 +263,8 @@ export class Thread extends ReadReceipt { } if (emit) { - this.emit(ThreadEvent.Update, this); + this.emit(ThreadEvent.NewReply, this, event); + this.initialiseThread(); } } @@ -300,33 +272,77 @@ export class Thread extends ReadReceipt { return rootEvent?.getServerAggregatedRelation(THREAD_RELATION_TYPE.name); } - private async initialiseThread(): Promise { - let bundledRelationship = this.getRootEventBundledRelationship(); - if (Thread.hasServerSideSupport && !bundledRelationship) { - await this.fetchRootEvent(); - bundledRelationship = this.getRootEventBundledRelationship(); + private updateRootEvent() { + const timeline = this.room.getTimelineForEvent(this.id); + const roomEvent = timeline?.getEvents()?.find(it => it.getId() === this.id); + if (!roomEvent) { + logger.error("updateRootEvent: Could not find root event in room timeline"); } + if (this.length) { + this.room.threadsTimelineSets?.[0]?.addLiveEvent(this.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.roomState, + }); + if (this.hasCurrentUserParticipated) { + this.room.threadsTimelineSets?.[1]?.addLiveEvent(this.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.roomState, + }); + } + + if (roomEvent) { + roomEvent.event = this.rootEvent.event; + this.setEventMetadata(roomEvent); + } + } else { + this.room.threadsTimelineSets?.[0]?.removeEvent(this.id); + this.room.threadsTimelineSets?.[1]?.removeEvent(this.id); + + if (roomEvent) { + roomEvent.event = this.rootEvent.event; + this.clearEventMetadata(roomEvent); + } + + for (const threadEvent of this.events) { + this.clearEventMetadata(threadEvent); + } + + // TODO: deleting a thread should move the redacted events back to the correct positions in the timeline + for (const timelineSet of this.room.threadsTimelineSets) { + timelineSet.removeEvent(this.id); + } + + this.room.threads.delete(this.id); + } + + this.room.emit(RoomEvent.TimelineRefresh, this.room, timeline?.getTimelineSet()); + this.room.emit(RoomEvent.TimelineRefresh, this.room, this.timelineSet); + } + + private async initialiseThread(): Promise { + await this.fetchRootEvent(); + const bundledRelationship = this.getRootEventBundledRelationship(); + if (Thread.hasServerSideSupport && bundledRelationship) { this.replyCount = bundledRelationship.count; this._currentUserParticipated = bundledRelationship.current_user_participated; - const event = new MatrixEvent({ - room_id: this.rootEvent.getRoomId(), - ...bundledRelationship.latest_event, - }); - this.setEventMetadata(event); - event.setThread(this); - this.lastEvent = event; + const mapper = this.client.getEventMapper(); - this.fetchEditsWhereNeeded(event); - } + this.lastEvent = mapper(bundledRelationship.latest_event); + await this.fetchEditsWhereNeeded(this.lastEvent); + this.setEventMetadata(this.lastEvent); + this.updateRootEvent(); + } this.emit(ThreadEvent.Update, this); } // XXX: Workaround for https://github.com/matrix-org/matrix-spec-proposals/pull/2676/files#r827240084 - private async fetchEditsWhereNeeded(...events: MatrixEvent[]): Promise { + public async fetchEditsWhereNeeded(...events: MatrixEvent[]): Promise { return Promise.all(events.filter(e => e.isEncrypted()).map((event: MatrixEvent) => { if (event.isRelation()) return; // skip - relations don't get edits return this.client.relations(this.roomId, event.getId(), RelationType.Replace, event.getType(), { @@ -341,17 +357,16 @@ export class Thread extends ReadReceipt { })); } - public async fetchInitialEvents(): Promise { - if (this.initialEventsFetched) return; - await this.fetchEvents(); - this.initialEventsFetched = true; - } - - private setEventMetadata(event: MatrixEvent): void { + public setEventMetadata(event: MatrixEvent): void { EventTimeline.setEventMetadata(event, this.roomState, false); event.setThread(this); } + private clearEventMetadata(event: MatrixEvent): void { + event.setThread(null); + delete event.event?.unsigned?.["m.relations"]?.[THREAD_RELATION_TYPE.name]; + } + /** * Finds an event by ID in the current thread */ @@ -413,57 +428,6 @@ export class Thread extends ReadReceipt { return this.timelineSet.getLiveTimeline(); } - public async fetchEvents( - opts: IRelationsRequestOpts = { limit: 20, dir: Direction.Backward }, - ): Promise<{ - originalEvent: MatrixEvent; - events: MatrixEvent[]; - nextBatch?: string | null; - prevBatch?: string; - }> { - let { - originalEvent, - events, - prevBatch, - nextBatch, - } = await this.client.relations( - this.room.roomId, - this.id, - THREAD_RELATION_TYPE.name, - null, - opts, - ); - - // When there's no nextBatch returned with a `from` request we have reached - // the end of the thread, and therefore want to return an empty one - if (!opts.to && !nextBatch) { - events = [...events, originalEvent]; - } - - await this.fetchEditsWhereNeeded(...events); - - await Promise.all(events.map(event => { - this.setEventMetadata(event); - return this.client.decryptEventIfNeeded(event); - })); - - const prependEvents = (opts.dir ?? Direction.Backward) === Direction.Backward; - - this.timelineSet.addEventsToTimeline( - events, - prependEvents, - this.liveTimeline, - prependEvents ? nextBatch : prevBatch, - ); - - return { - originalEvent, - events, - prevBatch, - nextBatch, - }; - } - public getUnfilteredTimelineSet(): EventTimelineSet { return this.timelineSet; } diff --git a/src/timeline-window.ts b/src/timeline-window.ts index 538eaa1a534..27c1072f940 100644 --- a/src/timeline-window.ts +++ b/src/timeline-window.ts @@ -21,6 +21,7 @@ import { logger } from './logger'; import { MatrixClient } from "./client"; import { EventTimelineSet } from "./models/event-timeline-set"; import { MatrixEvent } from "./models/event"; +import { ThreadEvent } from "./models/thread"; /** * @private @@ -123,8 +124,17 @@ export class TimelineWindow { this.start = new TimelineIndex(timeline, startIndex - timeline.getBaseIndex()); this.end = new TimelineIndex(timeline, endIndex - timeline.getBaseIndex()); this.eventCount = endIndex - startIndex; + + this.timelineSet.thread?.emit(ThreadEvent.Update, this.timelineSet.thread); }; + if (this.timelineSet.thread && !this.timelineSet.getLiveTimeline().getEvents().length) { + await this.client.paginateEventTimeline(this.timelineSet.getLiveTimeline(), { + backwards: true, + limit: 20, + }); + } + // We avoid delaying the resolution of the promise by a reactor tick if we already have the data we need, // which is important to keep room-switching feeling snappy. const timeline = initialEventId @@ -295,8 +305,7 @@ export class TimelineWindow { }).then((r) => { debuglog("TimelineWindow: request completed with result " + r); if (!r) { - // end of timeline - return false; + return this.paginate(direction, size, false, 0); } // recurse to advance the index into the results.