liveness: remove one-time init fields from the mu lock #103601

Merged
106 changes: 63 additions & 43 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -171,6 +171,8 @@ type IsLiveCallback func(livenesspb.Liveness)

// HeartbeatCallback is invoked whenever this node updates its own liveness status,
// indicating that it is alive.
// TODO(baptist): Remove this callback. The only usage of this is for logging an
// event at startup. This is a little heavyweight of a mechanism for that.
type HeartbeatCallback func(context.Context)

// NodeLiveness is a centralized failure detector that coordinates
@@ -221,9 +223,24 @@ type NodeLiveness struct {
onNodeDecommissioning OnNodeDecommissionCallback // noop if nil
engineSyncs *singleflight.Group

// onIsLive holds callbacks registered by stores prior to starting liveness.
// They fire when a node transitions from not live to live.
onIsLive []IsLiveCallback // see RegisterCallback

// onSelfHeartbeat is invoked after every successful heartbeat
// of the local liveness instance's heartbeat loop.
onSelfHeartbeat HeartbeatCallback

// engines is written to before heartbeating to avoid maintaining liveness
// when a local disk is stalled.
engines []storage.Engine

// Set to true once Start is called. RegisterCallback cannot be called after
// Start is called.
started syncutil.AtomicBool

mu struct {
syncutil.RWMutex
onIsLive []IsLiveCallback // see NodeLivenessOptions.OnSelfLive
// nodes is an in-memory cache of liveness records that NodeLiveness
// knows about (having learnt of them through gossip or through KV).
// It's a look-aside cache, and is accessed primarily through
@@ -257,11 +274,7 @@ type NodeLiveness struct {
//
// More concretely, we want `getLivenessRecordFromKV` to be tucked away
// within `getLivenessLocked`.
nodes map[roachpb.NodeID]Record
onSelfLive HeartbeatCallback // set in Start()
// Before heartbeating, we write to each of these engines to avoid
// maintaining liveness when a local disks is stalled.
engines []storage.Engine // set in Start()
nodes map[roachpb.NodeID]Record
}
}

@@ -278,11 +291,8 @@ type Record struct {

// NodeLivenessOptions is the input to NewNodeLiveness.
//
// Note that there is yet another struct, NodeLivenessStartOptions, which
// is supplied when the instance is started. This is necessary as during
// server startup, some inputs can only be constructed at Start time. The
// separation has grown organically and various options could in principle
// be moved back and forth.
// The IsLiveCallbacks are registered after construction but before Start is
// called. Everything else is initialized through these Options.
type NodeLivenessOptions struct {
AmbientCtx log.AmbientContext
Stopper *stop.Stopper
@@ -301,6 +311,8 @@ type NodeLivenessOptions struct {
// OnNodeDecommissioning is invoked when a node is detected to be
// decommissioning.
OnNodeDecommissioning OnNodeDecommissionCallback
Engines []storage.Engine
OnSelfHeartbeat HeartbeatCallback
}

// NewNodeLiveness returns a new instance of NodeLiveness configured
@@ -321,6 +333,8 @@ func NewNodeLiveness(opts NodeLivenessOptions) *NodeLiveness {
onNodeDecommissioned: opts.OnNodeDecommissioned,
onNodeDecommissioning: opts.OnNodeDecommissioning,
engineSyncs: singleflight.NewGroup("engine sync", "engine"),
engines: opts.Engines,
onSelfHeartbeat: opts.OnSelfHeartbeat,
}
nl.metrics = Metrics{
LiveNodes: metric.NewFunctionalGauge(metaLiveNodes, nl.numLiveNodes),
@@ -729,34 +743,37 @@ func (nl *NodeLiveness) IsAvailableNotDraining(nodeID roachpb.NodeID) bool {
// detected to be decommissioning.
type OnNodeDecommissionCallback func(nodeID roachpb.NodeID)

// NodeLivenessStartOptions are the arguments to `NodeLiveness.Start`.
type NodeLivenessStartOptions struct {
Engines []storage.Engine
// OnSelfLive is invoked after every successful heartbeat
// of the local liveness instance's heartbeat loop.
OnSelfLive HeartbeatCallback
}

// Start starts a periodic heartbeat to refresh this node's last
// heartbeat in the node liveness table. The optionally provided
// HeartbeatCallback will be invoked whenever this node updates its
// own liveness. The slice of engines will be written to before each
// heartbeat to avoid maintaining liveness in the presence of disk stalls.
func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions) {
// TODO(baptist): If we completely remove epoch leases, this can be merged with
// the NewNodeLiveness function. Currently liveness is required prior to Start
// being called in replica_range_lease. For non-epoch leases this should
// be possible.
func (nl *NodeLiveness) Start(ctx context.Context) {
log.VEventf(ctx, 1, "starting node liveness instance")
if nl.started.Get() {
// This is meant to prevent tests from calling start twice.
log.Fatal(ctx, "liveness already started")
}

retryOpts := base.DefaultRetryOptions()
retryOpts.Closer = nl.stopper.ShouldQuiesce()

if len(opts.Engines) == 0 {
// Avoid silently forgetting to pass the engines. It happened before.
log.Fatalf(ctx, "must supply at least one engine")
nl.started.Set(true)
now := nl.clock.Now()
// We may have received some liveness records from Gossip prior to Start being
// called. We need to go through and notify all the registered callbacks now.
for _, l := range nl.GetLivenesses() {
if l.IsLive(now) {
for _, fn := range nl.onIsLive {
fn(l)
}
}
}

nl.mu.Lock()
nl.mu.onSelfLive = opts.OnSelfLive
nl.mu.engines = opts.Engines
nl.mu.Unlock()

_ = nl.stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) {
ambient := nl.ambientCtx
ambient.AddLogTag("liveness-hb", nil)
@@ -1254,11 +1271,12 @@ func (nl *NodeLiveness) Metrics() Metrics {
}

// RegisterCallback registers a callback to be invoked any time a
// node's IsLive() state changes to true.
// node's IsLive() state changes to true. This must be called before Start.
func (nl *NodeLiveness) RegisterCallback(cb IsLiveCallback) {
nl.mu.Lock()
defer nl.mu.Unlock()
nl.mu.onIsLive = append(nl.mu.onIsLive, cb)
if nl.started.Get() {
log.Fatalf(context.TODO(), "RegisterCallback called after Start")
}
nl.onIsLive = append(nl.onIsLive, cb)
}

// updateLiveness does a conditional put on the node liveness record for the
@@ -1292,11 +1310,8 @@ func (nl *NodeLiveness) updateLiveness(
// we need to return a timely NLHE to the caller such that it will try a
// different replica and nudge it into acquiring the lease. This can leak a
// goroutine in the case of a stalled disk.
nl.mu.RLock()
engines := nl.mu.engines
nl.mu.RUnlock()
resultCs := make([]singleflight.Future, len(engines))
for i, eng := range engines {
resultCs := make([]singleflight.Future, len(nl.engines))
for i, eng := range nl.engines {
eng := eng // pin the loop variable
resultCs[i], _ = nl.engineSyncs.DoChan(ctx,
strconv.Itoa(i),
@@ -1404,18 +1419,16 @@ func (nl *NodeLiveness) updateLivenessAttempt(
return Record{}, err
}

nl.mu.RLock()
cb := nl.mu.onSelfLive
nl.mu.RUnlock()
if cb != nil {
cb(ctx)
if nl.onSelfHeartbeat != nil {
nl.onSelfHeartbeat(ctx)
}
return Record{Liveness: update.newLiveness, raw: v.TagAndDataBytes()}, nil
}

// maybeUpdate replaces the liveness (if it appears newer) and invokes the
// registered callbacks if the node became live in the process.
func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record) {

if newLivenessRec.Liveness == (livenesspb.Liveness{}) {
log.Fatal(ctx, "invalid new liveness record; found to be empty")
}
@@ -1436,7 +1449,14 @@ func (nl *NodeLiveness) maybeUpdate(ctx context.Context, newLivenessRec Record)
var onIsLive []IsLiveCallback
if shouldReplace {
nl.mu.nodes[newLivenessRec.NodeID] = newLivenessRec
onIsLive = append(onIsLive, nl.mu.onIsLive...)
// NB: If we are not started, we don't call the onIsLive callbacks since
// they can be volatile. This is a bit of a tangled mess since the startup of
// liveness requires the stores to be started, but stores can't start until
// maybeUpdate can run. Ideally we could cache all these updates and
// initialize onIsLive as part of start.
if nl.started.Get() {
onIsLive = append(onIsLive, nl.onIsLive...)
}
}
nl.mu.Unlock()

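
As context for the diff above: the refactor follows a common Go pattern in which fields that are written exactly once before the component starts (callbacks, engines) move out of the mutex-protected struct, while an atomic "started" flag enforces that registration happens before Start. The following is a minimal, standalone sketch of that pattern; the names (Detector, Callback, onEvent) and the use of the standard library's sync/atomic instead of CockroachDB's syncutil.AtomicBool are illustrative assumptions, not code from this PR.

package main

import (
	"context"
	"log"
	"sync"
	"sync/atomic"
)

// Callback is invoked once the component has started.
type Callback func(ctx context.Context)

// Detector keeps its one-time init fields outside the mutex: they are written
// only before Start and are read-only afterwards, so no lock is needed.
type Detector struct {
	onEvent []Callback  // registered before Start; read-only after Start
	started atomic.Bool // set exactly once in Start

	mu struct {
		sync.RWMutex
		state map[int]string // state that changes concurrently stays under the lock
	}
}

func NewDetector() *Detector {
	d := &Detector{}
	d.mu.state = map[int]string{}
	return d
}

// RegisterCallback must be called before Start; registering later would race
// with readers that no longer take the lock.
func (d *Detector) RegisterCallback(cb Callback) {
	if d.started.Load() {
		log.Fatal("RegisterCallback called after Start")
	}
	d.onEvent = append(d.onEvent, cb)
}

// Start flips the started flag and begins using the registered callbacks.
func (d *Detector) Start(ctx context.Context) {
	d.started.Store(true)
	for _, cb := range d.onEvent {
		cb(ctx)
	}
}

func main() {
	d := NewDetector()
	d.RegisterCallback(func(ctx context.Context) { log.Println("became live") })
	d.Start(context.Background())
}
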
24 changes: 16 additions & 8 deletions pkg/kv/kvserver/node_liveness_test.go
@@ -291,14 +291,27 @@ func TestNodeIsLiveCallback(t *testing.T) {

ctx := context.Background()
manualClock := hlc.NewHybridManualClock()
var started syncutil.AtomicBool
var cbMu syncutil.Mutex
cbs := map[roachpb.NodeID]struct{}{}
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,

ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
WallClock: manualClock,
},
NodeLiveness: kvserver.NodeLivenessTestingKnobs{
IsLiveCallback: func(l livenesspb.Liveness) {
if started.Get() {
cbMu.Lock()
defer cbMu.Unlock()
cbs[l.NodeID] = struct{}{}
}
},
},
},
},
})
@@ -307,14 +320,9 @@
// Verify liveness of all nodes for all nodes.
verifyLiveness(t, tc)
pauseNodeLivenessHeartbeatLoops(tc)

var cbMu syncutil.Mutex
cbs := map[roachpb.NodeID]struct{}{}
tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).RegisterCallback(func(l livenesspb.Liveness) {
cbMu.Lock()
defer cbMu.Unlock()
cbs[l.NodeID] = struct{}{}
})
// Only record entries after we have paused the normal heartbeat loop to make
// sure they come from the Heartbeat below.
started.Set(true)

// Advance clock past the liveness threshold.
manualClock.Increment(tc.Servers[0].NodeLiveness().(*liveness.NodeLiveness).GetLivenessThreshold().Nanoseconds() + 1)
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -494,6 +495,8 @@ type NodeLivenessTestingKnobs struct {
// StorePoolNodeLivenessFn is the function used by the StorePool to determine
// whether a node is live or not.
StorePoolNodeLivenessFn storepool.NodeLivenessFunc
// IsLiveCallback will be called when a node becomes live.
IsLiveCallback liveness.IsLiveCallback
}

var _ base.ModuleTestingKnobs = NodeLivenessTestingKnobs{}
33 changes: 19 additions & 14 deletions pkg/server/server.go
@@ -514,13 +514,28 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

decomNodeMap.onNodeDecommissioned(liveness.NodeID)
},
Engines: engines,
OnSelfHeartbeat: func(ctx context.Context) {
now := clock.Now()
if err := stores.VisitStores(func(s *kvserver.Store) error {
return s.WriteLastUpTimestamp(ctx, now)
}); err != nil {
log.Ops.Warningf(ctx, "writing last up timestamp: %v", err)
}
},
})

registry.AddMetricStruct(nodeLiveness.Metrics())

nodeLivenessFn := storepool.MakeStorePoolNodeLivenessFunc(nodeLiveness)
if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok &&
nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {
nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn
if nodeLivenessKnobs, ok := cfg.TestingKnobs.NodeLiveness.(kvserver.NodeLivenessTestingKnobs); ok {
if nodeLivenessKnobs.StorePoolNodeLivenessFn != nil {
nodeLivenessFn = nodeLivenessKnobs.StorePoolNodeLivenessFn
}

if nodeLivenessKnobs.IsLiveCallback != nil {
nodeLiveness.RegisterCallback(nodeLivenessKnobs.IsLiveCallback)
}
}
storePool := storepool.NewStorePool(
cfg.AmbientCtx,
@@ -1835,17 +1850,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// Begin the node liveness heartbeat. Add a callback which records the local
// store "last up" timestamp for every store whenever the liveness record is
// updated.
s.nodeLiveness.Start(workersCtx, liveness.NodeLivenessStartOptions{
Engines: s.engines,
OnSelfLive: func(ctx context.Context) {
now := s.clock.Now()
if err := s.node.stores.VisitStores(func(s *kvserver.Store) error {
return s.WriteLastUpTimestamp(ctx, now)
}); err != nil {
log.Ops.Warningf(ctx, "writing last up timestamp: %v", err)
}
},
})
s.nodeLiveness.Start(workersCtx)

// Begin recording status summaries.
if err := s.node.startWriteNodeStatus(base.DefaultMetricsSampleInterval); err != nil {
11 changes: 6 additions & 5 deletions pkg/testutils/localtestcluster/local_test_cluster.go
@@ -200,6 +200,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
RenewalDuration: renewal,
Settings: cfg.Settings,
HistogramWindowInterval: cfg.HistogramWindowInterval,
Engines: []storage.Engine{ltc.Eng},
})
storepool.TimeUntilStoreDead.Override(ctx, &cfg.Settings.SV, storepool.TestTimeUntilStoreDead)
cfg.StorePool = storepool.NewStorePool(
@@ -281,16 +282,16 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto
t.Fatalf("unable to set node descriptor: %s", err)
}

if !ltc.DisableLivenessHeartbeat {
cfg.NodeLiveness.Start(ctx,
liveness.NodeLivenessStartOptions{Engines: []storage.Engine{ltc.Eng}})
}

if err := ltc.Store.Start(ctx, ltc.stopper); err != nil {
t.Fatalf("unable to start local test cluster: %s", err)
}

ltc.Stores.AddStore(ltc.Store)

if !ltc.DisableLivenessHeartbeat {
cfg.NodeLiveness.Start(ctx)
}

ltc.Cfg = cfg
}
