diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index e2a04a7bd595..b8071de5ce23 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -171,6 +171,8 @@ type IsLiveCallback func(livenesspb.Liveness)
 
 // HeartbeatCallback is invoked whenever this node updates its own liveness status,
 // indicating that it is alive.
+// TODO(baptist): Remove this callback. Its only use is to log an event at
+// startup, which is a rather heavyweight mechanism for that.
 type HeartbeatCallback func(context.Context)
 
 // NodeLiveness is a centralized failure detector that coordinates
@@ -221,9 +223,24 @@ type NodeLiveness struct {
 	onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
 	engineSyncs           *singleflight.Group
 
+	// onIsLive is a callback registered by stores prior to starting liveness.
+	// It fires when a node transitions from not live to live.
+	onIsLive []IsLiveCallback // see RegisterCallback
+
+	// onSelfHeartbeat is invoked after every successful heartbeat
+	// of the local liveness instance's heartbeat loop.
+	onSelfHeartbeat HeartbeatCallback
+
+	// engines is written to before heartbeating to avoid maintaining liveness
+	// when a local disk is stalled.
+	engines []storage.Engine
+
+	// started is set to true once Start is called. RegisterCallback cannot be
+	// called after Start.
+	started syncutil.AtomicBool
+
 	mu struct {
 		syncutil.RWMutex
-		onIsLive []IsLiveCallback // see NodeLivenessOptions.OnSelfLive
 		// nodes is an in-memory cache of liveness records that NodeLiveness
 		// knows about (having learnt of them through gossip or through KV).
 		// It's a look-aside cache, and is accessed primarily through
@@ -257,11 +274,7 @@ type NodeLiveness struct {
 		//
 		// More concretely, we want `getLivenessRecordFromKV` to be tucked away
 		// within `getLivenessLocked`.
-		nodes      map[roachpb.NodeID]Record
-		onSelfLive HeartbeatCallback // set in Start()
-		// Before heartbeating, we write to each of these engines to avoid
-		// maintaining liveness when a local disks is stalled.
-		engines []storage.Engine // set in Start()
+		nodes map[roachpb.NodeID]Record
 	}
 }
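The struct change above moves the callbacks out from under `mu` into plain fields that are frozen once `started` flips. Below is a minimal, self-contained sketch of that pattern with hypothetical names, using the standard library's `sync/atomic` in place of `syncutil.AtomicBool`; it is an illustration, not CockroachDB code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type Liveness struct{ NodeID int }

type IsLiveCallback func(Liveness)

type NodeLiveness struct {
	started  atomic.Bool      // set once Start is called
	onIsLive []IsLiveCallback // immutable after Start; read without locking
}

// RegisterCallback must run before Start; after that the slice is read
// concurrently by the heartbeat path and must not be mutated.
func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
	if nl.started.Load() {
		panic("RegisterCallback called after Start")
	}
	nl.onIsLive = append(nl.onIsLive, cb)
}

func (nl *NodeLiveness) Start() { nl.started.Store(true) }

// notifyIsLive fires the callbacks lock-free: once started is set, onIsLive
// can no longer change, so no mutex is needed to read it.
func (nl *NodeLiveness) notifyIsLive(l Liveness) {
	if !nl.started.Load() {
		return // not started yet; the notification is dropped
	}
	for _, fn := range nl.onIsLive {
		fn(l)
	}
}

func main() {
	nl := &NodeLiveness{}
	nl.RegisterCallback(func(l Liveness) { fmt.Printf("n%d is live\n", l.NodeID) })
	nl.Start()
	nl.notifyIsLive(Liveness{NodeID: 1})
}
```

Making the registration window end at Start is what lets the hot paths below drop their mutex acquisitions.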
@@ -278,11 +291,8 @@ type Record struct {
 
 // NodeLivenessOptions is the input to NewNodeLiveness.
 //
-// Note that there is yet another struct, NodeLivenessStartOptions, which
-// is supplied when the instance is started. This is necessary as during
-// server startup, some inputs can only be constructed at Start time. The
-// separation has grown organically and various options could in principle
-// be moved back and forth.
+// The IsLiveCallbacks are registered after construction but before Start is
+// called. Everything else is initialized through these Options.
 type NodeLivenessOptions struct {
 	AmbientCtx              log.AmbientContext
 	Stopper                 *stop.Stopper
@@ -301,6 +311,8 @@ type NodeLivenessOptions struct {
 	// OnNodeDecommissioning is invoked when a node is detected to be
 	// decommissioning.
 	OnNodeDecommissioning OnNodeDecommissionCallback
+	Engines               []storage.Engine
+	OnSelfHeartbeat       HeartbeatCallback
 }
 
 // NewNodeLiveness returns a new instance of NodeLiveness configured
@@ -321,6 +333,8 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
 		onNodeDecommissioned:  opts.OnNodeDecommissioned,
 		onNodeDecommissioning: opts.OnNodeDecommissioning,
 		engineSyncs:           singleflight.NewGroup("engine sync", "engine"),
+		engines:               opts.Engines,
+		onSelfHeartbeat:       opts.OnSelfHeartbeat,
 	}
 	nl.metrics = Metrics{
 		LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -729,34 +743,37 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
 // detected to be decommissioning.
 type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)
 
-// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
-type NodeLivenessStartOptions struct {
-	Engines []storage.Engine
-	// OnSelfLive is invoked after every successful heartbeat
-	// of the local liveness instance's heartbeat loop.
-	OnSelfLive HeartbeatCallback
-}
-
 // Start starts a periodic heartbeat to refresh this node's last
 // heartbeat in the node liveness table. The optionally provided
 // HeartbeatCallback will be invoked whenever this node updates its
 // own liveness. The slice of engines will be written to before each
 // heartbeat to avoid maintaining liveness in the presence of disk stalls.
-func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) {
+// TODO(baptist): If we completely remove epoch leases, this can be merged with
+// the NewNodeLiveness function. Currently liveness is required prior to Start
+// being called in replica_range_lease. For non-epoch leases this should be
+// possible.
+func (nl *NodeLiveness) Start(ctx context.Context) {
 	log.VEventf(ctx, 1, "starting node liveness instance")
+	if nl.started.Get() {
+		// This is meant to prevent tests from calling Start twice.
+		log.Fatal(ctx, "liveness already started")
+	}
+
 	retryOpts := base.DefaultRetryOptions()
 	retryOpts.Closer = nl.stopper.ShouldQuiesce()
 
-	if len(opts.Engines) == 0 {
-		// Avoid silently forgetting to pass the engines. It happened before.
-		log.Fatalf(ctx, "must supply at least one engine")
+	nl.started.Set(true)
+	now := nl.clock.Now()
+	// We may have received some liveness records from gossip prior to Start
+	// being called; notify all registered callbacks of them now.
+	for _, l := range nl.GetLivenesses() {
+		if l.IsLive(now) {
+			for _, fn := range nl.onIsLive {
+				fn(l)
+			}
+		}
 	}
 
-	nl.mu.Lock()
-	nl.mu.onSelfLive = opts.OnSelfLive
-	nl.mu.engines = opts.Engines
-	nl.mu.Unlock()
-
 	_ = nl.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan},
 		func(context.Context) {
 			ambient := nl.ambientCtx
			ambient.AddLogTag("liveness-hb", nil)
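Since gossip can deliver liveness records before Start runs, the new Start replays whatever is already cached to the registered callbacks. A hedged sketch of just that replay step, with simplified stand-in types (the real `IsLive` check compares against the record's expiration, as approximated here):

```go
package main

import (
	"fmt"
	"time"
)

type Liveness struct {
	NodeID     int
	Expiration time.Time
}

// IsLive reports whether the record has not yet expired at time now.
func (l Liveness) IsLive(now time.Time) bool { return now.Before(l.Expiration) }

func main() {
	// Records received from gossip before Start was called; they were cached
	// but not delivered, because callbacks were still being registered.
	cached := []Liveness{
		{NodeID: 1, Expiration: time.Now().Add(9 * time.Second)},
		{NodeID: 2, Expiration: time.Now().Add(-time.Second)}, // already expired
	}
	onIsLive := []func(Liveness){
		func(l Liveness) { fmt.Printf("n%d became live\n", l.NodeID) },
	}

	// The replay loop from Start: deliver only records still live right now.
	now := time.Now()
	for _, l := range cached {
		if !l.IsLive(now) {
			continue
		}
		for _, fn := range onIsLive {
			fn(l)
		}
	}
}
```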
@@ -1254,11 +1271,12 @@ func (nl *NodeLiveness) Metrics() Metrics {
 }
 
 // RegisterCallback registers a callback to be invoked any time a
-// node's IsLive() state changes to true.
+// node's IsLive() state changes to true. This must be called before Start.
 func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
-	nl.mu.Lock()
-	defer nl.mu.Unlock()
-	nl.mu.onIsLive = append(nl.mu.onIsLive, cb)
+	if nl.started.Get() {
+		log.Fatalf(context.TODO(), "RegisterCallback called after Start")
+	}
+	nl.onIsLive = append(nl.onIsLive, cb)
 }
 
 // updateLiveness does a conditional put on the node liveness record for the
@@ -1292,11 +1310,8 @@ func (nl *NodeLiveness) updateLiveness(
 	// we need to return a timely NLHE to the caller such that it will try a
 	// different replica and nudge it into acquiring the lease. This can leak a
 	// goroutine in the case of a stalled disk.
-	nl.mu.RLock()
-	engines := nl.mu.engines
-	nl.mu.RUnlock()
-	resultCs := make([]singleflight.Future, len(engines))
-	for i, eng := range engines {
+	resultCs := make([]singleflight.Future, len(nl.engines))
+	for i, eng := range nl.engines {
 		eng := eng // pin the loop variable
 		resultCs[i], _ = nl.engineSyncs.DoChan(ctx,
 			strconv.Itoa(i),
@@ -1404,11 +1419,8 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 		return Record{}, err
 	}
 
-	nl.mu.RLock()
-	cb := nl.mu.onSelfLive
-	nl.mu.RUnlock()
-	if cb != nil {
-		cb(ctx)
+	if nl.onSelfHeartbeat != nil {
+		nl.onSelfHeartbeat(ctx)
 	}
 	return Record{Liveness: update.newLiveness, raw: v.TagAndDataBytes()}, nil
 }
@@ -1416,6 +1428,7 @@ func (nl *NodeLiveness) updateLivenessAttempt(
 // maybeUpdate replaces the liveness (if it appears newer) and invokes the
 // registered callbacks if the node became live in the process.
 func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record) {
+
 	if newLivenessRec.Liveness == (livenesspb.Liveness{}) {
 		log.Fatal(ctx, "invalid new liveness record; found to be empty")
 	}
@@ -1436,7 +1449,14 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
 	var onIsLive []IsLiveCallback
 	if shouldReplace {
 		nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec
-		onIsLive = append(onIsLive, nl.mu.onIsLive...)
+		// NB: If we are not started, we don't invoke the onIsLive callbacks since
+		// they can be volatile. This is a bit of a tangled mess since the startup
+		// of liveness requires the stores to be started, but stores can't start
+		// until maybeUpdate can run. Ideally we could cache all these updates and
+		// deliver them to the callbacks as part of Start.
+		if nl.started.Get() {
+			onIsLive = append(onIsLive, nl.onIsLive...)
+		}
 	}
 	nl.mu.Unlock()
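The `updateLiveness` hunk above keeps the pre-heartbeat engine sync, now reading `nl.engines` without a lock. As a rough illustration of why that sync exists — not the real storage API; the file name, helper names, and timeout are invented — here is a sketch that fsyncs a marker file per store and refuses to heartbeat if any disk fails to respond in time:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// probeDisk writes and syncs a marker file, returning once the write is
// durable. On a stalled disk this call blocks indefinitely.
func probeDisk(dir string) error {
	f, err := os.Create(filepath.Join(dir, "liveness-probe"))
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.WriteString("ok"); err != nil {
		return err
	}
	return f.Sync()
}

// syncAllOrTimeout fans the probes out and fails fast if any disk stalls,
// so a node with a wedged store stops renewing its liveness record.
func syncAllOrTimeout(ctx context.Context, dirs []string, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	errs := make(chan error, len(dirs))
	for _, dir := range dirs {
		dir := dir // pin the loop variable
		go func() { errs <- probeDisk(dir) }()
	}
	for range dirs {
		select {
		case err := <-errs:
			if err != nil {
				return err
			}
		case <-ctx.Done():
			return fmt.Errorf("disk probe timed out: %w", ctx.Err())
		}
	}
	return nil
}

func main() {
	if err := syncAllOrTimeout(context.Background(), []string{os.TempDir()}, time.Second); err != nil {
		fmt.Println("skipping heartbeat:", err)
		return
	}
	fmt.Println("all stores responsive; safe to heartbeat")
}
```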
diff --git a/pkg/kv/kvserver/node_liveness_test.go b/pkg/kv/kvserver/node_liveness_test.go
index 816d5254ac96..c078152c7344 100644
--- a/pkg/kv/kvserver/node_liveness_test.go
+++ b/pkg/kv/kvserver/node_liveness_test.go
@@ -291,14 +291,27 @@ func TestNodeIsLiveCallback(t *testing.T) {
 
 	ctx := context.Background()
 	manualClock := hlc.NewHybridManualClock()
+	var started syncutil.AtomicBool
+	var cbMu syncutil.Mutex
+	cbs := map[roachpb.NodeID]struct{}{}
 	tc := testcluster.StartTestCluster(t, 3,
 		base.TestClusterArgs{
 			ReplicationMode: base.ReplicationManual,
+
 			ServerArgs: base.TestServerArgs{
 				Knobs: base.TestingKnobs{
 					Server: &server.TestingKnobs{
 						WallClock: manualClock,
 					},
+					NodeLiveness: kvserver.NodeLivenessTestingKnobs{
+						IsLiveCallback: func(l livenesspb.Liveness) {
+							if started.Get() {
+								cbMu.Lock()
+								defer cbMu.Unlock()
+								cbs[l.NodeID] = struct{}{}
+							}
+						},
+					},
 				},
 			},
 		})
@@ -307,14 +320,9 @@
 	// Verify liveness of all nodes for all nodes.
 	verifyLiveness(t, tc)
 	pauseNodeLivenessHeartbeatLoops(tc)
-
-	var cbMu syncutil.Mutex
-	cbs := map[roachpb.NodeID]struct{}{}
-	tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).RegisterCallback(func(l livenesspb.Liveness) {
-		cbMu.Lock()
-		defer cbMu.Unlock()
-		cbs[l.NodeID] = struct{}{}
-	})
+	// Only record entries after we have paused the normal heartbeat loop, to
+	// make sure they come from the Heartbeat below.
+	started.Set(true)
 
 	// Advance clock past the liveness threshold.
 	manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLivenessThreshold().Nanoseconds() + 1)
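With the knob registered before the cluster starts, the callback now fires for every liveness transition from node startup onward; the test compensates by gating recording on an atomic flag it flips only after pausing the heartbeat loops. The gating pattern in isolation — a hypothetical stand-alone example using plain standard-library types rather than `syncutil`, not the test itself:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type Liveness struct{ NodeID int }

func main() {
	var started atomic.Bool
	var mu sync.Mutex
	seen := map[int]struct{}{}

	// Stands in for the IsLiveCallback testing knob: registered before the
	// cluster starts, invoked on every not-live-to-live transition.
	isLiveCallback := func(l Liveness) {
		if !started.Load() {
			return // ignore transitions caused by cluster startup
		}
		mu.Lock()
		defer mu.Unlock()
		seen[l.NodeID] = struct{}{}
	}

	isLiveCallback(Liveness{NodeID: 1}) // startup noise: dropped
	started.Store(true)                 // begin recording
	isLiveCallback(Liveness{NodeID: 2}) // recorded
	fmt.Println(len(seen), "recorded transition(s)")
}
```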
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 798de7e0bbb4..e5da7381bba4 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -494,6 +495,8 @@ type NodeLivenessTestingKnobs struct {
 	// StorePoolNodeLivenessFn is the function used by the StorePool to determine
 	// whether a node is live or not.
 	StorePoolNodeLivenessFn storepool.NodeLivenessFunc
+	// IsLiveCallback is invoked when a node becomes live.
+	IsLiveCallback liveness.IsLiveCallback
 }
 
 var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{}
diff --git a/pkg/server/server.go b/pkg/server/server.go
index e07ac24df99b..7f0640c56ae2 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -514,13 +514,28 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 			decomNodeMap.onNodeDecommissioned(liveness.NodeID)
 		},
+		Engines: engines,
+		OnSelfHeartbeat: func(ctx context.Context) {
+			now := clock.Now()
+			if err := stores.VisitStores(func(s *kvserver.Store) error {
+				return s.WriteLastUpTimestamp(ctx, now)
+			}); err != nil {
+				log.Ops.Warningf(ctx, "writing last up timestamp: %v", err)
+			}
+		},
 	})
+
 	registry.AddMetricStruct(nodeLiveness.Metrics())
 
 	nodeLivenessFn := storepool.MakeStorePoolNodeLivenessFunc(nodeLiveness)
-	if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok &&
-		nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {
-		nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn
+	if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok {
+		if nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {
+			nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn
+		}
+
+		if nodeLivenessKnobs.IsLiveCallback != nil {
+			nodeLiveness.RegisterCallback(nodeLivenessKnobs.IsLiveCallback)
+		}
 	}
 	storePool := storepool.NewStorePool(
 		cfg.AmbientCtx,
@@ -1835,17 +1850,7 @@ func (s *Server) PreStart(ctx context.Context) error {
 	// Begin the node liveness heartbeat. Add a callback which records the local
 	// store "last up" timestamp for every store whenever the liveness record is
 	// updated.
-	s.nodeLiveness.Start(workersCtx, liveness.NodeLivenessStartOptions{
-		Engines: s.engines,
-		OnSelfLive: func(ctx context.Context) {
-			now := s.clock.Now()
-			if err := s.node.stores.VisitStores(func(s *kvserver.Store) error {
-				return s.WriteLastUpTimestamp(ctx, now)
-			}); err != nil {
-				log.Ops.Warningf(ctx, "writing last up timestamp: %v", err)
-			}
-		},
-	})
+	s.nodeLiveness.Start(workersCtx)
 
 	// Begin recording status summaries.
 	if err := s.node.startWriteNodeStatus(base.DefaultMetricsSampleInterval); err != nil {
diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go
index b2de4a294243..ea2fdd22301f 100644
--- a/pkg/testutils/localtestcluster/local_test_cluster.go
+++ b/pkg/testutils/localtestcluster/local_test_cluster.go
@@ -200,6 +200,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
 		RenewalDuration:         renewal,
 		Settings:                cfg.Settings,
 		HistogramWindowInterval: cfg.HistogramWindowInterval,
+		Engines:                 []storage.Engine{ltc.Eng},
 	})
 	storepool.TimeUntilStoreDead.Override(ctx, &cfg.Settings.SV, storepool.TestTimeUntilStoreDead)
 	cfg.StorePool = storepool.NewStorePool(
@@ -281,16 +282,16 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
 		t.Fatalf("unable to set node descriptor: %s", err)
 	}
 
-	if !ltc.DisableLivenessHeartbeat {
-		cfg.NodeLiveness.Start(ctx,
-			liveness.NodeLivenessStartOptions{Engines: []storage.Engine{ltc.Eng}})
-	}
-
 	if err := ltc.Store.Start(ctx, ltc.stopper); err != nil {
 		t.Fatalf("unable to start local test cluster: %s", err)
	}
 	ltc.Stores.AddStore(ltc.Store)
+
+	if !ltc.DisableLivenessHeartbeat {
+		cfg.NodeLiveness.Start(ctx)
+	}
+
 	ltc.Cfg = cfg
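For reference, the relocated `OnSelfHeartbeat` wiring in server.go persists a "last up" timestamp on every store after each successful self-heartbeat. A toy model of that hook, where `Store` and `Stores` are simplified stand-ins and a real store would write the timestamp durably:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type Store struct {
	ID     int
	LastUp time.Time
}

// WriteLastUpTimestamp records when the node was last known to be alive;
// post-crash tooling can read it to bound the node's downtime.
func (s *Store) WriteLastUpTimestamp(ctx context.Context, now time.Time) error {
	s.LastUp = now // in-memory stand-in for a durable write
	return nil
}

type Stores struct{ stores []*Store }

func (ss *Stores) VisitStores(fn func(*Store) error) error {
	for _, s := range ss.stores {
		if err := fn(s); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	stores := &Stores{stores: []*Store{{ID: 1}, {ID: 2}}}
	// Mirrors the shape of the OnSelfHeartbeat closure passed to
	// NodeLivenessOptions above.
	onSelfHeartbeat := func(ctx context.Context) {
		now := time.Now()
		if err := stores.VisitStores(func(s *Store) error {
			return s.WriteLastUpTimestamp(ctx, now)
		}); err != nil {
			fmt.Println("writing last up timestamp:", err)
		}
	}
	onSelfHeartbeat(context.Background())
	fmt.Println("store 1 last up:", stores.stores[0].LastUp)
}
```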