Skip to content

Commit

Permalink
feat(server): master stop sending exec opcode to replica (#3289)
Browse files Browse the repository at this point in the history
* feat server: master stop sending exec opcode to replica

Signed-off-by: adi_holden <[email protected]>
  • Loading branch information
adiholden authored Jul 9, 2024
1 parent 5c7c21b commit ccada87
Show file tree
Hide file tree
Showing 10 changed files with 14 additions and 132 deletions.
4 changes: 1 addition & 3 deletions src/server/journal/journal_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,7 @@ TEST(Journal, WriteRead) {
{2, Op::COMMAND, 1, 1, nullopt, Payload("LPUSH", list("l", "v1", "v2"))},
{3, Op::COMMAND, 0, 1, nullopt, Payload("MSET", slice("D", "4"))},
{4, Op::COMMAND, 1, 1, nullopt, Payload("DEL", list("l1"))},
{5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))},
{6, Op::MULTI_COMMAND, 2, 1, nullopt, Payload("SET", list("E", "2"))},
{6, Op::EXEC, 2, 1, nullopt}};
{5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}};

// Write all entries to a buffer.
base::IoBuf buf;
Expand Down
2 changes: 0 additions & 2 deletions src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ void JournalWriter::Write(const journal::Entry& entry) {
return;
case journal::Op::COMMAND:
case journal::Op::EXPIRED:
case journal::Op::MULTI_COMMAND:
case journal::Op::EXEC:
Write(entry.txid);
Write(entry.shard_cnt);
Write(entry.payload);
Expand Down
1 change: 0 additions & 1 deletion src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1993,7 +1993,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->transaction = stub_tx.get();

result = interpreter->RunFunction(eval_args.sha, &error);
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal

cntx->transaction = tx;
return OpStatus::OK;
Expand Down
5 changes: 0 additions & 5 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm

auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId());

sinfo.had_writes |= cmd->Cid()->IsWriteOnly();
sinfo.cmds.push_back(cmd);
order_.push_back(last_sid);

Expand Down Expand Up @@ -280,10 +279,6 @@ void MultiCommandSquasher::Run() {
// Set last txid.
cntx_->last_command_debug.clock = cntx_->transaction->txid();

if (!sharded_.empty())
cntx_->transaction->ReportWritesSquashedMulti(
[this](ShardId sid) { return sharded_[sid].had_writes; });

// UnlockMulti is a no-op for non-atomic multi transactions,
// still called for correctness and future changes
if (!IsAtomic()) {
Expand Down
3 changes: 1 addition & 2 deletions src/server/multi_command_squasher.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@ class MultiCommandSquasher {
private:
// Per-shard execution info.
struct ShardExecInfo {
ShardExecInfo() : had_writes{false}, cmds{}, replies{}, local_tx{nullptr} {
ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} {
}

bool had_writes;
std::vector<StoredCmd*> cmds; // accumulated commands
std::vector<facade::CapturingReplyBuilder::Payload> replies;
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
Expand Down
5 changes: 0 additions & 5 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,6 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// no database switch can be performed between those two calls, because they are part of one
// transaction.
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
// We ignore EXEC entries because we they have no meaning during
// the LOAD phase on replica.
if (item.opcode == journal::Op::EXEC)
return;

// To enable journal flushing to sync after non auto journal command is executed we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerializedToChannel.
Expand Down
79 changes: 4 additions & 75 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} {
string_view cmd_name(cid_->name());
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new MultiData);
multi_->shard_journal_write.resize(shard_set->size(), false);

multi_->mode = NOT_DETERMINED;
multi_->role = DEFAULT;
}
Expand All @@ -153,7 +151,6 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id,
}

multi_->role = SQUASHED_STUB;
multi_->shard_journal_write.resize(1);

MultiUpdateWithParent(parent);
if (slot_id.has_value()) {
Expand Down Expand Up @@ -597,11 +594,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// This is the last hop, so clear cont_trans if its held by the current tx
shard->RemoveContTx(this);

if (IsAtomicMulti()) { // Can only be true if run through ScheduleSingleHop
DCHECK(cid_->IsMultiTransactional());
MultiReportJournalOnShard(shard);
}

// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
Expand Down Expand Up @@ -758,15 +750,6 @@ void Transaction::ScheduleInternal() {
}
}

void Transaction::ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write) {
DCHECK(multi_);
for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++)
multi_->shard_journal_write[i] |= had_write(i);

// Update imemdiately if we decide to conclude after one hop without UnlockMulti
multi_->shard_journal_cnt = CalcMultiNumOfShardJournals();
}

// Runs in the coordinator fiber.
void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMulti " << DebugId();
Expand All @@ -782,8 +765,6 @@ void Transaction::UnlockMulti() {
sharded_keys[sid].emplace_back(fp);
}

multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;

use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);

DCHECK_EQ(shard_data_.size(), shard_set->size());
Expand All @@ -798,16 +779,6 @@ void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMultiEnd " << DebugId();
}

uint32_t Transaction::CalcMultiNumOfShardJournals() const {
uint32_t shard_journals_cnt = 0;
for (bool was_shard_write : multi_->shard_journal_write) {
if (was_shard_write) {
++shard_journals_cnt;
}
}
return shard_journals_cnt;
}

OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
Execute(cb, true);
return local_result_;
Expand Down Expand Up @@ -919,14 +890,6 @@ const absl::flat_hash_set<std::pair<ShardId, LockFp>>& Transaction::GetMultiFps(
return multi_->tag_fps;
}

void Transaction::FIX_ConcludeJournalExec() {
if (!multi_->shard_journal_write.front())
return;

multi_->shard_journal_cnt = 1;
MultiReportJournalOnShard(EngineShard::tlocal());
}

string Transaction::DEBUG_PrintFailState(ShardId sid) const {
auto res = StrCat(
"usc: ", unique_shard_cnt_, ", name:", GetCId()->name(),
Expand Down Expand Up @@ -1262,21 +1225,9 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
return result;
}

void Transaction::MultiReportJournalOnShard(EngineShard* shard) const {
DCHECK_EQ(EngineShard::tlocal(), shard);
auto* journal = shard->journal();
size_t write_idx = multi_->role == SQUASHED_STUB ? 0 : shard->shard_id();
if (journal != nullptr && multi_->shard_journal_write[write_idx]) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, multi_->shard_journal_cnt,
unique_slot_checker_.GetUniqueSlotId(), {}, true);
}
}

void Transaction::UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard) {
DCHECK(multi_ && multi_->lock_mode);

MultiReportJournalOnShard(shard);

if (multi_->mode == GLOBAL) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
} else {
Expand Down Expand Up @@ -1402,37 +1353,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
}
// Record to journal autojournal commands, here we allow await which anables writing to sync
// the journal change.
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true);
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, true);
}

void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt, bool multi_commands,
bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);

if (multi_) {
if (multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;
else
multi_->shard_journal_write[0] = true;
}

bool is_multi = multi_commands || IsAtomicMulti();

auto opcode = is_multi ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, unique_slot_checker_.GetUniqueSlotId(),
std::move(payload), allow_await);
}

void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
if (multi_) {
return;
}
uint32_t shard_cnt, bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt,
unique_slot_checker_.GetUniqueSlotId(), {}, false);
journal->RecordEntry(txid_, journal::Op::COMMAND, db_index_, shard_cnt,
unique_slot_checker_.GetUniqueSlotId(), std::move(payload), allow_await);
}

void Transaction::ReviveAutoJournal() {
Expand Down
26 changes: 2 additions & 24 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,6 @@ class Transaction {
// Start multi in NON_ATOMIC mode.
void StartMultiNonAtomic();

// Report which shards had write commands that executed on stub transactions
// and thus did not mark itself in MultiData::shard_journal_write.
void ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write);

// Unlock key locks of a multi transaction.
void UnlockMulti();

Expand Down Expand Up @@ -325,14 +321,9 @@ class Transaction {
// to it must not block.
void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args);

// Write a journal entry to a shard journal with the given payload. When logging a non-automatic
// journal command, multiple journal entries may be necessary. In this case, call with set
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
// entry.
// Write a journal entry to a shard journal with the given payload.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
bool multi_commands, bool allow_await) const;

void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
bool allow_await) const;

// Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup.
void ReviveAutoJournal();
Expand All @@ -343,9 +334,6 @@ class Transaction {
// Get keys multi transaction was initialized with, normalized and unique
const absl::flat_hash_set<std::pair<ShardId, LockFp>>& GetMultiFps() const;

// Send journal EXEC opcode after a series of MULTI commands on the currently active shard
void FIX_ConcludeJournalExec();

// Print in-dept failure state for debugging.
std::string DEBUG_PrintFailState(ShardId sid) const;

Expand Down Expand Up @@ -442,13 +430,6 @@ class Transaction {
bool concluding = false;

unsigned cmd_seq_num = 0; // used for debugging purposes.
unsigned shard_journal_cnt;

// The shard_journal_write vector variable is used to determine the number of shards
// involved in a multi-command transaction. This information is utilized by replicas when
// executing multi-command. For every write to a shard journal, the corresponding index in the
// vector is marked as true.
absl::InlinedVector<bool, 4> shard_journal_write;
};

enum CoordinatorState : uint8_t {
Expand Down Expand Up @@ -543,9 +524,6 @@ class Transaction {
// Set time_now_ms_
void InitTxTime();

// If journaling is enabled, report final exec opcode to finish the chain of commands.
void MultiReportJournalOnShard(EngineShard* shard) const;

void UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard);

// In a multi-command transaction, we determine the number of shard journals that we wrote entries
Expand Down
14 changes: 4 additions & 10 deletions src/server/tx_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,15 @@ size_t ShardArgs::Size() const {
}

void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args,
uint32_t shard_cnt, bool multi_commands) {
uint32_t shard_cnt) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands,
false);
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false);
}

void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args,
uint32_t shard_cnt, bool multi_commands) {
uint32_t shard_cnt) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands,
false);
}

void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt);
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false);
}

void RecordExpiry(DbIndex dbid, string_view key) {
Expand Down
7 changes: 2 additions & 5 deletions src/server/tx_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,12 +201,9 @@ class ShardArgs {

// Record non auto journal command with own txid and dbid.
void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs& args,
uint32_t shard_cnt = 1, bool multi_commands = false);
uint32_t shard_cnt = 1);
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1, bool multi_commands = false);

// Record non auto journal command finish. Call only when command translates to multi commands.
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
uint32_t shard_cnt = 1);

// Record expiry in journal with independent transaction. Must be called from shard thread holding
// key.
Expand Down

0 comments on commit ccada87

Please sign in to comment.