Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): master stop sending exec opcode to replica #3289

Merged
merged 2 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/server/journal/journal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ TEST(Journal, WriteRead) {
{2, Op::COMMAND, 1, 1, nullopt, Payload("LPUSH", list("l", "v1", "v2"))},
{3, Op::COMMAND, 0, 1, nullopt, Payload("MSET", slice("D", "4"))},
{4, Op::COMMAND, 1, 1, nullopt, Payload("DEL", list("l1"))},
{5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))},
{6, Op::MULTI_COMMAND, 2, 1, nullopt, Payload("SET", list("E", "2"))},
{6, Op::EXEC, 2, 1, nullopt}};
{5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}};

// Write all entries to a buffer.
base::IoBuf buf;
Expand Down
2 changes: 0 additions & 2 deletions src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ void JournalWriter::Write(const journal::Entry& entry) {
return;
case journal::Op::COMMAND:
case journal::Op::EXPIRED:
case journal::Op::MULTI_COMMAND:
case journal::Op::EXEC:
Write(entry.txid);
Write(entry.shard_cnt);
Write(entry.payload);
Expand Down
1 change: 0 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1993,7 +1993,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->transaction = stub_tx.get();

result = interpreter->RunFunction(eval_args.sha, &error);
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal

cntx->transaction = tx;
return OpStatus::OK;
Expand Down
5 changes: 0 additions & 5 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm

auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId());

sinfo.had_writes |= cmd->Cid()->IsWriteOnly();
sinfo.cmds.push_back(cmd);
order_.push_back(last_sid);

Expand Down Expand Up @@ -280,10 +279,6 @@ void MultiCommandSquasher::Run() {
// Set last txid.
cntx_->last_command_debug.clock = cntx_->transaction->txid();

if (!sharded_.empty())
cntx_->transaction->ReportWritesSquashedMulti(
[this](ShardId sid) { return sharded_[sid].had_writes; });

// UnlockMulti is a no-op for non-atomic multi transactions,
// still called for correctness and future changes
if (!IsAtomic()) {
Expand Down
3 changes: 1 addition & 2 deletions src/server/multi_command_squasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ class MultiCommandSquasher {
private:
// Per-shard execution info.
struct ShardExecInfo {
ShardExecInfo() : had_writes{false}, cmds{}, replies{}, local_tx{nullptr} {
ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} {
}

bool had_writes;
std::vector<StoredCmd*> cmds; // accumulated commands
std::vector<facade::CapturingReplyBuilder::Payload> replies;
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
Expand Down
5 changes: 0 additions & 5 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,6 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// no database switch can be performed between those two calls, because they are part of one
// transaction.
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
// We ignore EXEC entries because they have no meaning during
// the LOAD phase on the replica.
if (item.opcode == journal::Op::EXEC)
return;

// To enable journal flushing to sync after non auto journal command is executed we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerializedToChannel.
Expand Down
79 changes: 4 additions & 75 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} {
string_view cmd_name(cid_->name());
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new MultiData);
multi_->shard_journal_write.resize(shard_set->size(), false);

multi_->mode = NOT_DETERMINED;
multi_->role = DEFAULT;
}
Expand All @@ -153,7 +151,6 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id,
}

multi_->role = SQUASHED_STUB;
multi_->shard_journal_write.resize(1);

MultiUpdateWithParent(parent);
if (slot_id.has_value()) {
Expand Down Expand Up @@ -597,11 +594,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// This is the last hop, so clear cont_trans if its held by the current tx
shard->RemoveContTx(this);

if (IsAtomicMulti()) { // Can only be true if run through ScheduleSingleHop
DCHECK(cid_->IsMultiTransactional());
MultiReportJournalOnShard(shard);
}

// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
Expand Down Expand Up @@ -758,15 +750,6 @@ void Transaction::ScheduleInternal() {
}
}

// Merges externally observed per-shard write information into
// multi_->shard_journal_write: every index for which `had_write` returns true
// is marked as written (already-set flags are never cleared). Afterwards the
// cached multi_->shard_journal_cnt is recomputed from the merged flags.
void Transaction::ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write) {
  DCHECK(multi_);

  auto& write_flags = multi_->shard_journal_write;
  for (unsigned sid = 0; sid < write_flags.size(); ++sid) {
    if (had_write(sid)) {
      write_flags[sid] = true;
    }
  }

  // Refresh the count right away, in case the transaction concludes after one
  // hop without going through UnlockMulti.
  multi_->shard_journal_cnt = CalcMultiNumOfShardJournals();
}

// Runs in the coordinator fiber.
void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMulti " << DebugId();
Expand All @@ -782,8 +765,6 @@ void Transaction::UnlockMulti() {
sharded_keys[sid].emplace_back(fp);
}

multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;

use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);

DCHECK_EQ(shard_data_.size(), shard_set->size());
Expand All @@ -798,16 +779,6 @@ void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMultiEnd " << DebugId();
}

// Returns how many entries of multi_->shard_journal_write are set to true.
uint32_t Transaction::CalcMultiNumOfShardJournals() const {
  uint32_t written_shards = 0;
  for (bool wrote : multi_->shard_journal_write) {
    written_shards += wrote ? 1 : 0;
  }
  return written_shards;
}

// Runs `cb` through Execute() and returns the status stored in local_result_
// once Execute() has returned.
// NOTE(review): the `true` argument presumably marks this hop as the
// concluding one — confirm against Execute()'s declaration.
OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
Execute(cb, true);
return local_result_;
}
return multi_->tag_fps;
}

// If the first entry of multi_->shard_journal_write is set, fixes the shard
// journal count at 1 and reports the journal for the calling thread's shard
// via MultiReportJournalOnShard. Does nothing when that flag is unset.
void Transaction::FIX_ConcludeJournalExec() {
  const bool wrote_to_journal = multi_->shard_journal_write.front();
  if (wrote_to_journal) {
    multi_->shard_journal_cnt = 1;
    MultiReportJournalOnShard(EngineShard::tlocal());
  }
}

string Transaction::DEBUG_PrintFailState(ShardId sid) const {
auto res = StrCat(
"usc: ", unique_shard_cnt_, ", name:", GetCId()->name(),
Expand Down Expand Up @@ -1262,21 +1225,9 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
return result;
}

// Records a journal::Op::EXEC entry in `shard`'s journal, provided the shard
// has a journal and this transaction's write flag for the shard is set.
// Must be invoked with `shard` being the calling thread's EngineShard.
void Transaction::MultiReportJournalOnShard(EngineShard* shard) const {
  DCHECK_EQ(EngineShard::tlocal(), shard);

  auto* shard_journal = shard->journal();

  // Squashed stub transactions keep a single flag at index 0; all other roles
  // index the flags by shard id.
  size_t flag_pos = 0;
  if (multi_->role != SQUASHED_STUB) {
    flag_pos = shard->shard_id();
  }

  if (shard_journal == nullptr) {
    return;
  }
  if (!multi_->shard_journal_write[flag_pos]) {
    return;
  }

  shard_journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, multi_->shard_journal_cnt,
                             unique_slot_checker_.GetUniqueSlotId(), {}, true);
}

void Transaction::UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard) {
DCHECK(multi_ && multi_->lock_mode);

MultiReportJournalOnShard(shard);

if (multi_->mode == GLOBAL) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
} else {
Expand Down Expand Up @@ -1402,37 +1353,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
}
// Record autojournal commands to the journal. Here we allow await, which enables writing to
// sync the journal change.
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true);
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, true);
}

void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt, bool multi_commands,
bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);

if (multi_) {
if (multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;
else
multi_->shard_journal_write[0] = true;
}

bool is_multi = multi_commands || IsAtomicMulti();

auto opcode = is_multi ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, unique_slot_checker_.GetUniqueSlotId(),
std::move(payload), allow_await);
}

void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
if (multi_) {
return;
}
uint32_t shard_cnt, bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt,
unique_slot_checker_.GetUniqueSlotId(), {}, false);
journal->RecordEntry(txid_, journal::Op::COMMAND, db_index_, shard_cnt,
unique_slot_checker_.GetUniqueSlotId(), std::move(payload), allow_await);
}

void Transaction::ReviveAutoJournal() {
Expand Down
26 changes: 2 additions & 24 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,6 @@ class Transaction {
// Start multi in NON_ATOMIC mode.
void StartMultiNonAtomic();

// Report which shards had write commands that executed on stub transactions
// and thus did not mark itself in MultiData::shard_journal_write.
void ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write);

// Unlock key locks of a multi transaction.
void UnlockMulti();

Expand Down Expand Up @@ -325,14 +321,9 @@ class Transaction {
// to it must not block.
void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args);

// Write a journal entry to a shard journal with the given payload. When logging a non-automatic
// journal command, multiple journal entries may be necessary. In this case, call with set
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
// entry.
// Write a journal entry to a shard journal with the given payload.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
bool multi_commands, bool allow_await) const;

void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
bool allow_await) const;

// Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup.
void ReviveAutoJournal();
Expand All @@ -343,9 +334,6 @@ class Transaction {
// Get keys multi transaction was initialized with, normalized and unique
const absl::flat_hash_set<std::pair<ShardId, LockFp>>& GetMultiFps() const;

// Send journal EXEC opcode after a series of MULTI commands on the currently active shard
void FIX_ConcludeJournalExec();

// Print in-depth failure state for debugging.
std::string DEBUG_PrintFailState(ShardId sid) const;

Expand Down Expand Up @@ -442,13 +430,6 @@ class Transaction {
bool concluding = false;

unsigned cmd_seq_num = 0; // used for debugging purposes.
unsigned shard_journal_cnt;

// The shard_journal_write vector variable is used to determine the number of shards
// involved in a multi-command transaction. This information is utilized by replicas when
// executing multi-command. For every write to a shard journal, the corresponding index in the
// vector is marked as true.
absl::InlinedVector<bool, 4> shard_journal_write;
};

enum CoordinatorState : uint8_t {
Expand Down Expand Up @@ -543,9 +524,6 @@ class Transaction {
// Set time_now_ms_
void InitTxTime();

// If journaling is enabled, report final exec opcode to finish the chain of commands.
void MultiReportJournalOnShard(EngineShard* shard) const;

void UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard);

// In a multi-command transaction, we determine the number of shard journals that we wrote entries
Expand Down
14 changes: 4 additions & 10 deletions src/server/tx_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,15 @@ size_t ShardArgs::Size() const {
}

void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args,
uint32_t shard_cnt, bool multi_commands) {
uint32_t shard_cnt) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands,
false);
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false);
}

void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args,
uint32_t shard_cnt, bool multi_commands) {
uint32_t shard_cnt) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands,
false);
}

void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt);
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false);
}

void RecordExpiry(DbIndex dbid, string_view key) {
Expand Down
7 changes: 2 additions & 5 deletions src/server/tx_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,9 @@ class ShardArgs {

// Record non auto journal command with own txid and dbid.
void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs& args,
uint32_t shard_cnt = 1, bool multi_commands = false);
uint32_t shard_cnt = 1);
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1, bool multi_commands = false);

// Record non auto journal command finish. Call only when command translates to multi commands.
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
uint32_t shard_cnt = 1);

// Record expiry in journal with independent transaction. Must be called from shard thread holding
// key.
Expand Down
Loading