Skip to content

Commit

Permalink
Reworked implementation; Room::requestedEvents()
Browse files Browse the repository at this point in the history
  • Loading branch information
KitsuneRal committed Dec 4, 2024
1 parent d5ccb08 commit 6c7978d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 41 deletions.
99 changes: 58 additions & 41 deletions Quotient/room.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,14 @@ using namespace Quotient;
using namespace std::placeholders;
using std::llround;

namespace {
enum EventsPlacement : int { Older = -1, Newer = 1 };

struct EventPromise : QPromise<const RoomEvent*> {
QDeadlineTimer deadline;
};
}

class Q_DECL_HIDDEN Room::Private {
public:
Private(Connection* c, QString id_, JoinState initialJoinState)
Expand Down Expand Up @@ -147,6 +153,7 @@ class Q_DECL_HIDDEN Room::Private {
std::optional<QString> prevBatch = QString();
int lastRequestedHistorySize = 0;
JobHandle<GetRoomEventsJob> eventsHistoryJob;
std::vector<std::pair<EventId, EventPromise>> eventPromises;
JobHandle<GetMembersByRoomJob> allMembersJob;
//! Map from megolm sessionId to set of eventIds
std::unordered_map<QString, QSet<QString>> undecryptedEvents;
Expand Down Expand Up @@ -221,6 +228,7 @@ class Q_DECL_HIDDEN Room::Private {
Timeline::const_iterator syncEdge() const { return timeline.cend(); }

JobHandle<GetRoomEventsJob> getPreviousContent(int limit = 10, const QString &filter = {});
void checkForRequestedEvents(const rev_iter_t& from, bool allHistoryLoaded);

Changes updateStateFrom(StateEvents&& events)
{
Expand Down Expand Up @@ -2335,44 +2343,53 @@ QFuture<const RoomEvent*> Room::ensureEvent(const QString& eventId, std::chrono:
if (auto eventIt = findInTimeline(eventId); eventIt != historyEdge())
return makeReadyValueFuture(eventIt->event());

auto&& p = QPromise<const RoomEvent*>{};
auto future = p.future();
connectUntil(this, &Room::addedMessages, this,
[this, promise = std::move(p), eventId,
deadline = QDeadlineTimer(maxWaitTime, Qt::VeryCoarseTimer)](int fromIndex,
int toIndex) mutable {
using namespace std::ranges;
if (toIndex < 0) { // If it's >=0 then it's from sync, not interesting
const auto rangeToCheck =
subrange(findInTimeline(fromIndex), findInTimeline(toIndex) + 1);
if (auto it = find(rangeToCheck, eventId, &RoomEvent::id);
it != rangeToCheck.cend()) {
promise.addResult(it->event());
promise.finish();
return true;
}
}
if (deadline.hasExpired()) {
qCWarning(MESSAGES)
<< "Timeout - giving up on obtaining event" << eventId;
promise.future().cancel();
return true;
}
if (toIndex < 0) {
static constexpr auto EventsProgression =
std::array{ 50, 100, 200, 500, 1000 };
static_assert(is_sorted(EventsProgression));
const auto thisMany =
d->lastRequestedHistorySize >= EventsProgression.back()
? EventsProgression.back()
: *upper_bound(EventsProgression, d->lastRequestedHistorySize);
qCDebug(MESSAGES)
<< "Requesting" << thisMany << "events to ensure event" << eventId;
getPreviousContent(thisMany);
}
return false;
});
return future;
return d->eventPromises
.emplace_back(eventId, EventPromise{ {}, { maxWaitTime, Qt::VeryCoarseTimer } })
.second.future();
}

QStringList Room::requestedEvents() const
{
auto requestedIdsView = std::views::keys(d->eventPromises);
QStringList evtIds(begin(requestedIdsView), end(requestedIdsView));
return evtIds; // TODO: use std::ranges::to when it's available from all toolchains
}

void Room::Private::checkForRequestedEvents(const rev_iter_t& from, bool allHistoryLoaded)
{
using namespace std::ranges;
std::erase_if(eventPromises, [this, from](auto& idAndPromise) {
auto&& [eventId, promise] = idAndPromise;
if (auto it = find(from, historyEdge(), eventId, &RoomEvent::id); it != historyEdge()) {
promise.addResult(it->event());
promise.finish();
return true;
}
if (promise.deadline.hasExpired()) {
qCWarning(MESSAGES) << "Timeout - giving up on obtaining event" << eventId;
promise.future().cancel();
return true;
}
return false;
});
if (!eventPromises.empty()) {
if (allHistoryLoaded) {
for_each(views::values(eventPromises), [](auto& p) { p.future().cancel(); });
eventPromises.clear();
}
static constexpr auto EventsProgression = std::array{ 50, 100, 200, 500, 1000 };
static_assert(is_sorted(EventsProgression));
const auto thisMany = lastRequestedHistorySize >= EventsProgression.back()
? EventsProgression.back()
: *upper_bound(EventsProgression, lastRequestedHistorySize);
qCDebug(MESSAGES) << "Requesting" << thisMany << "events, looking for" <<
#if defined(__cpp_lib_ranges_join_with) && defined(__cpp_lib_ranges_to_container)
(views::keys(eventPromises) | views::join_with(u","_s) | to<QString>());
#else
q->requestedEvents().join(u',');
#endif
getPreviousContent(thisMany);
}
}

JobHandle<GetRoomEventsJob> Room::Private::getPreviousContent(int limit, const QString& filter)
Expand All @@ -2387,7 +2404,7 @@ JobHandle<GetRoomEventsJob> Room::Private::getPreviousContent(int limit, const Q
eventsHistoryJob =
connection->callApi<GetRoomEventsJob>(id, "b"_L1, *prevBatch, QString(), limit, filter);
emit q->eventsHistoryJobChanged();
connect(eventsHistoryJob, &BaseJob::success, q, [this] {
eventsHistoryJob.then([this] {
if (const auto newPrevBatch = eventsHistoryJob->end();
!newPrevBatch.isEmpty() && *prevBatch != newPrevBatch) //
{
Expand All @@ -2406,9 +2423,9 @@ JobHandle<GetRoomEventsJob> Room::Private::getPreviousContent(int limit, const Q
changes |= updateStats(from, historyEdge());
if (changes > 0)
postprocessChanges(changes);
checkForRequestedEvents(from, !prevBatch);
});
connect(eventsHistoryJob, &QObject::destroyed, q,
&Room::eventsHistoryJobChanged);
connect(eventsHistoryJob, &QObject::destroyed, q, &Room::eventsHistoryJobChanged);
return eventsHistoryJob;
}

Expand Down
2 changes: 2 additions & 0 deletions Quotient/room.h
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,8 @@ class QUOTIENT_API Room : public QObject {
Q_INVOKABLE QFuture<const RoomEvent*> ensureEvent(
const QString& eventId, std::chrono::seconds maxWaitTime = std::chrono::seconds(60));

Q_INVOKABLE QStringList requestedEvents() const;

public Q_SLOTS:
/** Check whether the room should be upgraded */
void checkVersion();
Expand Down

0 comments on commit 6c7978d

Please sign in to comment.