Skip to content

Commit

Permalink
poc
Browse files Browse the repository at this point in the history
  • Loading branch information
justjanne committed Oct 12, 2022
1 parent 1e26f43 commit 37b5fa0
Show file tree
Hide file tree
Showing 5 changed files with 353 additions and 175 deletions.
269 changes: 222 additions & 47 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5355,6 +5355,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
return timelineSet.getTimelineForEvent(eventId);
}

if (this.supportsExperimentalThreads()
&& Thread.hasServerSideSupport
&& timelineSet.thread) {
return this.getThreadTimeline(timelineSet, eventId);
}

const path = utils.encodeUri(
"/rooms/$roomId/context/$eventId", {
$roomId: timelineSet.room.roomId,
Expand Down Expand Up @@ -5389,38 +5395,6 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
...res.events_before.map(mapper),
];

if (this.supportsExperimentalThreads()) {
if (!timelineSet.canContain(event)) {
return undefined;
}

// Where the event is a thread reply (not a root) and running in MSC-enabled mode the Thread timeline only
// functions contiguously, so we have to jump through some hoops to get our target event in it.
// XXX: workaround for https://github.com/vector-im/element-meta/issues/150
if (Thread.hasServerSideSupport && timelineSet.thread) {
const thread = timelineSet.thread;
const opts: IRelationsRequestOpts = {
dir: Direction.Backward,
limit: 50,
};

await thread.fetchInitialEvents();
let nextBatch: string | null | undefined = thread.liveTimeline.getPaginationToken(Direction.Backward);

// Fetch events until we find the one we were asked for, or we run out of pages
while (!thread.findEventById(eventId)) {
if (nextBatch) {
opts.from = nextBatch;
}

({ nextBatch } = await thread.fetchEvents(opts));
if (!nextBatch) break;
}

return thread.liveTimeline;
}
}

// Here we handle non-thread timelines only, but still process any thread events to populate thread summaries.
let timeline = timelineSet.getTimelineForEvent(events[0].getId());
if (timeline) {
Expand All @@ -5445,6 +5419,144 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
?? timeline;
}

public async getThreadTimeline(timelineSet: EventTimelineSet, eventId: string): Promise<EventTimeline | undefined> {
if (!Thread.hasServerSideSupport) {
throw new Error("could not get thread timeline: no serverside support");
}

if (!this.supportsExperimentalThreads()) {
throw new Error("could not get thread timeline: no client support");
}

const path = utils.encodeUri(
"/rooms/$roomId/context/$eventId", {
$roomId: timelineSet.room.roomId,
$eventId: eventId,
},
);

const params: Record<string, string | string[]> = {
limit: "0",
};
if (this.clientOpts.lazyLoadMembers) {
params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER);
}

// TODO: we should implement a backoff (as per scrollback()) to deal more nicely with HTTP errors.
const res = await this.http.authedRequest<IContextResponse>(undefined, Method.Get, path, params);
const mapper = this.getEventMapper();
const event = mapper(res.event);

if (!timelineSet.canContain(event)) {
return undefined;
}

if (Thread.hasServerSideFwdPaginationSupport) {
if (!timelineSet.thread) {
throw new Error("could not get thread timeline: not a thread timeline");
}

const thread = timelineSet.thread;
const resOlder = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Backward, from: res.start },
);
const resNewer = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Forward, from: res.end },
);
const events = [
// Order events from most recent to oldest (reverse-chronological).
// We start with the last event, since that's the point at which we have known state.
// events_after is already backwards; events_before is forwards.
...resNewer.chunk.reverse().map(mapper),
event,
...resOlder.chunk.map(mapper),
];
await timelineSet.thread?.fetchEditsWhereNeeded(...events);

// Here we handle non-thread timelines only, but still process any thread events to populate thread summaries.
let timeline = timelineSet.getTimelineForEvent(event.getId());
if (timeline) {
timeline.getState(EventTimeline.BACKWARDS).setUnknownStateEvents(res.state.map(mapper));
} else {
timeline = timelineSet.addTimeline();
timeline.initialiseState(res.state.map(mapper));
}

timelineSet.addEventsToTimeline(events, true, timeline, resNewer.next_batch);
if (!resOlder.next_batch) {
timelineSet.addEventsToTimeline([mapper(resOlder.original_event)], true, timeline, null);
}
timeline.setPaginationToken(resOlder.next_batch ?? null, Direction.Backward);
timeline.setPaginationToken(resNewer.next_batch ?? null, Direction.Forward);
this.processBeaconEvents(timelineSet.room, events);

// There is no guarantee that the event ended up in "timeline" (we might have switched to a neighbouring
// timeline) - so check the room's index again. On the other hand, there's no guarantee the event ended up
// anywhere, if it was later redacted, so we just return the timeline we first thought of.
return timelineSet.getTimelineForEvent(eventId)
?? timeline;
} else {
// Where the event is a thread reply (not a root) and running in MSC-enabled mode the Thread timeline only
// functions contiguously, so we have to jump through some hoops to get our target event in it.
// XXX: workaround for https://github.com/vector-im/element-meta/issues/150

const thread = timelineSet.thread;

const resOlder = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Backward, from: res.start },
);
let eventsNewer = [];
let nextBatch: Optional<string> = res.end;
while (nextBatch) {
const resNewer = await this.fetchRelations(
timelineSet.room.roomId,
thread.id,
THREAD_RELATION_TYPE.name,
null,
{ dir: Direction.Forward, from: nextBatch },
);
nextBatch = resNewer.next_batch ?? null;
eventsNewer.push(...resNewer.chunk);
}
const events = [
// Order events from most recent to oldest (reverse-chronological).
// We start with the last event, since that's the point at which we have known state.
// events_after is already backwards; events_before is forwards.
...eventsNewer.reverse().map(mapper),
event,
...resOlder.chunk.map(mapper),
];

await timelineSet.thread?.fetchEditsWhereNeeded(...events);

// Here we handle non-thread timelines only, but still process any thread events to populate thread summaries.
let timeline = timelineSet.getLiveTimeline();
timeline.getState(EventTimeline.BACKWARDS).setUnknownStateEvents(res.state.map(mapper));

timelineSet.addEventsToTimeline(events, true, timeline, null);
if (!resOlder.next_batch) {
timelineSet.addEventsToTimeline([mapper(resOlder.original_event)], true, timeline, null);
}
timeline.setPaginationToken(resOlder.next_batch ?? null, Direction.Backward);
timeline.setPaginationToken(null, Direction.Forward);
this.processBeaconEvents(timelineSet.room, events);

return timeline;
}
}

/**
* Get an EventTimeline for the latest events in the room. This will just
* call `/messages` to get the latest message in the room, then use
Expand All @@ -5466,28 +5578,44 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
throw new Error("getLatestTimeline only supports room timelines");
}

let res: IMessagesResponse;
const roomId = timelineSet.room.roomId;
let event;
if (timelineSet.isThreadTimeline) {
res = await this.createThreadListMessagesRequest(
roomId,
const res = await this.createThreadListMessagesRequest(
timelineSet.room.roomId,
null,
1,
Direction.Backward,
timelineSet.getFilter(),
);
} else {
res = await this.createMessagesRequest(
roomId,
event = res.chunk?.[0];
} else if (timelineSet.thread && Thread.hasServerSideSupport) {
const res = await this.fetchRelations(
timelineSet.room.roomId,
timelineSet.thread.id,
THREAD_RELATION_TYPE.name,
null,
1,
Direction.Backward,
timelineSet.getFilter(),
{ dir: Direction.Backward, limit: 1 },
);
event = res.chunk?.[0];
} else {
const messagesPath = utils.encodeUri(
"/rooms/$roomId/messages", {
$roomId: timelineSet.room.roomId,
},
);

const params: Record<string, string | string[]> = {
dir: 'b',
};
if (this.clientOpts.lazyLoadMembers) {
params.filter = JSON.stringify(Filter.LAZY_LOADING_MESSAGES_FILTER);
}

const res = await this.http.authedRequest<IMessagesResponse>(undefined, Method.Get, messagesPath, params);
event = res.chunk?.[0];
}
const event = res.chunk?.[0];
if (!event) {
throw new Error("No message returned from /messages when trying to construct getLatestTimeline");
throw new Error("No message returned when trying to construct getLatestTimeline");
}

return this.getEventTimeline(timelineSet, event.event_id);
Expand Down Expand Up @@ -5625,7 +5753,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
public paginateEventTimeline(eventTimeline: EventTimeline, opts: IPaginateOpts): Promise<boolean> {
const isNotifTimeline = (eventTimeline.getTimelineSet() === this.notifTimelineSet);
const room = this.getRoom(eventTimeline.getRoomId());
const isThreadTimeline = eventTimeline.getTimelineSet().isThreadTimeline;
const isThreadListTimeline = eventTimeline.getTimelineSet().isThreadTimeline;
const isThreadTimeline = (eventTimeline.getTimelineSet().thread);

// TODO: we should implement a backoff (as per scrollback()) to deal more
// nicely with HTTP errors.
Expand Down Expand Up @@ -5696,11 +5825,15 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadTimeline) {
} else if (isThreadListTimeline) {
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}

if (!Thread.hasServerSideFwdPaginationSupport && dir === Direction.Forward) {
throw new Error("Cannot paginate threads forwards without server-side support for MSC 3715");
}

promise = this.createThreadListMessagesRequest(
eventTimeline.getRoomId(),
token,
Expand Down Expand Up @@ -5732,6 +5865,43 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else if (isThreadTimeline) {
const room = this.getRoom(eventTimeline.getRoomId());
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
}

promise = this.fetchRelations(
eventTimeline.getRoomId(),
eventTimeline.getTimelineSet().thread?.id,
THREAD_RELATION_TYPE.name,
null,
{ dir, limit: opts.limit, from: token },
).then((res) => {
const mapper = this.getEventMapper();
const matrixEvents = res.chunk.map(mapper);
eventTimeline.getTimelineSet().thread?.fetchEditsWhereNeeded(...matrixEvents);

const newToken = res.next_batch;

const timelineSet = eventTimeline.getTimelineSet();
timelineSet.addEventsToTimeline(matrixEvents, backwards, eventTimeline, newToken ?? null);
if (!newToken && backwards) {
timelineSet.addEventsToTimeline([mapper(res.original_event)], true, eventTimeline, null);
}
this.processBeaconEvents(timelineSet.room, matrixEvents);

// if we've hit the end of the timeline, we need to stop trying to
// paginate. We need to keep the 'forwards' token though, to make sure
// we can recover from gappy syncs.
if (backwards && !newToken) {
eventTimeline.setPaginationToken(null, dir);
}
return Boolean(newToken);
}).finally(() => {
eventTimeline.paginationRequests[dir] = null;
});
eventTimeline.paginationRequests[dir] = promise;
} else {
if (!room) {
throw new Error("Unknown room " + eventTimeline.getRoomId());
Expand All @@ -5756,7 +5926,12 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
const [timelineEvents, threadedEvents] = room.partitionThreadedEvents(matrixEvents);
timelineSet.addEventsToTimeline(timelineEvents, backwards, eventTimeline, token);
this.processBeaconEvents(room, timelineEvents);
this.processThreadEvents(room, threadedEvents, backwards);
if (Thread.hasServerSideSupport !== FeatureSupport.Stable) {
this.processThreadEvents(room, threadedEvents, backwards);
}
this.processThreadRoots(room,
timelineEvents.filter(it => it.isRelation(THREAD_RELATION_TYPE.name)),
false);

const atEnd = res.end === undefined || res.end === res.start;

Expand Down
6 changes: 6 additions & 0 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,12 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
}
}

if (this.thread) {
for (const event of events) {
this.thread.setEventMetadata(event);
}
}

const direction = toStartOfTimeline ? EventTimeline.BACKWARDS :
EventTimeline.FORWARDS;
const inverseDirection = toStartOfTimeline ? EventTimeline.FORWARDS :
Expand Down
Loading

0 comments on commit 37b5fa0

Please sign in to comment.