diff --git a/src/client.ts b/src/client.ts index 68c92a899a0..2a00d220258 100644 --- a/src/client.ts +++ b/src/client.ts @@ -5354,6 +5354,12 @@ export class MatrixClient extends TypedEventEmitter { + if (!Thread.hasServerSideSupport) { + throw new Error("could not get thread timeline: no serverside support"); + } + + if (!this.supportsExperimentalThreads()) { + throw new Error("could not get thread timeline: no client support"); + } + + const path = utils.encodeUri( + "/rooms/$roomId/context/$eventId", { + $roomId: timelineSet.room.roomId, + $eventId: eventId, + }, + ); + + const params: Record = { + limit: "0", + }; + if (this.clientOpts.lazyLoadMembers) { + params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER); + } + + // TODO: we should implement a backoff (as per scrollback()) to deal more nicely with HTTP errors. + const res = await this.http.authedRequest(undefined, Method.Get, path, params); + const mapper = this.getEventMapper(); + const event = mapper(res.event); + + if (!timelineSet.canContain(event)) { + return undefined; + } + + if (Thread.hasServerSideSupport === FeatureSupport.Stable) { + if (!timelineSet.thread) { + throw new Error("could not get thread timeline: not a thread timeline"); + } + + const thread = timelineSet.thread; + const resOlder = await this.fetchRelations( + timelineSet.room.roomId, + thread.id, + THREAD_RELATION_TYPE.name, + null, + { dir: Direction.Backward, from: res.start }, + ); + const resNewer = await this.fetchRelations( + timelineSet.room.roomId, + thread.id, + THREAD_RELATION_TYPE.name, + null, + { dir: Direction.Forward, from: res.end }, + ); + const events = [ + // Order events from most recent to oldest (reverse-chronological). + // We start with the last event, since that's the point at which we have known state. + // events_after is already backwards; events_before is forwards. + ...resNewer.chunk.reverse().map(mapper), + event, + ...resOlder.chunk.map(mapper), + ]; + await timelineSet.thread?.fetchEditsWhereNeeded(...events); + + // Here we handle non-thread timelines only, but still process any thread events to populate thread summaries. + let timeline = timelineSet.getTimelineForEvent(event.getId()); + if (timeline) { + timeline.getState(EventTimeline.BACKWARDS).setUnknownStateEvents(res.state.map(mapper)); + } else { + timeline = timelineSet.addTimeline(); + timeline.initialiseState(res.state.map(mapper)); + } + + timelineSet.addEventsToTimeline(events, true, timeline, resNewer.next_batch); + if (!resOlder.next_batch) { + timelineSet.addEventsToTimeline([mapper(resOlder.original_event)], true, timeline, null); + } + timeline.setPaginationToken(resOlder.next_batch ?? null, Direction.Backward); + timeline.setPaginationToken(resNewer.next_batch ?? null, Direction.Forward); + this.processBeaconEvents(timelineSet.room, events); + + // There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring + // timeline) - so check the room's index again. On the other hand, there's no guarantee the event ended up + // anywhere, if it was later redacted, so we just return the timeline we first thought of. + return timelineSet.getTimelineForEvent(eventId) + ?? timeline; + } else { + // Where the event is a thread reply (not a root) and running in MSC-enabled mode the Thread timeline only + // functions contiguously, so we have to jump through some hoops to get our target event in it. + // XXX: workaround for https://github.com/vector-im/element-meta/issues/150 + + const thread = timelineSet.thread; + const opts: IRelationsRequestOpts = { + dir: Direction.Backward, + limit: 50, + }; + + await thread.fetchInitialEvents(); + let nextBatch: string | null | undefined = thread.liveTimeline.getPaginationToken(Direction.Backward); + + // Fetch events until we find the one we were asked for, or we run out of pages + while (!thread.findEventById(eventId)) { + if (nextBatch) { + opts.from = nextBatch; + } + + ({ nextBatch } = await thread.fetchEvents(opts)); + if (!nextBatch) break; + } + + return thread.liveTimeline; + } + } + /** * Get an EventTimeline for the latest events in the room. This will just * call `/messages` to get the latest message in the room, then use @@ -5465,28 +5551,44 @@ export class MatrixClient extends TypedEventEmitter = { + dir: 'b', + }; + if (this.clientOpts.lazyLoadMembers) { + params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER); + } + + const res = await this.http.authedRequest(undefined, Method.Get, messagesPath, params); + event = res.chunk?.[0]; } - const event = res.chunk?.[0]; if (!event) { - throw new Error("No message returned from /messages when trying to construct getLatestTimeline"); + throw new Error("No message returned when trying to construct getLatestTimeline"); } return this.getEventTimeline(timelineSet, event.event_id); @@ -5624,7 +5726,8 @@ export class MatrixClient extends TypedEventEmitter { const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet); const room = this.getRoom(eventTimeline.getRoomId()); - const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline; + const isThreadListTimeline = eventTimeline.getTimelineSet().isThreadTimeline; + const isThreadTimeline = (eventTimeline.getTimelineSet().thread); // TODO: we should implement a backoff (as per scrollback()) to deal more // nicely with HTTP errors. @@ -5695,7 +5798,7 @@ export class MatrixClient extends TypedEventEmitter { + const mapper = this.getEventMapper(); + const matrixEvents = res.chunk.map(mapper); + eventTimeline.getTimelineSet().thread?.fetchEditsWhereNeeded(...matrixEvents); + + const newToken = res.next_batch; + + const timelineSet = eventTimeline.getTimelineSet(); + timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, newToken ?? null); + if (!newToken && backwards) { + timelineSet.addEventsToTimeline([mapper(res.original_event)], true, eventTimeline, null); + } + this.processBeaconEvents(timelineSet.room, matrixEvents); + + // if we've hit the end of the timeline, we need to stop trying to + // paginate. We need to keep the 'forwards' token though, to make sure + // we can recover from gappy syncs. + if (backwards && !newToken) { + eventTimeline.setPaginationToken(null, dir); + } + return Boolean(newToken); + }).finally(() => { + eventTimeline.paginationRequests[dir] = null; + }); + eventTimeline.paginationRequests[dir] = promise; } else { if (!room) { throw new Error("Unknown room " + eventTimeline.getRoomId()); @@ -5752,10 +5892,12 @@ export class MatrixClient extends TypedEventEmitter it.isRelation(THREAD_RELATION_TYPE.name)), + false); const atEnd = res.end === undefined || res.end === res.start; diff --git a/src/models/event-timeline-set.ts b/src/models/event-timeline-set.ts index 55bd5615153..4428ddc3f17 100644 --- a/src/models/event-timeline-set.ts +++ b/src/models/event-timeline-set.ts @@ -374,6 +374,12 @@ export class EventTimelineSet extends TypedEventEmitter { /** * @experimental */ - private threads = new Map(); + public threads = new Map(); public lastThread: Thread; /** @@ -1790,13 +1790,19 @@ export class Room extends ReadReceipt { private onThreadNewReply(thread: Thread): void { const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS); - for (const timelineSet of this.threadsTimelineSets) { - timelineSet.removeEvent(thread.id); - timelineSet.addLiveEvent(thread.rootEvent, { + if (thread.length) { + this.threadsTimelineSets?.[0]?.addLiveEvent(thread.rootEvent, { duplicateStrategy: DuplicateStrategy.Replace, fromCache: false, roomState, }); + if (thread.hasCurrentUserParticipated) { + this.threadsTimelineSets?.[1]?.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState, + }); + } } } @@ -1924,7 +1930,6 @@ export class Room extends ReadReceipt { } const thread = new Thread(threadId, rootEvent, { - initialEvents: events, room: this, client: this.client, }); @@ -1946,7 +1951,11 @@ export class Room extends ReadReceipt { this.threadsTimelineSets.forEach(timelineSet => { if (thread.rootEvent) { if (Thread.hasServerSideSupport) { - timelineSet.addLiveEvent(thread.rootEvent); + timelineSet.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); } else { timelineSet.addEventToTimeline( thread.rootEvent, @@ -1960,6 +1969,21 @@ export class Room extends ReadReceipt { this.emit(ThreadEvent.New, thread, toStartOfTimeline); + if (thread.length) { + this.threadsTimelineSets?.[0]?.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); + if (thread.hasCurrentUserParticipated) { + this.threadsTimelineSets?.[1]?.addLiveEvent(thread.rootEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.currentState, + }); + } + } + return thread; } diff --git a/src/models/thread.ts b/src/models/thread.ts index 34568e4f8c0..ae194038bb8 100644 --- a/src/models/thread.ts +++ b/src/models/thread.ts @@ -16,9 +16,9 @@ limitations under the License. import { Optional } from "matrix-events-sdk"; -import { MatrixClient, MatrixEventEvent, RelationType, RoomEvent } from "../matrix"; +import { DuplicateStrategy, MatrixClient, MatrixEventEvent, Method, RelationType, RoomEvent } from "../matrix"; import { TypedReEmitter } from "../ReEmitter"; -import { IRelationsRequestOpts } from "../@types/requests"; +import { IContextResponse, IRelationsRequestOpts } from "../@types/requests"; import { IThreadBundledRelationship, MatrixEvent } from "./event"; import { Direction, EventTimeline } from "./event-timeline"; import { EventTimelineSet, EventTimelineSetHandlerMap } from './event-timeline-set'; @@ -26,6 +26,7 @@ import { Room } from './room'; import { RoomState } from "./room-state"; import { ServerControlledNamespacedValue } from "../NamespacedValue"; import { logger } from "../logger"; +import { encodeUri } from "../utils"; import { ReadReceipt } from "./read-receipt"; export enum ThreadEvent { @@ -46,7 +47,6 @@ export type EventHandlerMap = { } & EventTimelineSetHandlerMap; interface IThreadOpts { - initialEvents?: MatrixEvent[]; room: Room; client: MatrixClient; } @@ -122,33 +122,10 @@ export class Thread extends ReadReceipt { this.room.on(RoomEvent.LocalEchoUpdated, this.onEcho); this.timelineSet.on(RoomEvent.Timeline, this.onEcho); - if (opts.initialEvents) { - this.addEvents(opts.initialEvents, false); - } // even if this thread is thought to be originating from this client, we initialise it as we may be in a // gappy sync and a thread around this event may already exist. this.initialiseThread(); - - this.rootEvent?.setThread(this); - } - - private async fetchRootEvent(): Promise { - this.rootEvent = this.room.findEventById(this.id); - // If the rootEvent does not exist in the local stores, then fetch it from the server. - try { - const eventData = await this.client.fetchRoomEvent(this.roomId, this.id); - const mapper = this.client.getEventMapper(); - this.rootEvent = mapper(eventData); // will merge with existing event object if such is known - } catch (e) { - logger.error("Failed to fetch thread root to construct thread with", e); - } - - // The root event might be not be visible to the person requesting it. - // If it wasn't fetched successfully the thread will work in "limited" mode and won't - // benefit from all the APIs a homeserver can provide to enhance the thread experience - this.rootEvent?.setThread(this); - - this.emit(ThreadEvent.Update, this); + this.setEventMetadata(this.rootEvent); } public static setServerSideSupport( @@ -174,47 +151,66 @@ export class Thread extends ReadReceipt { event.getId() !== this.id && // the root event isn't counted in the length so ignore this redaction !redaction.status // only respect it when it succeeds ) { - this.replyCount--; + this.replyCount = Math.max(0, this.replyCount - 1); this.emit(ThreadEvent.Update, this); } }; private onRedaction = (event: MatrixEvent) => { if (event.threadRootId !== this.id) return; // ignore redactions for other timelines - const events = [...this.timelineSet.getLiveTimeline().getEvents()].reverse(); - this.lastEvent = events.find(e => ( - !e.isRedacted() && - e.isRelation(THREAD_RELATION_TYPE.name) - )) ?? this.rootEvent; + if (this.replyCount <= 0) { + const timeline = this.room.getTimelineForEvent(this.id); + const roomEvent = timeline?.getEvents()?.find(it => it.getId() === this.id); + if (roomEvent) { + this.clearEventMetadata(roomEvent); + this.rootEvent = roomEvent; + } else { + logger.error("redaction: Could not find root event in room timeline"); + } + + for (const threadEvent of this.events) { + this.clearEventMetadata(threadEvent); + } + // TODO: deleting a thread should move the redacted events back to the correct positions in the timeline + for (const timelineSet of this.room.threadsTimelineSets) { + timelineSet.removeEvent(this.id); + } + this.room.emit(RoomEvent.TimelineRefresh, this.room, timeline?.getTimelineSet()); + this.room.emit(RoomEvent.TimelineRefresh, this.room, this.timelineSet); + this.room.threads.delete(this.id); + } this.emit(ThreadEvent.Update, this); }; - private onEcho = (event: MatrixEvent) => { + private async loadEvent(event: string): Promise { + const path = encodeUri( + "/rooms/$roomId/context/$eventId", { + $roomId: this.roomId, + $eventId: event, + }, + ); + + // TODO: we should implement a backoff (as per scrollback()) to deal more nicely with HTTP errors. + const res = await this.client.http.authedRequest(undefined, Method.Get, path, { + limit: "0", + }); + if (!res.event) { + throw new Error("'event' not in '/context' result - homeserver too old?"); + } + + const mapper = this.client.getEventMapper(); + const mappedEvent = mapper(res.event); + await this.fetchEditsWhereNeeded(mappedEvent); + return mappedEvent; + } + + private onEcho = async (event: MatrixEvent) => { if (event.threadRootId !== this.id) return; // ignore echoes for other timelines if (this.lastEvent === event) return; if (!event.isRelation(THREAD_RELATION_TYPE.name)) return; - // There is a risk that the `localTimestamp` approximation will not be accurate - // when threads are used over federation. That could result in the reply - // count value drifting away from the value returned by the server - const isThreadReply = event.isRelation(THREAD_RELATION_TYPE.name); - if (!this.lastEvent || this.lastEvent.isRedacted() || (isThreadReply - && (event.getId() !== this.lastEvent.getId()) - && (event.localTimestamp > this.lastEvent.localTimestamp)) - ) { - this.lastEvent = event; - if (this.lastEvent.getId() !== this.id) { - // This counting only works when server side support is enabled as we started the counting - // from the value returned within the bundled relationship - if (Thread.hasServerSideSupport) { - this.replyCount++; - } - - this.emit(ThreadEvent.NewReply, this, event); - } - } - - this.emit(ThreadEvent.Update, this); + await this.initialiseThread(); + this.emit(ThreadEvent.NewReply, this, event); }; public get roomState(): RoomState { @@ -238,6 +234,7 @@ export class Thread extends ReadReceipt { public addEvents(events: MatrixEvent[], toStartOfTimeline: boolean): void { events.forEach(ev => this.addEvent(ev, toStartOfTimeline, false)); this.emit(ThreadEvent.Update, this); + this.initialiseThread(); } /** @@ -285,41 +282,59 @@ export class Thread extends ReadReceipt { } if (emit) { - this.emit(ThreadEvent.Update, this); + this.emit(ThreadEvent.NewReply, this, event); + this.initialiseThread(); } } - private getRootEventBundledRelationship(rootEvent = this.rootEvent): IThreadBundledRelationship { - return rootEvent?.getServerAggregatedRelation(THREAD_RELATION_TYPE.name); - } - private async initialiseThread(): Promise { - let bundledRelationship = this.getRootEventBundledRelationship(); - if (Thread.hasServerSideSupport && !bundledRelationship) { - await this.fetchRootEvent(); - bundledRelationship = this.getRootEventBundledRelationship(); - } - - if (Thread.hasServerSideSupport && bundledRelationship) { - this.replyCount = bundledRelationship.count; - this._currentUserParticipated = bundledRelationship.current_user_participated; - - const event = new MatrixEvent({ - room_id: this.rootEvent.getRoomId(), - ...bundledRelationship.latest_event, - }); - this.setEventMetadata(event); - event.setThread(this); - this.lastEvent = event; + const mapper = this.client.getEventMapper(); + const mappedEvent = await this.loadEvent(this.id); + this.setEventMetadata(mappedEvent); - this.fetchEditsWhereNeeded(event); + const metadata = mappedEvent?.getServerAggregatedRelation( + THREAD_RELATION_TYPE.name, + ); + if (metadata) { + this.replyCount = metadata.count; + const latestEvent = mapper(metadata.latest_event); + EventTimeline.setEventMetadata(latestEvent, this.roomState, false); + await this.fetchEditsWhereNeeded(latestEvent); + this.lastEvent = latestEvent; + this.rootEvent = mappedEvent; + + if (this.length) { + this.room.threadsTimelineSets?.[0]?.addLiveEvent(mappedEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.roomState, + }); + if (this.hasCurrentUserParticipated) { + this.room.threadsTimelineSets?.[1]?.addLiveEvent(mappedEvent, { + duplicateStrategy: DuplicateStrategy.Replace, + fromCache: false, + roomState: this.roomState, + }); + } + } + const timeline = this.room.getTimelineForEvent(this.id); + const roomEvent = timeline?.getEvents()?.find(it => it.getId() === this.id); + if (roomEvent) { + roomEvent.event = mappedEvent.event; + this.setEventMetadata(roomEvent); + } else { + logger.error("initialiseThread: Could not find root event in room timeline"); + } + this.room.emit(RoomEvent.TimelineRefresh, this.room, timeline?.getTimelineSet()); + this.room.emit(RoomEvent.TimelineRefresh, this.room, this.timelineSet); + this.emit(ThreadEvent.Update, this); + } else { + logger.error("initialiseThread failed: metadata was falsy", mappedEvent); } - - this.emit(ThreadEvent.Update, this); } // XXX: Workaround for https://github.com/matrix-org/matrix-spec-proposals/pull/2676/files#r827240084 - private async fetchEditsWhereNeeded(...events: MatrixEvent[]): Promise { + public async fetchEditsWhereNeeded(...events: MatrixEvent[]): Promise { return Promise.all(events.filter(e => e.isEncrypted()).map((event: MatrixEvent) => { if (event.isRelation()) return; // skip - relations don't get edits return this.client.relations(this.roomId, event.getId(), RelationType.Replace, event.getType(), { @@ -345,6 +360,11 @@ export class Thread extends ReadReceipt { event.setThread(this); } + private clearEventMetadata(event: MatrixEvent): void { + event.setThread(null); + delete event.event?.unsigned?.["m.relations"]?.[THREAD_RELATION_TYPE.name]; + } + /** * Finds an event by ID in the current thread */ diff --git a/src/timeline-window.ts b/src/timeline-window.ts index 538eaa1a534..27c1072f940 100644 --- a/src/timeline-window.ts +++ b/src/timeline-window.ts @@ -21,6 +21,7 @@ import { logger } from './logger'; import { MatrixClient } from "./client"; import { EventTimelineSet } from "./models/event-timeline-set"; import { MatrixEvent } from "./models/event"; +import { ThreadEvent } from "./models/thread"; /** * @private @@ -123,8 +124,17 @@ export class TimelineWindow { this.start = new TimelineIndex(timeline, startIndex - timeline.getBaseIndex()); this.end = new TimelineIndex(timeline, endIndex - timeline.getBaseIndex()); this.eventCount = endIndex - startIndex; + + this.timelineSet.thread?.emit(ThreadEvent.Update, this.timelineSet.thread); }; + if (this.timelineSet.thread && !this.timelineSet.getLiveTimeline().getEvents().length) { + await this.client.paginateEventTimeline(this.timelineSet.getLiveTimeline(), { + backwards: true, + limit: 20, + }); + } + // We avoid delaying the resolution of the promise by a reactor tick if we already have the data we need, // which is important to keep room-switching feeling snappy. const timeline = initialEventId @@ -295,8 +305,7 @@ export class TimelineWindow { }).then((r) => { debuglog("TimelineWindow: request completed with result " + r); if (!r) { - // end of timeline - return false; + return this.paginate(direction, size, false, 0); } // recurse to advance the index into the results.