Skip to content

Commit

Permalink
chore: Add coordinated omission mode (#3332)
Browse files Browse the repository at this point in the history
* chore: Add coordinated omission mode

* chore: implement sequential mode in dfly_bench
  • Loading branch information
romange authored Jul 18, 2024
1 parent b9f8671 commit c670ffd
Showing 1 changed file with 95 additions and 62 deletions.
157 changes: 95 additions & 62 deletions src/server/dfly_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ constexpr string_view kKeyPlaceholder = "__key__"sv;

thread_local base::Xoroshiro128p bit_gen;

#if __INTELLISENSE__
#pragma diag_suppress 144
#endif

enum Protocol { RESP, MC_TEXT } protocol;
enum DistType { UNIFORM, NORMAL, ZIPFIAN, SEQUENTIAL } dist_type{UNIFORM};

class KeyGenerator {
public:
Expand All @@ -69,7 +74,6 @@ class KeyGenerator {
uint64_t min_, max_, range_;
double stddev_ = 1.0 / 6;
optional<base::ZipfianGenerator> zipf_;
enum DistType { UNIFORM, NORMAL, ZIPFIAN } dist_type_;
};

class CommandGenerator {
Expand Down Expand Up @@ -162,30 +166,33 @@ struct ClientStats {
// Per connection driver.
class Driver {
public:
explicit Driver(ProactorBase* p = nullptr) {
if (p)
socket_.reset(p->CreateSocket());
explicit Driver(uint32_t num_reqs, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), stats_(*stats) {
socket_.reset(p->CreateSocket());
}

Driver(const Driver&) = delete;
Driver(Driver&&) = default;
Driver& operator=(Driver&&) = default;

void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint32_t num_reqs, uint64_t cycle_ns, ClientStats* stats);
void Run(uint64_t cycle_ns);

private:
void PopRequest(ClientStats* dest);
void ReceiveFb(ClientStats* dest);
void ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf, ClientStats* dest);
void PopRequest();
void ReceiveFb();
void ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf);

struct Req {
uint64_t start;
bool might_hit;
};

uint32_t num_reqs_;
ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_;
queue<Req> reqs_;
fb2::CondVarAny cnd_;
};

// Per thread client.
Expand All @@ -194,7 +201,7 @@ class TLocalClient {
explicit TLocalClient(ProactorBase* p) : p_(p) {
drivers_.resize(GetFlag(FLAGS_c));
for (auto& driver : drivers_) {
driver = Driver{p_};
driver.reset(new Driver{GetFlag(FLAGS_n), &stats, p_});
}
}

Expand All @@ -207,34 +214,32 @@ class TLocalClient {

private:
ProactorBase* p_;
vector<Driver> drivers_;
vector<unique_ptr<Driver>> drivers_;
};

KeyGenerator::KeyGenerator(uint32_t min, uint32_t max)
: min_(min), max_(max), range_(max - min + 1) {
prefix_ = GetFlag(FLAGS_key_prefix);
string dist = GetFlag(FLAGS_key_dist);
CHECK_GT(range_, 0u);

if (dist == "U") {
dist_type_ = UNIFORM;
} else if (dist == "N") {
dist_type_ = NORMAL;
uint64_t stddev = GetFlag(FLAGS_key_stddev);
if (stddev != 0) {
stddev_ = double(stddev) / double(range_);
switch (dist_type) {
case NORMAL: {
uint64_t stddev = GetFlag(FLAGS_key_stddev);
if (stddev != 0) {
stddev_ = double(stddev) / double(range_);
}
break;
}
} else if (dist == "Z") {
dist_type_ = ZIPFIAN;
zipf_.emplace(min, max, GetFlag(FLAGS_zipf_alpha));
} else {
LOG(FATAL) << "Unknown distribution type: " << dist;
case ZIPFIAN:
zipf_.emplace(min, max, GetFlag(FLAGS_zipf_alpha));
break;
default:;
}
}

string KeyGenerator::operator()() {
uint64_t key_suffix{0};
switch (dist_type_) {
switch (dist_type) {
case UNIFORM:
key_suffix = absl::Uniform(bit_gen, min_, max_);
break;
Expand All @@ -246,6 +251,9 @@ string KeyGenerator::operator()() {
case ZIPFIAN:
key_suffix = zipf_->Next(bit_gen);
break;
case SEQUENTIAL:
key_suffix = min_++;
break;
}

return absl::StrCat(prefix_, key_suffix);
Expand All @@ -257,8 +265,8 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
}

void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, ClientStats* dest) {
auto receive_fb = MakeFiber([this, dest] { ReceiveFb(dest); });
void Driver::Run(uint64_t cycle_ns) {
auto receive_fb = MakeFiber([this] { ReceiveFb(); });

int64_t next_invocation = absl::GetCurrentTimeNanos();

Expand All @@ -269,18 +277,24 @@ void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, ClientStats* dest) {

KeyGenerator key_gen(key_minimum, key_maximum);
CommandGenerator cmd_gen(&key_gen);
for (unsigned i = 0; i < num_reqs; ++i) {
for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos();

int64_t sleep_ns = next_invocation - now;
if (sleep_ns > 0) {
VLOG(5) << "Sleeping for " << sleep_ns << "ns";
ThisFiber::SleepFor(chrono::nanoseconds(sleep_ns));
if (cycle_ns) {
int64_t sleep_ns = next_invocation - now;
if (sleep_ns > 0) {
VLOG(5) << "Sleeping for " << sleep_ns << "ns";
ThisFiber::SleepFor(chrono::nanoseconds(sleep_ns));
} else {
VLOG(5) << "Behind QPS schedule";
}
next_invocation += cycle_ns;
} else {
VLOG(5) << "Behind QPS schedule";
}
next_invocation += cycle_ns;
// Coordinated omission.

fb2::NoOpLock lk;
cnd_.wait(lk, [this] { return reqs_.empty(); });
}
string cmd = cmd_gen();

Req req;
Expand All @@ -299,7 +313,7 @@ void Driver::Run(uint32_t num_reqs, uint64_t cycle_ns, ClientStats* dest) {
}

const absl::Time finish = absl::Now();
VLOG(1) << "Done queuing " << num_reqs << " requests, which took " << finish - start
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took " << finish - start
<< ". Waiting for server processing";

// TODO: to change to a condvar or something.
Expand All @@ -323,17 +337,20 @@ static string_view FindLine(io::Bytes buf) {
return {};
};

void Driver::PopRequest(ClientStats* stats) {
void Driver::PopRequest() {
uint64_t now = absl::GetCurrentTimeNanos();
uint64_t usec = (now - reqs_.front().start) / 1000;
stats->hist.Add(usec);
stats->hit_opportunities += reqs_.front().might_hit;
stats_.hist.Add(usec);
stats_.hit_opportunities += reqs_.front().might_hit;

reqs_.pop();
++stats->num_responses;
if (reqs_.empty()) {
cnd_.notify_one();
}
++stats_.num_responses;
}

void Driver::ReceiveFb(ClientStats* stats) {
void Driver::ReceiveFb() {
facade::RedisParser parser{1 << 16, false};
io::IoBuf io_buf{512};

Expand All @@ -352,7 +369,7 @@ void Driver::ReceiveFb(ClientStats* stats) {
io_buf.CommitWrite(*recv_sz);

if (protocol == RESP) {
ParseRESP(&parser, &io_buf, stats);
ParseRESP(&parser, &io_buf);
} else {
// MC_TEXT
while (true) {
Expand All @@ -361,7 +378,7 @@ void Driver::ReceiveFb(ClientStats* stats) {
break;
CHECK_EQ(line.back(), '\n');
if (line == "STORED\r\n" || line == "END\r\n") {
PopRequest(stats);
PopRequest();
blob_len = 0;
} else if (absl::StartsWith(line, "VALUE")) {
// last token is a blob length.
Expand All @@ -374,10 +391,10 @@ void Driver::ReceiveFb(ClientStats* stats) {
LOG(ERROR) << "Invalid blob len " << line;
return;
}
++stats->hit_count;
++stats_.hit_count;
} else if (absl::StartsWith(line, "SERVER_ERROR")) {
++stats->num_errors;
PopRequest(stats);
++stats_.num_errors;
PopRequest();
blob_len = 0;
} else {
auto handle = socket_->native_handle();
Expand All @@ -392,7 +409,7 @@ void Driver::ReceiveFb(ClientStats* stats) {
VLOG(1) << "ReceiveFb done";
}

void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf, ClientStats* stats) {
void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
RespVec parse_args;
Expand All @@ -401,10 +418,10 @@ void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf, ClientSta
result = parser->Parse(io_buf->InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) {
++stats->hit_count;
++stats_.hit_count;
}
parse_args.clear();
PopRequest(stats);
PopRequest();
}
io_buf->ConsumeInput(consumed);
} while (result == RedisParser::OK);
Expand All @@ -417,7 +434,7 @@ void TLocalClient::Connect(tcp::endpoint ep) {
for (size_t i = 0; i < fbs.size(); ++i) {
fbs[i] = MakeFiber([&, i] {
ThisFiber::SetName(absl::StrCat("connect/", i));
drivers_[i].Connect(i, ep);
drivers_[i]->Connect(i, ep);
});
}

Expand All @@ -427,11 +444,8 @@ void TLocalClient::Connect(tcp::endpoint ep) {

void TLocalClient::Run(uint64_t cycle_ns) {
vector<fb2::Fiber> fbs(drivers_.size());
uint32_t num_reqs = GetFlag(FLAGS_n);

for (size_t i = 0; i < fbs.size(); ++i) {
fbs[i] = fb2::Fiber(absl::StrCat("run/", i),
[&, i] { drivers_[i].Run(num_reqs, cycle_ns, &stats); });
fbs[i] = fb2::Fiber(absl::StrCat("run/", i), [&, i] { drivers_[i]->Run(cycle_ns); });
}

for (auto& fb : fbs)
Expand Down Expand Up @@ -491,6 +505,20 @@ int main(int argc, char* argv[]) {
protocol = RESP;
}

string dist = GetFlag(FLAGS_key_dist);

if (dist == "U") {
dist_type = UNIFORM;
} else if (dist == "N") {
dist_type = NORMAL;
} else if (dist == "Z") {
dist_type = ZIPFIAN;
} else if (dist == "S") {
dist_type = SEQUENTIAL;
} else {
LOG(FATAL) << "Unknown distribution type: " << dist;
}

auto* proactor = pp->GetNextProactor();
char ip_addr[128];

Expand All @@ -512,14 +540,18 @@ int main(int argc, char* argv[]) {
});

const uint32_t qps = GetFlag(FLAGS_qps);
const int64_t interval = 1000000000LL / qps;
const int64_t interval = qps ? 1000000000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n);

CONSOLE_INFO << "Running all threads, sending " << num_reqs << " requests at a rate of "
<< GetFlag(FLAGS_qps) << " rps per connection, i.e. request every "
<< interval / 1000 << "us";
CONSOLE_INFO << "Overall scheduled RPS: " << qps * pp->size() * GetFlag(FLAGS_c);

CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection";
if (interval) {
CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps)
<< " rps per connection, i.e. request every " << interval / 1000 << "us";
CONSOLE_INFO << "Overall scheduled RPS: " << qps * pp->size() * GetFlag(FLAGS_c);
} else {
CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server";
}
const absl::Time start_time = absl::Now();
atomic_bool finish{false};
auto watch_fb =
Expand All @@ -530,13 +562,11 @@ int main(int argc, char* argv[]) {
finish.store(true);
watch_fb.Join();

CONSOLE_INFO << "\nFinished. Total time: " << duration;

fb2::Mutex mutex;
base::Histogram hist;

LOG(INFO) << "Resetting all threads";
uint64_t hit_opportunities = 0, hit_count = 0, num_errors = 0;
uint64_t hit_opportunities = 0, hit_count = 0, num_errors = 0, num_responses = 0;

pp->AwaitFiberOnAll([&](auto* p) {
unique_lock lk(mutex);
Expand All @@ -545,10 +575,13 @@ int main(int argc, char* argv[]) {
hit_opportunities += client->stats.hit_opportunities;
hit_count += client->stats.hit_count;
num_errors += client->stats.num_errors;
num_responses += client->stats.num_responses;
lk.unlock();
client.reset();
});

CONSOLE_INFO << "\nTotal time: " << duration << ". Overall number of requests: " << num_responses;

if (num_errors) {
CONSOLE_INFO << "Got " << num_errors << " error responses!";
}
Expand Down

0 comments on commit c670ffd

Please sign in to comment.