Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: corruption in replication stream #3344

Merged
merged 1 commit into from
Jul 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 34 additions & 36 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ iovec IoVec(io::Bytes src) {
return iovec{const_cast<uint8_t*>(src.data()), src.size()};
}

constexpr size_t kFlushThreshold = 2_KB;
uint32_t replication_stream_output_limit_cached = 64_KB;

} // namespace
Expand Down Expand Up @@ -90,44 +89,42 @@ void JournalStreamer::Write(std::string_view str) {
DCHECK(!str.empty());
DVLOG(2) << "Writing " << str.size() << " bytes";

// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
size_t total_pending = pending_buf_.size() + str.size();
if (in_flight_bytes_ == 0 || total_pending > kFlushThreshold) {
// because of potential SOO with strings we allocate explicitly on heap
uint8_t* buf(new uint8_t[str.size()]);

// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
if (in_flight_bytes_ > 0) {
// We can not flush data while there are in flight requests because AsyncWrite
// is not atomic. Therefore, we just aggregate.
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
return;
}

iovec v[2];
unsigned next_buf_id = 0;
// If we do not have any in flight requests we send the string right a way.
// We can not aggregate it since we do not know when the next update will follow.
// because of potential SOO with strings, we allocate explicitly on heap.
uint8_t* buf(new uint8_t[str.size()]);

if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));
// TODO: it is possible to remove these redundant copies if we adjust high level
// interfaces to pass reference-counted buffers.
memcpy(buf, str.data(), str.size());
in_flight_bytes_ += total_pending;
total_sent_ += total_pending;

dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
iovec v[2];
unsigned next_buf_id = 0;

return;
if (!pending_buf_.empty()) {
v[0] = IoVec(pending_buf_);
++next_buf_id;
}
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));

DCHECK_GT(in_flight_bytes_, 0u);
DCHECK_LE(pending_buf_.size() + str.size(), kFlushThreshold);

// Aggregate
size_t tail = pending_buf_.size();
pending_buf_.resize(pending_buf_.size() + str.size());
memcpy(pending_buf_.data() + tail, str.data(), str.size());
dest_->AsyncWrite(
v, next_buf_id,
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
delete[] buf;
OnCompletion(ec, len);
});
}

void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
Expand Down Expand Up @@ -160,13 +157,14 @@ void JournalStreamer::ThrottleIfNeeded() {

auto next = chrono::steady_clock::now() +
chrono::milliseconds(absl::GetFlag(FLAGS_replication_stream_timeout));
auto inflight_start = in_flight_bytes_;
size_t inflight_start = in_flight_bytes_;
size_t sent_start = total_sent_;

std::cv_status status =
waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
if (status == std::cv_status::timeout) {
LOG(WARNING) << "Stream timed out, inflight bytes start: " << inflight_start
<< ", end: " << in_flight_bytes_;
LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
<< sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
cntx_->ReportError(make_error_code(errc::stream_timeout));
}
}
Expand All @@ -182,7 +180,7 @@ void JournalStreamer::WaitForInflightToComplete() {
}

bool JournalStreamer::IsStalled() const {
return in_flight_bytes_ >= replication_stream_output_limit_cached;
return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached;
}

RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class JournalStreamer {

journal::Journal* journal_;
std::vector<uint8_t> pending_buf_;
size_t in_flight_bytes_ = 0;
size_t in_flight_bytes_ = 0, total_sent_ = 0;

time_t last_lsn_time_ = 0;
util::fb2::EventCount waker_;
uint32_t journal_cb_id_{0};
Expand Down
Loading