From 093d7cca79f6b8885c5d8b8ff8524671f6c3bdea Mon Sep 17 00:00:00 2001 From: Janne Mareike Koschinski Date: Fri, 21 Oct 2022 14:46:37 +0200 Subject: [PATCH] implement new thread timeline loading --- src/client.ts | 77 +++++++++++------ src/models/event-timeline-set.ts | 4 +- src/models/room.ts | 80 +++++++++++++----- src/models/thread.ts | 139 +++++++++++++++++-------------- 4 files changed, 187 insertions(+), 113 deletions(-) diff --git a/src/client.ts b/src/client.ts index a9bd3f01b2d..02553dbb3c1 100644 --- a/src/client.ts +++ b/src/client.ts @@ -189,7 +189,14 @@ import { TypedEventEmitter } from "./models/typed-event-emitter"; import { ReceiptType } from "./@types/read_receipts"; import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } from "./sliding-sync"; import { SlidingSyncSdk } from "./sliding-sync-sdk"; -import { FeatureSupport, Thread, THREAD_RELATION_TYPE, determineFeatureSupport } from "./models/thread"; +import { + FeatureSupport, + Thread, + THREAD_RELATION_TYPE, + determineFeatureSupport, + ThreadFilterType, + threadFilterTypeToFilter, +} from "./models/thread"; import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon"; import { UnstableValue } from "./NamespacedValue"; import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue"; @@ -5160,7 +5167,7 @@ export class MatrixClient extends TypedEventEmitter = res.end; while (nextBatch) { - const resNewer = await this.fetchRelations( + const resNewer: IRelationsResponse = await this.fetchRelations( timelineSet.room.roomId, thread.id, THREAD_RELATION_TYPE.name, @@ -5338,8 +5355,9 @@ export class MatrixClient extends TypedEventEmitter { const path = utils.encodeUri("/rooms/$roomId/threads", { $roomId: roomId }); @@ -5497,7 +5517,7 @@ export class MatrixClient extends TypedEventEmitter = { limit: limit.toString(), dir: dir, - include: 'all', + include: threadFilterTypeToFilter(threadListType), }; if (fromToken) { @@ -5509,7 +5529,6 @@ export class MatrixClient extends TypedEventEmitter(Method.Get, path, params, undefined, opts) .then(res => ({ @@ -5555,8 +5575,8 @@ export class MatrixClient extends TypedEventEmitter { const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet); const room = this.getRoom(eventTimeline.getRoomId()!); - const isThreadListTimeline = eventTimeline.getTimelineSet().isThreadTimeline; - const isThreadTimeline = (eventTimeline.getTimelineSet().thread); + const threadListType = eventTimeline.getTimelineSet().threadListType; + const thread = eventTimeline.getTimelineSet().thread; // TODO: we should implement a backoff (as per scrollback()) to deal more // nicely with HTTP errors. @@ -5627,7 +5647,7 @@ export class MatrixClient extends TypedEventEmitter { if (res.state) { @@ -5667,7 +5688,7 @@ export class MatrixClient extends TypedEventEmitter { + { dir, limit: opts.limit, from: token ?? undefined }, + ).then(async (res) => { const mapper = this.getEventMapper(); const matrixEvents = res.chunk.map(mapper); - eventTimeline.getTimelineSet().thread?.fetchEditsWhereNeeded(...matrixEvents); + for (const event of matrixEvents) { + await eventTimeline.getTimelineSet()?.thread?.processEvent(event); + } const newToken = res.next_batch; @@ -5725,10 +5748,12 @@ export class MatrixClient extends TypedEventEmitter it.isRelation(THREAD_RELATION_TYPE.name)), + false); const atEnd = res.end === undefined || res.end === res.start; diff --git a/src/models/event-timeline-set.ts b/src/models/event-timeline-set.ts index db2e17c29e9..f56c38376db 100644 --- a/src/models/event-timeline-set.ts +++ b/src/models/event-timeline-set.ts @@ -27,7 +27,7 @@ import { RoomState } from "./room-state"; import { TypedEventEmitter } from "./typed-event-emitter"; import { RelationsContainer } from "./relations-container"; import { MatrixClient } from "../client"; -import { Thread } from "./thread"; +import { Thread, ThreadFilterType } from "./thread"; const DEBUG = true; @@ -140,7 +140,7 @@ export class EventTimelineSet extends TypedEventEmitter void; } & Pick< ThreadHandlerMap, - ThreadEvent.Update | ThreadEvent.NewReply + ThreadEvent.Update | ThreadEvent.NewReply | ThreadEvent.Delete > & EventTimelineSetHandlerMap & Pick @@ -1661,8 +1662,12 @@ export class Room extends ReadReceipt { private async createThreadTimelineSet(filterType?: ThreadFilterType): Promise { let timelineSet: EventTimelineSet; if (Thread.hasServerSideListSupport) { + let threadListType: ThreadFilterType | null = null; + if (Thread.hasServerSideListSupport) { + threadListType = filterType ?? ThreadFilterType.All; + } timelineSet = - new EventTimelineSet(this, this.opts, undefined, undefined, Boolean(Thread.hasServerSideListSupport)); + new EventTimelineSet(this, this.opts, undefined, undefined, threadListType); this.reEmitter.reEmit(timelineSet, [ RoomEvent.Timeline, RoomEvent.TimelineReset, @@ -1796,6 +1801,7 @@ export class Room extends ReadReceipt { } this.on(ThreadEvent.NewReply, this.onThreadNewReply); + this.on(ThreadEvent.Delete, this.onThreadDelete); this.threadsReady = true; } @@ -1814,6 +1820,7 @@ export class Room extends ReadReceipt { null, undefined, Direction.Backward, + timelineSet.threadListType, timelineSet.getFilter(), ); @@ -1834,8 +1841,21 @@ export class Room extends ReadReceipt { } private onThreadNewReply(thread: Thread): void { - if (thread.length && thread.rootEvent) { - this.updateThreadRootEvents(thread, false); + this.updateThreadRootEvents(thread, false); + } + + private onThreadDelete(thread: Thread): void { + this.threads.delete(thread.id); + + const timeline = this.getTimelineForEvent(thread.id); + const roomEvent = timeline?.getEvents()?.find(it => it.getId() === thread.id); + if (roomEvent) { + thread.clearEventMetadata(roomEvent); + } else { + logger.debug("onThreadDelete: Could not find root event in room timeline"); + } + for (const timelineSet of this.threadsTimelineSets) { + timelineSet.removeEvent(thread.id); } } @@ -1948,9 +1968,11 @@ export class Room extends ReadReceipt { } private updateThreadRootEvents = (thread: Thread, toStartOfTimeline: boolean) => { - this.updateThreadRootEvent(this.threadsTimelineSets?.[0], thread, toStartOfTimeline); - if (thread.hasCurrentUserParticipated) { - this.updateThreadRootEvent(this.threadsTimelineSets?.[1], thread, toStartOfTimeline); + if (thread.length) { + this.updateThreadRootEvent(this.threadsTimelineSets?.[0], thread, toStartOfTimeline); + if (thread.hasCurrentUserParticipated) { + this.updateThreadRootEvent(this.threadsTimelineSets?.[1], thread, toStartOfTimeline); + } } }; @@ -1959,18 +1981,20 @@ export class Room extends ReadReceipt { thread: Thread, toStartOfTimeline: boolean, ) => { - if (Thread.hasServerSideSupport) { - timelineSet.addLiveEvent(thread.rootEvent, { - duplicateStrategy: DuplicateStrategy.Replace, - fromCache: false, - roomState: this.currentState, - }); - } else { - timelineSet.addEventToTimeline( - thread.rootEvent, - timelineSet.getLiveTimeline(), - { toStartOfTimeline }, - ); + if (timelineSet && thread.rootEvent) { + if (Thread.hasServerSideSupport) { + timelineSet.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); + } else { + timelineSet.addEventToTimeline( + thread.rootEvent, + timelineSet.getLiveTimeline(), + { toStartOfTimeline }, + ); + } } }; @@ -1994,23 +2018,33 @@ export class Room extends ReadReceipt { client: this.client, }); + // This is necessary to be able to jump to events in threads: + // If we jump to an event in a thread where neither the event, nor the root, + // nor any thread event are loaded yet, we'll load the event as well as the thread root, create the thread, + // and pass the event through this. + for (const event of events) { + thread.setEventMetadata(event); + } + // If we managed to create a thread and figure out its `id` then we can use it this.threads.set(thread.id, thread); this.reEmitter.reEmit(thread, [ + ThreadEvent.Delete, ThreadEvent.Update, ThreadEvent.NewReply, RoomEvent.Timeline, RoomEvent.TimelineReset, ]); + const isNewer = this.lastThread?.rootEvent + && rootEvent?.localTimestamp + && this.lastThread.rootEvent?.localTimestamp < rootEvent?.localTimestamp; - if (!this.lastThread || this.lastThread.rootEvent?.localTimestamp < rootEvent?.localTimestamp) { + if (!this.lastThread || isNewer) { this.lastThread = thread; } if (this.threadsReady) { - if (thread.length && thread.rootEvent) { - this.updateThreadRootEvents(thread, toStartOfTimeline); - } + this.updateThreadRootEvents(thread, toStartOfTimeline); } this.emit(ThreadEvent.New, thread, toStartOfTimeline); diff --git a/src/models/thread.ts b/src/models/thread.ts index 4978a11d024..d4fec1e1373 100644 --- a/src/models/thread.ts +++ b/src/models/thread.ts @@ -32,6 +32,7 @@ export enum ThreadEvent { Update = "Thread.update", NewReply = "Thread.newReply", ViewThread = "Thread.viewThread", + Delete = "Thread.delete" } type EmittedEvents = Exclude @@ -42,6 +43,7 @@ export type EventHandlerMap = { [ThreadEvent.Update]: (thread: Thread) => void; [ThreadEvent.NewReply]: (thread: Thread, event: MatrixEvent) => void; [ThreadEvent.ViewThread]: () => void; + [ThreadEvent.Delete]: (thread: Thread) => void; } & EventTimelineSetHandlerMap; interface IThreadOpts { @@ -88,6 +90,8 @@ export class Thread extends ReadReceipt { public readonly room: Room; public readonly client: MatrixClient; + public initialEventsFetched = !Thread.hasServerSideSupport; + constructor( public readonly id: string, public rootEvent: MatrixEvent | undefined, @@ -122,8 +126,7 @@ export class Thread extends ReadReceipt { // even if this thread is thought to be originating from this client, we initialise it as we may be in a // gappy sync and a thread around this event may already exist. this.initialiseThread(); - - this.rootEvent?.setThread(this); + this.setEventMetadata(this.rootEvent); } private async fetchRootEvent(): Promise { @@ -136,13 +139,7 @@ export class Thread extends ReadReceipt { } catch (e) { logger.error("Failed to fetch thread root to construct thread with", e); } - - // The root event might be not be visible to the person requesting it. - // If it wasn't fetched successfully the thread will work in "limited" mode and won't - // benefit from all the APIs a homeserver can provide to enhance the thread experience - this.rootEvent?.setThread(this); - - this.emit(ThreadEvent.Update, this); + await this.processEvent(this.rootEvent); } public static setServerSideSupport( @@ -179,42 +176,26 @@ export class Thread extends ReadReceipt { } }; - private onRedaction = (event: MatrixEvent) => { + private onRedaction = async (event: MatrixEvent) => { if (event.threadRootId !== this.id) return; // ignore redactions for other timelines - const events = [...this.timelineSet.getLiveTimeline().getEvents()].reverse(); - this.lastEvent = events.find(e => ( - !e.isRedacted() && - e.isRelation(THREAD_RELATION_TYPE.name) - )) ?? this.rootEvent!; + if (this.replyCount <= 0) { + for (const threadEvent of this.events) { + this.clearEventMetadata(threadEvent); + } + this.emit(ThreadEvent.Delete, this); + } else { + await this.initialiseThread(); + } this.emit(ThreadEvent.Update, this); }; - private onEcho = (event: MatrixEvent) => { + private onEcho = async (event: MatrixEvent) => { if (event.threadRootId !== this.id) return; // ignore echoes for other timelines if (this.lastEvent === event) return; if (!event.isRelation(THREAD_RELATION_TYPE.name)) return; - // There is a risk that the `localTimestamp` approximation will not be accurate - // when threads are used over federation. That could result in the reply - // count value drifting away from the value returned by the server - const isThreadReply = event.isRelation(THREAD_RELATION_TYPE.name); - if (!this.lastEvent || this.lastEvent.isRedacted() || (isThreadReply - && (event.getId() !== this.lastEvent.getId()) - && (event.localTimestamp > this.lastEvent.localTimestamp)) - ) { - this.lastEvent = event; - if (this.lastEvent.getId() !== this.id) { - // This counting only works when server side support is enabled as we started the counting - // from the value returned within the bundled relationship - if (Thread.hasServerSideSupport) { - this.replyCount++; - } - - this.emit(ThreadEvent.NewReply, this, event); - } - } - - this.emit(ThreadEvent.Update, this); + await this.initialiseThread(); + this.emit(ThreadEvent.NewReply, this, event); }; public get roomState(): RoomState { @@ -238,6 +219,7 @@ export class Thread extends ReadReceipt { public addEvents(events: MatrixEvent[], toStartOfTimeline: boolean): void { events.forEach(ev => this.addEvent(ev, toStartOfTimeline, false)); this.emit(ThreadEvent.Update, this); + this.initialiseThread(); } /** @@ -249,12 +231,11 @@ export class Thread extends ReadReceipt { * to the start (and not the end) of the timeline. * @param {boolean} emit whether to emit the Update event if the thread was updated or not. */ - public addEvent(event: MatrixEvent, toStartOfTimeline: boolean, emit = true): void { - event.setThread(this); + public async addEvent(event: MatrixEvent, toStartOfTimeline: boolean, emit = true): Promise { + this.setEventMetadata(event); - if (!this._currentUserParticipated && event.getSender() === this.client.getUserId()) { - this._currentUserParticipated = true; - } + const lastReply = this.lastReply(); + const isNewestReply = !lastReply || event.localTimestamp > lastReply!.localTimestamp; // Add all incoming events to the thread's timeline set when there's no server support if (!Thread.hasServerSideSupport) { @@ -265,15 +246,13 @@ export class Thread extends ReadReceipt { this.addEventToTimeline(event, toStartOfTimeline); this.client.decryptEventIfNeeded(event, {}); - } else if (!toStartOfTimeline && - event.localTimestamp > this.lastReply()!.localTimestamp - ) { - this.fetchEditsWhereNeeded(event); + } else if (!toStartOfTimeline && this.initialEventsFetched && isNewestReply) { + await this.fetchEditsWhereNeeded(event); this.addEventToTimeline(event, false); } else if (event.isRelation(RelationType.Annotation) || event.isRelation(RelationType.Replace)) { // Apply annotations and replace relations to the relations of the timeline only - this.timelineSet.relations.aggregateParentEvent(event); - this.timelineSet.relations.aggregateChildEvent(event, this.timelineSet); + this.timelineSet.relations?.aggregateParentEvent(event); + this.timelineSet.relations?.aggregateChildEvent(event, this.timelineSet); return; } @@ -284,7 +263,15 @@ export class Thread extends ReadReceipt { } if (emit) { - this.emit(ThreadEvent.Update, this); + this.emit(ThreadEvent.NewReply, this, event); + this.initialiseThread(); + } + } + + public async processEvent(event: Optional): Promise { + if (event) { + this.setEventMetadata(event); + await this.fetchEditsWhereNeeded(event); } } @@ -294,7 +281,7 @@ export class Thread extends ReadReceipt { private async initialiseThread(): Promise { let bundledRelationship = this.getRootEventBundledRelationship(); - if (Thread.hasServerSideSupport && !bundledRelationship) { + if (Thread.hasServerSideSupport) { await this.fetchRootEvent(); bundledRelationship = this.getRootEventBundledRelationship(); } @@ -303,22 +290,32 @@ export class Thread extends ReadReceipt { this.replyCount = bundledRelationship.count; this._currentUserParticipated = !!bundledRelationship.current_user_participated; - const event = new MatrixEvent({ - room_id: this.rootEvent.getRoomId(), - ...bundledRelationship.latest_event, - }); - this.setEventMetadata(event); - event.setThread(this); - this.lastEvent = event; + const mapper = this.client.getEventMapper(); + this.lastEvent = mapper(bundledRelationship.latest_event); + await this.processEvent(this.lastEvent); + } - this.fetchEditsWhereNeeded(event); + if (!this.initialEventsFetched) { + this.initialEventsFetched = true; + // fetch initial event to allow proper pagination + try { + // if the thread has regular events, this will just load the last reply. + // if the thread is newly created, this will load the root event. + await this.client.paginateEventTimeline(this.liveTimeline, { backwards: true, limit: 1 }); + // just to make sure that, if we've created a timeline window for this thread before the thread itself + // existed (e.g. when creating a new thread), we'll make sure the panel is force refreshed correctly. + this.emit(RoomEvent.TimelineReset, this.room, this.timelineSet, true); + } catch (e) { + logger.error("Failed to load start of newly created thread: ", e); + this.initialEventsFetched = false; + } } this.emit(ThreadEvent.Update, this); } // XXX: Workaround for https://github.com/matrix-org/matrix-spec-proposals/pull/2676/files#r827240084 - public async fetchEditsWhereNeeded(...events: MatrixEvent[]): Promise { + private async fetchEditsWhereNeeded(...events: MatrixEvent[]): Promise { return Promise.all(events.filter(e => e.isEncrypted()).map((event: MatrixEvent) => { if (event.isRelation()) return; // skip - relations don't get edits return this.client.relations(this.roomId, event.getId(), RelationType.Replace, event.getType(), { @@ -333,9 +330,18 @@ export class Thread extends ReadReceipt { })); } - public setEventMetadata(event: MatrixEvent): void { - EventTimeline.setEventMetadata(event, this.roomState, false); - event.setThread(this); + public setEventMetadata(event: Optional): void { + if (event) { + EventTimeline.setEventMetadata(event, this.roomState, false); + event.setThread(this); + } + } + + public clearEventMetadata(event: Optional): void { + if (event) { + event.setThread(null); + delete event.event?.unsigned?.["m.relations"]?.[THREAD_RELATION_TYPE.name]; + } } /** @@ -429,3 +435,12 @@ export enum ThreadFilterType { "My", "All" } + +export function threadFilterTypeToFilter(type: ThreadFilterType): 'all' | 'participated' { + switch (type) { + case ThreadFilterType.My: + return 'participated'; + case ThreadFilterType.All: + return 'all'; + } +}