Skip to content

Commit

Permalink
implement new thread timeline loading
Browse files Browse the repository at this point in the history
  • Loading branch information
justjanne committed Oct 21, 2022
1 parent dc5b06e commit 093d7cc
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 113 deletions.
77 changes: 51 additions & 26 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ import { TypedEventEmitter } from "./models/typed-event-emitter";
import { ReceiptType } from "./@types/read_receipts";
import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } from "./sliding-sync";
import { SlidingSyncSdk } from "./sliding-sync-sdk";
import { FeatureSupport, Thread, THREAD_RELATION_TYPE, determineFeatureSupport } from "./models/thread";
import {
FeatureSupport,
Thread,
THREAD_RELATION_TYPE,
determineFeatureSupport,
ThreadFilterType,
threadFilterTypeToFilter,
} from "./models/thread";
import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon";
import { UnstableValue } from "./NamespacedValue";
import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue";
Expand Down Expand Up @@ -5160,7 +5167,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
return timelineSet.getTimelineForEvent(eventId);
}

if (this.supportsExperimentalThreads()) {
if (timelineSet.thread && this.supportsExperimentalThreads()) {
return this.getThreadTimeline(timelineSet, eventId);
}

Expand Down Expand Up @@ -5227,6 +5234,14 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
throw new Error("could not get thread timeline: no client support");
}

if (!timelineSet.room) {
throw new Error("could not get thread timeline: not a room timeline");
}

if (!timelineSet.thread) {
throw new Error("could not get thread timeline: not a thread timeline");
}

const path = utils.encodeUri(
"/rooms/$roomId/context/$eventId", {
$roomId: timelineSet.room.roomId,
Expand Down Expand Up @@ -5257,14 +5272,14 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}

const thread = timelineSet.thread;
const resOlder = await this.fetchRelations(
const resOlder: IRelationsResponse = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Backward, from: res.start },
);
const resNewer = await this.fetchRelations(
const resNewer: IRelationsResponse = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
Expand All @@ -5279,7 +5294,9 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
event,
...resOlder.chunk.map(mapper),
];
await timelineSet.thread?.fetchEditsWhereNeeded(...events);
for (const event of events) {
await timelineSet.thread?.processEvent(event);
}

// Here we handle non-thread timelines only, but still process any thread events to populate thread summaries.
let timeline = timelineSet.getTimelineForEvent(event.getId());
Expand Down Expand Up @@ -5317,10 +5334,10 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
null,
{ dir: Direction.Backward, from: res.start },
);
const eventsNewer = [];
const eventsNewer: IEvent[] = [];
let nextBatch: Optional<string> = res.end;
while (nextBatch) {
const resNewer = await this.fetchRelations(
const resNewer: IRelationsResponse = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
Expand All @@ -5338,8 +5355,9 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
event,
...resOlder.chunk.map(mapper),
];

await timelineSet.thread?.fetchEditsWhereNeeded(...events);
for (const event of events) {
await timelineSet.thread?.processEvent(event);
}

// Here we handle non-thread timelines only, but still process any thread events to populate thread
// summaries.
Expand Down Expand Up @@ -5381,12 +5399,13 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
}

let event;
if (timelineSet.isThreadTimeline) {
if (timelineSet.threadListType) {
const res = await this.createThreadListMessagesRequest(
timelineSet.room.roomId,
null,
1,
Direction.Backward,
timelineSet.threadListType,
timelineSet.getFilter(),
);
event = res.chunk?.[0];
Expand Down Expand Up @@ -5490,14 +5509,15 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
fromToken: string | null,
limit = 30,
dir = Direction.Backward,
threadListType: ThreadFilterType = ThreadFilterType.All,
timelineFilter?: Filter,
): Promise<IMessagesResponse> {
const path = utils.encodeUri("/rooms/$roomId/threads", { $roomId: roomId });

const params: Record<string, string> = {
limit: limit.toString(),
dir: dir,
include: 'all',
include: threadFilterTypeToFilter(threadListType),
};

if (fromToken) {
Expand All @@ -5509,7 +5529,6 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
// create a shallow copy of LAZY_LOADING_MESSAGES_FILTER,
// so the timelineFilter doesn't get written into it below
filter = {
...filter,
...Filter.LAZY_LOADING_MESSAGES_FILTER,
};
}
Expand All @@ -5525,10 +5544,11 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
params.filter = JSON.stringify(filter);
}

const opts: { prefix?: string } = {};
if (Thread.hasServerSideListSupport === FeatureSupport.Experimental) {
opts.prefix = "/_matrix/client/unstable/org.matrix.msc3856";
}
const opts = {
prefix: Thread.hasServerSideListSupport === FeatureSupport.Stable
? "/_matrix/client/v1"
: "/_matrix/client/unstable/org.matrix.msc3856",
};

return this.http.authedRequest<IThreadedMessagesResponse>(Method.Get, path, params, undefined, opts)
.then(res => ({
Expand All @@ -5555,8 +5575,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
public paginateEventTimeline(eventTimeline: EventTimeline, opts: IPaginateOpts): Promise<boolean> {
const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet);
const room = this.getRoom(eventTimeline.getRoomId()!);
const isThreadListTimeline = eventTimeline.getTimelineSet().isThreadTimeline;
const isThreadTimeline = (eventTimeline.getTimelineSet().thread);
const threadListType = eventTimeline.getTimelineSet().threadListType;
const thread = eventTimeline.getTimelineSet().thread;

// TODO: we should implement a backoff (as per scrollback()) to deal more
// nicely with HTTP errors.
Expand Down Expand Up @@ -5627,7 +5647,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadListTimeline) {
} else if (threadListType) {
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}
Expand All @@ -5641,6 +5661,7 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
token,
opts.limit,
dir,
threadListType,
eventTimeline.getFilter(),
).then((res) => {
if (res.state) {
Expand All @@ -5667,22 +5688,24 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadTimeline) {
} else if (thread) {
const room = this.getRoom(eventTimeline.getRoomId());
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}

promise = this.fetchRelations(
eventTimeline.getRoomId(),
eventTimeline.getTimelineSet().thread?.id,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir, limit: opts.limit, from: token },
).then((res) => {
{ dir, limit: opts.limit, from: token ?? undefined },
).then(async (res) => {
const mapper = this.getEventMapper();
const matrixEvents = res.chunk.map(mapper);
eventTimeline.getTimelineSet().thread?.fetchEditsWhereNeeded(...matrixEvents);
for (const event of matrixEvents) {
await eventTimeline.getTimelineSet()?.thread?.processEvent(event);
}

const newToken = res.next_batch;

Expand Down Expand Up @@ -5725,10 +5748,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const matrixEvents = res.chunk.map(this.getEventMapper());

const timelineSet = eventTimeline.getTimelineSet();
const [timelineEvents, threadedEvents] = room.partitionThreadedEvents(matrixEvents);
const [timelineEvents] = room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processBeaconEvents(room, timelineEvents);
this.processThreadEvents(room, threadedEvents, backwards);
this.processThreadRoots(room,
timelineEvents.filter(it => it.isRelation(THREAD_RELATION_TYPE.name)),
false);

const atEnd = res.end === undefined || res.end === res.start;

Expand Down
4 changes: 2 additions & 2 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { RoomState } from "./room-state";
import { TypedEventEmitter } from "./typed-event-emitter";
import { RelationsContainer } from "./relations-container";
import { MatrixClient } from "../client";
import { Thread } from "./thread";
import { Thread, ThreadFilterType } from "./thread";

const DEBUG = true;

Expand Down Expand Up @@ -140,7 +140,7 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
opts: IOpts = {},
client?: MatrixClient,
public readonly thread?: Thread,
public readonly isThreadTimeline: boolean = false,
public readonly threadListType: ThreadFilterType | null = null,
) {
super();

Expand Down
80 changes: 57 additions & 23 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ export type RoomEmittedEvents = RoomEvent
| ThreadEvent.New
| ThreadEvent.Update
| ThreadEvent.NewReply
| ThreadEvent.Delete
| MatrixEventEvent.BeforeRedaction
| BeaconEvent.New
| BeaconEvent.Update
Expand Down Expand Up @@ -180,7 +181,7 @@ export type RoomEventHandlerMap = {
[ThreadEvent.New]: (thread: Thread, toStartOfTimeline: boolean) => void;
} & Pick<
ThreadHandlerMap,
ThreadEvent.Update | ThreadEvent.NewReply
ThreadEvent.Update | ThreadEvent.NewReply | ThreadEvent.Delete
>
& EventTimelineSetHandlerMap
& Pick<MatrixEventHandlerMap, MatrixEventEvent.BeforeRedaction>
Expand Down Expand Up @@ -1661,8 +1662,12 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
private async createThreadTimelineSet(filterType?: ThreadFilterType): Promise<EventTimelineSet> {
let timelineSet: EventTimelineSet;
if (Thread.hasServerSideListSupport) {
let threadListType: ThreadFilterType | null = null;
if (Thread.hasServerSideListSupport) {
threadListType = filterType ?? ThreadFilterType.All;
}
timelineSet =
new EventTimelineSet(this, this.opts, undefined, undefined, Boolean(Thread.hasServerSideListSupport));
new EventTimelineSet(this, this.opts, undefined, undefined, threadListType);
this.reEmitter.reEmit(timelineSet, [
RoomEvent.Timeline,
RoomEvent.TimelineReset,
Expand Down Expand Up @@ -1796,6 +1801,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
}

this.on(ThreadEvent.NewReply, this.onThreadNewReply);
this.on(ThreadEvent.Delete, this.onThreadDelete);
this.threadsReady = true;
}

Expand All @@ -1814,6 +1820,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
null,
undefined,
Direction.Backward,
timelineSet.threadListType,
timelineSet.getFilter(),
);

Expand All @@ -1834,8 +1841,21 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
}

private onThreadNewReply(thread: Thread): void {
if (thread.length && thread.rootEvent) {
this.updateThreadRootEvents(thread, false);
this.updateThreadRootEvents(thread, false);
}

private onThreadDelete(thread: Thread): void {
this.threads.delete(thread.id);

const timeline = this.getTimelineForEvent(thread.id);
const roomEvent = timeline?.getEvents()?.find(it => it.getId() === thread.id);
if (roomEvent) {
thread.clearEventMetadata(roomEvent);
} else {
logger.debug("onThreadDelete: Could not find root event in room timeline");
}
for (const timelineSet of this.threadsTimelineSets) {
timelineSet.removeEvent(thread.id);
}
}

Expand Down Expand Up @@ -1948,9 +1968,11 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
}

private updateThreadRootEvents = (thread: Thread, toStartOfTimeline: boolean) => {
this.updateThreadRootEvent(this.threadsTimelineSets?.[0], thread, toStartOfTimeline);
if (thread.hasCurrentUserParticipated) {
this.updateThreadRootEvent(this.threadsTimelineSets?.[1], thread, toStartOfTimeline);
if (thread.length) {
this.updateThreadRootEvent(this.threadsTimelineSets?.[0], thread, toStartOfTimeline);
if (thread.hasCurrentUserParticipated) {
this.updateThreadRootEvent(this.threadsTimelineSets?.[1], thread, toStartOfTimeline);
}
}
};

Expand All @@ -1959,18 +1981,20 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
thread: Thread,
toStartOfTimeline: boolean,
) => {
if (Thread.hasServerSideSupport) {
timelineSet.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState: this.currentState,
});
} else {
timelineSet.addEventToTimeline(
thread.rootEvent,
timelineSet.getLiveTimeline(),
{ toStartOfTimeline },
);
if (timelineSet && thread.rootEvent) {
if (Thread.hasServerSideSupport) {
timelineSet.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState: this.currentState,
});
} else {
timelineSet.addEventToTimeline(
thread.rootEvent,
timelineSet.getLiveTimeline(),
{ toStartOfTimeline },
);
}
}
};

Expand All @@ -1994,23 +2018,33 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
client: this.client,
});

// This is necessary to be able to jump to events in threads:
// If we jump to an event in a thread where neither the event, nor the root,
// nor any thread event are loaded yet, we'll load the event as well as the thread root, create the thread,
// and pass the event through this.
for (const event of events) {
thread.setEventMetadata(event);
}

// If we managed to create a thread and figure out its `id` then we can use it
this.threads.set(thread.id, thread);
this.reEmitter.reEmit(thread, [
ThreadEvent.Delete,
ThreadEvent.Update,
ThreadEvent.NewReply,
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);
const isNewer = this.lastThread?.rootEvent
&& rootEvent?.localTimestamp
&& this.lastThread.rootEvent?.localTimestamp < rootEvent?.localTimestamp;

if (!this.lastThread || this.lastThread.rootEvent?.localTimestamp < rootEvent?.localTimestamp) {
if (!this.lastThread || isNewer) {
this.lastThread = thread;
}

if (this.threadsReady) {
if (thread.length && thread.rootEvent) {
this.updateThreadRootEvents(thread, toStartOfTimeline);
}
this.updateThreadRootEvents(thread, toStartOfTimeline);
}

this.emit(ThreadEvent.New, thread, toStartOfTimeline);
Expand Down
Loading

0 comments on commit 093d7cc

Please sign in to comment.