Skip to content

Commit

Permalink
Merge #106793
Browse files Browse the repository at this point in the history
106793: kvserver: communicate snapshot index back along with snapshot response r=erikgrinaker a=tbg

This addresses the following race:

- n1 runs a ConfChange that adds n2 as a learner.
- n1 sends MsgApp to the learner.
- n1 starts the INITIAL snapshot, say at index 100.
- n2 receives n1's MsgApp. Since it's an uninitialized
  Replica and its log is empty, it rejects this MsgApp.
- n2 receives and applies the INITIAL snapshot, which prompts it to send an
  affirmative MsgAppResp to n1.
- n1's RawNode now tracks n2 as StateProbe (via call to ReportSnapshot(success))
- n1 receives the MsgApp rejection; n2 regresses to StateSnapshot because the
  rejection comes with a RejectHint (suggested next index to try) of zero,
  which is not in n1's log. In particular, the SnapshotIndex will likely be
  higher than the index of the snapshot actually sent, say 101.
- n1 receives the affirmative MsgAppResp (for index 100). However, 100 < 101
  so this is ignored and the follower remains in StateSnapshot.

With this commit, the last two steps cannot happen: n2 transitions straight to
StateReplicate because we step a copy of the affirmative MsgAppResp in. The
later rejection will be dropped, since it is stale (you can't hint at index zero
when you already have a positive index confirmed).

I will add that there is no great testing for the above other than stressing
the test with additional logging, noting the symptoms, and noting that they
disappear with this commit. Scripted testing of this code is within reach[^1]
but is outside of the scope of this PR.

[^1]: #105177

There is an additional bit of brittleness that is silently suppressed by this
commit, but which deserves to be fixed independently because how the problem
gets avoided seems accidental and incomplete. When raft requests a snapshot, it notes its
current LastIndex and uses it as the PendingSnapshot for the follower's
Progress.

At the time of writing, MsgAppResp that reconnect the follower to the log but
which are not greater than or equal to PendingSnapshot are ignored. In effect,
this means that perfectly good snapshots are thrown away if they happen to be a
little bit stale. In the example above, the snapshot is stale: PendingSnapshot
is 101, but the snapshot is at index 100. Then how does this commit (mostly)
fix the problem, i.e. why isn't the snapshot discarded? The key is that when we
synchronously step the MsgAppResp(100) into the leader's RawNode, the rejection
hasn't arrived yet and so the follower transitions into StateReplicate with a
Match of 100. This is then enough so that raft recognizes the rejected MsgApp
as stale (since it would regress on durably stored entries). However, there is
an alternative example where the rejection arrives earlier: after the snapshot
index has been picked, but before the follower has been transitioned into
StateReplicate. For this to have a negative effect, an entry has to be appended
to the leader's log between generating the snapshot and handling the rejection.
Without the combination of delegated snapshots and sustained write activity on
the leader, this window is small, and this combination is usually not present
in tests but it may well be relevant in "real" clusters. We track addressing
this in #106813.

Closes #87554.
Closes #97971.
Closes #84242.

Epic: None
Release note (bug fix): removed a source of unnecessary Raft snapshots during
replica movement.


Co-authored-by: Tobias Grieger <[email protected]>
Co-authored-by: Andrew Baptist <[email protected]>
  • Loading branch information
3 people committed Jul 21, 2023
2 parents 32f22f6 + 0e3b84b commit 4d7ec64
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 49 deletions.
2 changes: 0 additions & 2 deletions pkg/kv/kvserver/client_relocate_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -166,7 +165,6 @@ func usesAtomicReplicationChange(ops []kvpb.ReplicationChange) bool {

func TestAdminRelocateRange(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.WithIssue(t, 84242, "flaky test")
defer log.Scope(t).Close(t)

ctx := context.Background()
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ message SnapshotResponse {
//
// MIGRATION: only guaranteed to be set when the message field is no longer there.
errorspb.EncodedError encoded_error = 5 [(gogoproto.nullable) = false];

// msg_app_resp stores an optional MsgAppResp the receiving RawNode may have
// generated in response to applying the snapshot. This message will also have
// been handed to the raft transport, but it is helpful to step it into the
// sender manually to avoid the race described in:
//
// https://github.com/cockroachdb/cockroach/issues/97971
raftpb.Message msg_app_resp = 6;
}

// TODO(baptist): Extend this if necessary to separate out the request for the throttle.
Expand Down Expand Up @@ -356,6 +364,14 @@ message DelegateSnapshotResponse {
// collected_spans stores trace spans recorded during the execution of this
// request.
repeated util.tracing.tracingpb.RecordedSpan collected_spans = 3 [(gogoproto.nullable) = false];

// msg_app_resp stores an optional MsgAppResp the receiving RawNode may have
// generated in response to applying the snapshot. This message will also have
// been handed to the raft transport, but it is helpful to step it into the
// sender manually to avoid the race described in:
//
// https://github.com/cockroachdb/cockroach/issues/97971
raftpb.Message msg_app_resp = 4;
}

// ConfChangeContext is encoded in the raftpb.ConfChange.Context field.
Expand Down
26 changes: 15 additions & 11 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,10 @@ func (t *RaftTransport) dropFlowTokensForDisconnectedNodes() {

// SendSnapshot streams the given outgoing snapshot. The caller is responsible
// for closing the OutgoingSnapshot.
//
// The optional (but usually present) returned message is an MsgAppResp that
// results from the follower applying the snapshot, acking the log at the index
// of the snapshot.
func (t *RaftTransport) SendSnapshot(
ctx context.Context,
storePool *storepool.StorePool,
Expand All @@ -1107,17 +1111,17 @@ func (t *RaftTransport) SendSnapshot(
newWriteBatch func() storage.WriteBatch,
sent func(),
recordBytesSent snapshotRecordMetrics,
) error {
) (*kvserverpb.SnapshotResponse, error) {
nodeID := header.RaftMessageRequest.ToReplica.NodeID

conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return err
return nil, err
}
client := NewMultiRaftClient(conn)
stream, err := client.RaftSnapshot(ctx)
if err != nil {
return err
return nil, err
}

defer func() {
Expand All @@ -1132,18 +1136,18 @@ func (t *RaftTransport) SendSnapshot(
// and determines if it encountered any errors when sending the snapshot.
func (t *RaftTransport) DelegateSnapshot(
ctx context.Context, req *kvserverpb.DelegateSendSnapshotRequest,
) error {
) (*kvserverpb.DelegateSnapshotResponse, error) {
nodeID := req.DelegatedSender.NodeID
conn, err := t.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
if err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
client := NewMultiRaftClient(conn)

// Creates a rpc stream between the leaseholder and sender.
stream, err := client.DelegateRaftSnapshot(ctx)
if err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
defer func() {
if err := stream.CloseSend(); err != nil {
Expand All @@ -1154,12 +1158,12 @@ func (t *RaftTransport) DelegateSnapshot(
// Send the request.
wrappedRequest := &kvserverpb.DelegateSnapshotRequest{Value: &kvserverpb.DelegateSnapshotRequest_Send{Send: req}}
if err := stream.Send(wrappedRequest); err != nil {
return errors.Mark(err, errMarkSnapshotError)
return nil, errors.Mark(err, errMarkSnapshotError)
}
// Wait for response to see if the receiver successfully applied the snapshot.
resp, err := stream.Recv()
if err != nil {
return errors.Mark(
return nil, errors.Mark(
errors.Wrapf(err, "%v: remote failed to send snapshot", req), errMarkSnapshotError,
)
}
Expand All @@ -1175,14 +1179,14 @@ func (t *RaftTransport) DelegateSnapshot(

switch resp.Status {
case kvserverpb.DelegateSnapshotResponse_ERROR:
return errors.Mark(
return nil, errors.Mark(
errors.Wrapf(resp.Error(), "error sending couldn't accept %v", req), errMarkSnapshotError)
case kvserverpb.DelegateSnapshotResponse_APPLIED:
// This is the response we're expecting. Snapshot successfully applied.
log.VEventf(ctx, 3, "%s: delegated snapshot was successfully applied", resp)
return nil
return resp, nil
default:
return err
return nil, err
}
}

Expand Down
43 changes: 34 additions & 9 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2872,7 +2872,23 @@ func (r *Replica) sendSnapshotUsingDelegate(
retErr = timeutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
// Sending snapshot
return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
resp, err := r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest)
if err != nil {
return err
}
if resp.MsgAppResp != nil {
_ = r.withRaftGroup(func(rn *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
msg := *resp.MsgAppResp
// With a delegated snapshot, the recipient received the snapshot
// from another replica and will thus respond to it instead. But the
// message is valid for the actual originator of the send as well.
msg.To = rn.BasicStatus().ID
// We do want to unquiesce here - we wouldn't ever want state transitions
// on a quiesced replica.
return true, rn.Step(*resp.MsgAppResp)
})
}
return nil
},
)
if !selfDelegate {
Expand Down Expand Up @@ -3053,7 +3069,7 @@ func (r *Replica) followerSendSnapshot(
ctx context.Context,
recipient roachpb.ReplicaDescriptor,
req *kvserverpb.DelegateSendSnapshotRequest,
) error {
) (*raftpb.Message, error) {
ctx = r.AnnotateCtx(ctx)
sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV)
if sendThreshold > 0 {
Expand Down Expand Up @@ -3082,28 +3098,28 @@ func (r *Replica) followerSendSnapshot(
// expensive to send.
err := r.validateSnapshotDelegationRequest(ctx, req)
if err != nil {
return err
return nil, err
}

// Throttle snapshot sending. Obtain the send semaphore and determine the rate limit.
rangeSize := r.GetMVCCStats().Total()
cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
if err != nil {
return errors.Wrap(err, "Unable to reserve space for sending this snapshot")
return nil, errors.Wrap(err, "Unable to reserve space for sending this snapshot")
}
defer cleanup()

// Check validity again, it is possible that the pending request should not be
// sent after we are doing waiting.
err = r.validateSnapshotDelegationRequest(ctx, req)
if err != nil {
return err
return nil, err
}

snapType := req.Type
snap, err := r.GetSnapshot(ctx, snapType, req.SnapId)
if err != nil {
return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
return nil, errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
}
defer snap.Close()
log.Event(ctx, "generated snapshot")
Expand Down Expand Up @@ -3174,9 +3190,10 @@ func (r *Replica) followerSendSnapshot(
}
}

return timeutil.RunWithTimeout(
var msgAppResp *raftpb.Message
if err := timeutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
resp, err := r.store.cfg.Transport.SendSnapshot(
ctx,
r.store.cfg.StorePool,
header,
Expand All @@ -3185,8 +3202,16 @@ func (r *Replica) followerSendSnapshot(
sent,
recordBytesSent,
)
if err != nil {
return err
}
msgAppResp = resp.MsgAppResp
return nil
},
)
); err != nil {
return nil, err
}
return msgAppResp, nil
}

// replicasCollocated is used in AdminMerge to ensure that the ranges are
Expand Down
49 changes: 49 additions & 0 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2441,3 +2441,52 @@ func TestRebalancingAndCrossRegionZoneSnapshotMetrics(t *testing.T) {
})

}

// TestAddVotersWithoutRaftQueue verifies that in normal operations Raft
// snapshots are not required. This test creates a range with a single voter,
// then adds two additional voters. Most of the time this succeeds, however it
// fails (today) occasionally due to the addition of the first voter being
// "incomplete" and therefore the second voter is not able to be added because
// there is no quorum.
//
// Specifically the following sequence of events happens when the leader adds
// the first voter:
// 1. AdminChangeReplicasRequest is processed on n1.
// a) Adds a n2 as a LEARNER to raft.
// b) Sends an initial snapshot to n2.
// c) n2 receives and applies the snapshot.
// d) n2 responds that it successfully applied the snapshot.
// e) n1 receives the response and updates state to Follower.
// 2. Before step c above, n1 sends a MsgApp to n2
// a) MsgApp - entries up-to and including the conf change.
// b) The MsgApp is received and REJECTED because the term is wrong.
// c) After 1e above, n1 receives the rejection.
// d) n1 updates n2 from StateReplicate to StateProbe and then StateSnapshot.
//
// From n2's perspective, it receives the MsgApp prior to the initial snapshot.
// This results in it responding with a rejected MsgApp. Later it receives the
// snapshot and correctly applies it. However, when n1 sees the rejected MsgApp,
// it moves n2 status to StateProbe and stops sending Raft updates to it as it
// plans to fix it with a Raft Snapshot. As the raft snapshot queue is disabled
// this never happens and the state is stuck as a non-Learner in StateProbe. At
// this point, the Raft group is wedged since it only has 1/2 nodes available
// for Raft consensus.
func TestAddVotersWithoutRaftQueue(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

// Disable the raft snapshot queue to make sure we don't require a raft snapshot.
tc := testcluster.StartTestCluster(
t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{DisableRaftSnapshotQueue: true}},
},
ReplicationMode: base.ReplicationManual,
},
)
defer tc.Stopper().Stop(ctx)

key := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, key, tc.Target(1))
tc.AddVotersOrFatal(t, key, tc.Target(2))
}
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil {
return stats, errors.Wrap(err, "while applying snapshot")
}
for _, msg := range msgStorageAppend.Responses {
// The caller would like to see the MsgAppResp that usually results from
// applying the snapshot synchronously, so fish it out.
if msg.To == uint64(inSnap.FromReplica.ReplicaID) &&
msg.Type == raftpb.MsgAppResp &&
!msg.Reject &&
msg.Index == snap.Metadata.Index {

inSnap.msgAppRespCh <- msg
break
}
}
stats.tSnapEnd = timeutil.Now()
stats.snap.applied = true

Expand Down Expand Up @@ -1827,9 +1839,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
// index it requested is now actually durable on the follower. Note also that
// the follower will generate an MsgAppResp reflecting the applied snapshot
// which typically moves the follower to StateReplicate when (if) received
// by the leader.
//
// See: https://github.com/cockroachdb/cockroach/issues/87581
// by the leader, which as of #106793 we do synchronously.
if err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
raftGroup.ReportSnapshot(uint64(to), snapStatus)
return true, nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ type IncomingSnapshot struct {
DataSize int64
snapType kvserverpb.SnapshotRequest_Type
placeholder *ReplicaPlaceholder
raftAppliedIndex kvpb.RaftIndex // logging only
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
}

func (s IncomingSnapshot) String() string {
Expand Down
25 changes: 22 additions & 3 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func (s *Store) HandleDelegatedSnapshot(
}

// Pass the request to the sender replica.
if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil {
msgAppResp, err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req)
if err != nil {
// If an error occurred during snapshot sending, send an error response.
return &kvserverpb.DelegateSnapshotResponse{
Status: kvserverpb.DelegateSnapshotResponse_ERROR,
Expand All @@ -195,6 +196,7 @@ func (s *Store) HandleDelegatedSnapshot(
return &kvserverpb.DelegateSnapshotResponse{
Status: kvserverpb.DelegateSnapshotResponse_APPLIED,
CollectedSpans: sp.GetConfiguredRecording(),
MsgAppResp: msgAppResp,
}
}

Expand Down Expand Up @@ -426,8 +428,9 @@ func (s *Store) processRaftRequestWithReplica(
// will have been removed.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, snapHeader *kvserverpb.SnapshotRequest_Header, inSnap IncomingSnapshot,
) *kvpb.Error {
return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
) (*raftpb.Message, *kvpb.Error) {
var msgAppResp *raftpb.Message
pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
ctx context.Context, r *Replica,
) (pErr *kvpb.Error) {
ctx = r.AnnotateCtx(ctx)
Expand Down Expand Up @@ -496,8 +499,24 @@ func (s *Store) processRaftSnapshotRequest(
log.Infof(ctx, "ignored stale snapshot at index %d", snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Index)
s.metrics.RangeSnapshotRecvUnusable.Inc(1)
}
// If the snapshot was applied and acked with an MsgAppResp, return that
// message up the stack. We're using msgAppRespCh as a shortcut to avoid
// plumbing return parameters through an additional few layers of raft
// handling.
//
// NB: in practice there's always an MsgAppResp here, but it is better not
// to rely on what is essentially discretionary raft behavior.
select {
case msg := <-inSnap.msgAppRespCh:
msgAppResp = &msg
default:
}
return nil
})
if pErr != nil {
return nil, pErr
}
return msgAppResp, nil
}

// HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per
Expand Down
Loading

0 comments on commit 4d7ec64

Please sign in to comment.