From 306c4f574aff53d44de9d02b0a562d05de347fea Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Thu, 20 Jun 2024 16:37:08 -0700 Subject: [PATCH] Add "feature-gates" flag and field in config file and add feature gate for StopGRPCServiceOnDefrag Signed-off-by: Siyuan Zhang --- etcd.conf.yml.sample | 3 + pkg/featuregate/feature_gate.go | 21 ++++- pkg/featuregate/feature_gate_test.go | 8 +- server/config/config.go | 3 - server/embed/config.go | 51 +++++++++++ server/embed/config_test.go | 113 +++++++++++++++++++++++++ server/embed/etcd.go | 2 +- server/etcdmain/config.go | 14 +++ server/etcdmain/config_test.go | 95 +++++++++++++++++++++ server/etcdmain/help.go | 4 +- server/etcdserver/api/v3rpc/health.go | 3 +- server/features/etcd_features.go | 65 ++++++++++++++ tests/e2e/failover_test.go | 38 +++++++++ tests/framework/e2e/cluster.go | 9 ++ tests/framework/integration/cluster.go | 62 +++++++------- 15 files changed, 444 insertions(+), 47 deletions(-) create mode 100644 server/features/etcd_features.go diff --git a/etcd.conf.yml.sample b/etcd.conf.yml.sample index 40cda38310fc..764ae813f50c 100644 --- a/etcd.conf.yml.sample +++ b/etcd.conf.yml.sample @@ -155,3 +155,6 @@ cipher-suites: [ # Limit etcd to specific TLS protocol versions tls-min-version: 'TLS1.2' tls-max-version: 'TLS1.3' + +# Comma-separated list of feature gate enablement in the format of feature=true|false +feature-gates: StopGRPCServiceOnDefrag=false,DistributedTracing=false diff --git a/pkg/featuregate/feature_gate.go b/pkg/featuregate/feature_gate.go index 60c2ab114bbc..73ec57211299 100644 --- a/pkg/featuregate/feature_gate.go +++ b/pkg/featuregate/feature_gate.go @@ -16,6 +16,7 @@ package featuregate import ( + "flag" "fmt" "sort" "strconv" @@ -30,7 +31,7 @@ import ( type Feature string const ( - flagName = "feature-gates" + FlagName = "feature-gates" // allAlphaGate is a global toggle for alpha features. Per-feature key // values override the default set by allAlphaGate. Examples: @@ -98,7 +99,7 @@ type MutableFeatureGate interface { FeatureGate // AddFlag adds a flag for setting global feature gates to the specified FlagSet. - AddFlag(fs *pflag.FlagSet) + AddFlag(fs *flag.FlagSet) // Set parses and stores flag gates for known features // from a string like feature1=true,feature2=false,... Set(value string) error @@ -121,6 +122,8 @@ type MutableFeatureGate interface { // overriding its default to true for a limited number of components without simultaneously // changing its default for all consuming components. OverrideDefault(name Feature, override bool) error + // SetLogger replaces the logger with the provided logger. + SetLogger(lg *zap.Logger) } // featureGate implements FeatureGate as well as pflag.Value for flag parsing. @@ -165,6 +168,9 @@ func setUnsetBetaGates(known map[Feature]FeatureSpec, enabled map[Feature]bool, var _ pflag.Value = &featureGate{} func New(name string, lg *zap.Logger) *featureGate { + if lg == nil { + lg = zap.NewNop() + } known := map[Feature]FeatureSpec{} for k, v := range defaultFeatures { known[k] = v @@ -349,7 +355,7 @@ func (f *featureGate) Enabled(key Feature) bool { } // AddFlag adds a flag for setting global feature gates to the specified FlagSet. -func (f *featureGate) AddFlag(fs *pflag.FlagSet) { +func (f *featureGate) AddFlag(fs *flag.FlagSet) { f.lock.Lock() // TODO(mtaufen): Shouldn't we just close it on the first Set/SetFromMap instead? // Not all components expose a feature gates flag using this AddFlag method, and @@ -359,7 +365,7 @@ func (f *featureGate) AddFlag(fs *pflag.FlagSet) { f.lock.Unlock() known := f.KnownFeatures() - fs.Var(f, flagName, ""+ + fs.Var(f, FlagName, ""+ "A set of key=value pairs that describe feature gates for alpha/experimental features. "+ "Options are:\n"+strings.Join(known, "\n")) } @@ -409,3 +415,10 @@ func (f *featureGate) DeepCopy() MutableFeatureGate { return fg } + +// SetLogger replaces the logger with the provided logger. +func (f *featureGate) SetLogger(lg *zap.Logger) { + f.lock.Lock() + defer f.lock.Unlock() + f.lg = lg +} diff --git a/pkg/featuregate/feature_gate_test.go b/pkg/featuregate/feature_gate_test.go index a5bdcf8bd4c9..586ad3c8bcba 100644 --- a/pkg/featuregate/feature_gate_test.go +++ b/pkg/featuregate/feature_gate_test.go @@ -15,11 +15,11 @@ package featuregate import ( + "flag" "fmt" "strings" "testing" - "github.com/spf13/pflag" "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" ) @@ -203,7 +203,7 @@ func TestFeatureGateFlag(t *testing.T) { } for i, test := range tests { t.Run(test.arg, func(t *testing.T) { - fs := pflag.NewFlagSet("testfeaturegateflag", pflag.ContinueOnError) + fs := flag.NewFlagSet("testfeaturegateflag", flag.ContinueOnError) f := New("test", zaptest.NewLogger(t)) f.Add(map[Feature]FeatureSpec{ testAlphaGate: {Default: false, PreRelease: Alpha}, @@ -211,7 +211,7 @@ func TestFeatureGateFlag(t *testing.T) { }) f.AddFlag(fs) - err := fs.Parse([]string{fmt.Sprintf("--%s=%s", flagName, test.arg)}) + err := fs.Parse([]string{fmt.Sprintf("--%s=%s", FlagName, test.arg)}) if test.parseError != "" { if !strings.Contains(err.Error(), test.parseError) { t.Errorf("%d: Parse() Expected %v, Got %v", i, test.parseError, err) @@ -603,7 +603,7 @@ func TestFeatureGateOverrideDefault(t *testing.T) { t.Run("returns error if already added to flag set", func(t *testing.T) { f := New("test", zaptest.NewLogger(t)) - fs := pflag.NewFlagSet("test", pflag.ContinueOnError) + fs := flag.NewFlagSet("test", flag.ContinueOnError) f.AddFlag(fs) if err := f.OverrideDefault("TestFeature", true); err == nil { diff --git a/server/config/config.go b/server/config/config.go index f1cd47bffa5c..5f3a0d8e9f51 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -193,9 +193,6 @@ type ServerConfig struct { // a shared buffer in its readonly check operations. ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"` - // ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation. - ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"` - // ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to // consider running defrag during bootstrap. Needs to be set to non-zero value to take effect. ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"` diff --git a/server/embed/config.go b/server/embed/config.go index 04f4ca0fa2d4..6f1388be1b10 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -42,6 +42,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/featuregate" "go.etcd.io/etcd/pkg/v3/flags" "go.etcd.io/etcd/pkg/v3/netutil" "go.etcd.io/etcd/server/v3/config" @@ -50,6 +51,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor" "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" + "go.etcd.io/etcd/server/v3/features" ) const ( @@ -455,6 +457,9 @@ type Config struct { // V2Deprecation describes phase of API & Storage V2 support V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"` + + // ServerFeatureGate is a server level feature gate + ServerFeatureGate featuregate.FeatureGate } // configYAML holds the config suitable for yaml parsing @@ -476,6 +481,8 @@ type configJSON struct { ClientSecurityJSON securityConfig `json:"client-transport-security"` PeerSecurityJSON securityConfig `json:"peer-transport-security"` + + FeatureGatesJSON string `json:"feature-gates"` } type securityConfig struct { @@ -576,6 +583,7 @@ func NewConfig() *Config { }, AutoCompactionMode: DefaultAutoCompactionMode, + ServerFeatureGate: features.NewDefaultServerFeatureGate(DefaultName, nil), } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg @@ -762,6 +770,9 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) { // unsafe fs.BoolVar(&cfg.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.") fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.") + + // featuregate + cfg.ServerFeatureGate.(featuregate.MutableFeatureGate).AddFlag(fs) } func ConfigFromFile(path string) (*Config, error) { @@ -785,6 +796,26 @@ func (cfg *configYAML) configFromFile(path string) error { return err } + if cfg.configJSON.FeatureGatesJSON != "" { + err = cfg.Config.ServerFeatureGate.(featuregate.MutableFeatureGate).Set(cfg.configJSON.FeatureGatesJSON) + if err != nil { + return err + } + } + var cfgMap map[string]interface{} + err = yaml.Unmarshal(b, &cfgMap) + if err != nil { + return err + } + isExperimentalFlagSet := func(expFlag string) bool { + _, ok := cfgMap[expFlag] + return ok + } + err = cfg.SetFeatureGatesFromExperimentalFlags(isExperimentalFlagSet, cfg.configJSON.FeatureGatesJSON) + if err != nil { + return err + } + if cfg.configJSON.ListenPeerURLs != "" { u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenPeerURLs, ",")) if err != nil { @@ -877,6 +908,25 @@ func (cfg *configYAML) configFromFile(path string) error { return cfg.Validate() } +// SetFeatureGatesFromExperimentalFlags sets the feature gate values if their corresponding experimental flags are +// explicitly set. +func (cfg *Config) SetFeatureGatesFromExperimentalFlags(isExperimentalFlagSet func(string) bool, featureGatesVal string) error { + // verify that the feature gate and its experimental flag are not both set at the same time. + for expFlagName, featureName := range features.ExperimentalFlagToFeatureMap { + if isExperimentalFlagSet(expFlagName) && strings.Contains(featureGatesVal, string(featureName)) { + return fmt.Errorf("cannot specified both flags: --%s=(true|false) and --%s=%s=(true|false) at the same time, please just use --%s=%s=(true|false)", + expFlagName, featuregate.FlagName, featureName, featuregate.FlagName, featureName) + } + } + + m := make(map[string]bool) + defaultEc := NewConfig() + if cfg.ExperimentalStopGRPCServiceOnDefrag != defaultEc.ExperimentalStopGRPCServiceOnDefrag { + m[string(features.StopGRPCServiceOnDefrag)] = cfg.ExperimentalStopGRPCServiceOnDefrag + } + return cfg.ServerFeatureGate.(featuregate.MutableFeatureGate).SetFromMap(m) +} + func updateCipherSuites(tls *transport.TLSInfo, ss []string) error { if len(tls.CipherSuites) > 0 && len(ss) > 0 { return fmt.Errorf("TLSInfo.CipherSuites is already specified (given %v)", ss) @@ -907,6 +957,7 @@ func (cfg *Config) Validate() error { if err := cfg.setupLogging(); err != nil { return err } + cfg.ServerFeatureGate.(featuregate.MutableFeatureGate).SetLogger(cfg.logger) if err := checkBindURLs(cfg.ListenPeerUrls); err != nil { return err } diff --git a/server/embed/config_test.go b/server/embed/config_test.go index 9153feb06e68..4d69e7a2f060 100644 --- a/server/embed/config_test.go +++ b/server/embed/config_test.go @@ -31,6 +31,8 @@ import ( "go.etcd.io/etcd/client/pkg/v3/srv" "go.etcd.io/etcd/client/pkg/v3/transport" "go.etcd.io/etcd/client/pkg/v3/types" + "go.etcd.io/etcd/pkg/v3/featuregate" + "go.etcd.io/etcd/server/v3/features" ) func notFoundErr(service, domain string) error { @@ -89,6 +91,117 @@ func TestConfigFileOtherFields(t *testing.T) { assert.Equal(t, false, cfg.SocketOpts.ReuseAddress, "ReuseAddress does not match") } +func TestConfigFileFeatureGates(t *testing.T) { + testCases := []struct { + name string + featureGatesJSON string + experimentalStopGRPCServiceOnDefrag bool + setExperimentalStopGRPCServiceOnDefrag bool + expectErr bool + expectedFeatures map[featuregate.Feature]bool + }{ + { + name: "default", + expectedFeatures: map[featuregate.Feature]bool{ + features.DistributedTracing: false, + features.StopGRPCServiceOnDefrag: false, + }, + }, + { + name: "cannot set both experimental flag and feature gate flag", + featureGatesJSON: "StopGRPCServiceOnDefrag=true", + experimentalStopGRPCServiceOnDefrag: false, + setExperimentalStopGRPCServiceOnDefrag: true, + expectErr: true, + }, + { + name: "ok to set different experimental flag and feature gate flag", + featureGatesJSON: "DistributedTracing=false", + experimentalStopGRPCServiceOnDefrag: true, + setExperimentalStopGRPCServiceOnDefrag: true, + expectedFeatures: map[featuregate.Feature]bool{ + features.DistributedTracing: false, + features.StopGRPCServiceOnDefrag: true, + }, + }, + { + name: "can set feature gate to true from experimental flag", + experimentalStopGRPCServiceOnDefrag: true, + setExperimentalStopGRPCServiceOnDefrag: true, + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: true, + }, + }, + { + name: "can set feature gate to false from experimental flag", + experimentalStopGRPCServiceOnDefrag: false, + setExperimentalStopGRPCServiceOnDefrag: true, + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: false, + }, + }, + { + name: "can set feature gate to true from feature gate flag", + featureGatesJSON: "StopGRPCServiceOnDefrag=true", + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: true, + }, + }, + { + name: "can set feature gate to false from feature gate flag", + featureGatesJSON: "StopGRPCServiceOnDefrag=false", + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: false, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var b []byte + var err error + if tc.setExperimentalStopGRPCServiceOnDefrag { + yc := struct { + ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"` + FeatureGatesJSON string `json:"feature-gates"` + }{ + tc.experimentalStopGRPCServiceOnDefrag, + tc.featureGatesJSON, + } + b, err = yaml.Marshal(&yc) + } else { + yc := struct { + FeatureGatesJSON string `json:"feature-gates"` + }{ + + tc.featureGatesJSON, + } + b, err = yaml.Marshal(&yc) + } + if err != nil { + t.Fatal(err) + } + + tmpfile := mustCreateCfgFile(t, b) + defer os.Remove(tmpfile.Name()) + + cfg, err := ConfigFromFile(tmpfile.Name()) + if tc.expectErr { + if err == nil { + if err == nil { + t.Fatal("expect parse error") + } + } + return + } + for k, v := range tc.expectedFeatures { + if cfg.ServerFeatureGate.Enabled(k) != v { + t.Errorf("expected feature gate %s=%v, got %v", k, v, cfg.ServerFeatureGate.Enabled(k)) + } + } + }) + } +} + // TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'. func TestUpdateDefaultClusterFromName(t *testing.T) { cfg := NewConfig() diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 110636b59cc7..2970a804d110 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -223,11 +223,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration, ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock, ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer, - ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag, ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes, ExperimentalMaxLearners: cfg.ExperimentalMaxLearners, V2Deprecation: cfg.V2DeprecationEffective(), ExperimentalLocalAddress: cfg.InferLocalAddr(), + ServerFeatureGate: cfg.ServerFeatureGate, } if srvcfg.ExperimentalEnableDistributedTracing { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 4a46192cf0ff..21480b665b83 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -28,6 +28,7 @@ import ( "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/logutil" + "go.etcd.io/etcd/pkg/v3/featuregate" "go.etcd.io/etcd/pkg/v3/flags" cconfig "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/embed" @@ -238,6 +239,19 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.InitialCluster = "" } + isExperimentalFlagSet := func(expFlag string) bool { + foundExperimentalFlag := false + cfg.cf.flagSet.Visit(func(f *flag.Flag) { + if f.Name == expFlag { + foundExperimentalFlag = true + } + }) + return foundExperimentalFlag + } + err = cfg.ec.SetFeatureGatesFromExperimentalFlags(isExperimentalFlagSet, cfg.cf.flagSet.Lookup(featuregate.FlagName).Value.String()) + if err != nil { + return err + } return cfg.validate() } diff --git a/server/etcdmain/config_test.go b/server/etcdmain/config_test.go index a49dbb4d9aa4..ef25b5faa2c7 100644 --- a/server/etcdmain/config_test.go +++ b/server/etcdmain/config_test.go @@ -25,8 +25,10 @@ import ( "sigs.k8s.io/yaml" + "go.etcd.io/etcd/pkg/v3/featuregate" "go.etcd.io/etcd/pkg/v3/flags" "go.etcd.io/etcd/server/v3/embed" + "go.etcd.io/etcd/server/v3/features" ) func TestConfigParsingMemberFlags(t *testing.T) { @@ -395,6 +397,99 @@ func TestFlagsPresentInHelp(t *testing.T) { }) } +func TestParseFeatureGateFlags(t *testing.T) { + testCases := []struct { + name string + args []string + expectErr bool + expectedFeatures map[featuregate.Feature]bool + }{ + { + name: "default", + expectedFeatures: map[featuregate.Feature]bool{ + features.DistributedTracing: false, + features.StopGRPCServiceOnDefrag: false, + }, + }, + { + name: "cannot set both experimental flag and feature gate flag", + args: []string{ + "--experimental-stop-grpc-service-on-defrag=false", + "--feature-gates=StopGRPCServiceOnDefrag=true", + }, + expectErr: true, + }, + { + name: "ok to set different experimental flag and feature gate flag", + args: []string{ + "--experimental-stop-grpc-service-on-defrag=true", + "--feature-gates=DistributedTracing=false", + }, + expectedFeatures: map[featuregate.Feature]bool{ + features.DistributedTracing: false, + features.StopGRPCServiceOnDefrag: true, + }, + }, + { + name: "can set feature gate to true from experimental flag", + args: []string{ + "--experimental-stop-grpc-service-on-defrag=true", + }, + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: true, + }, + }, + { + name: "can set feature gate to false from experimental flag", + args: []string{ + "--experimental-stop-grpc-service-on-defrag=false", + }, + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: false, + }, + }, + { + name: "can set feature gate to true from feature gate flag", + args: []string{ + "--feature-gates=StopGRPCServiceOnDefrag=true", + }, + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: true, + }, + }, + { + name: "can set feature gate to false from feature gate flag", + args: []string{ + "--feature-gates=StopGRPCServiceOnDefrag=false", + }, + expectedFeatures: map[featuregate.Feature]bool{ + features.StopGRPCServiceOnDefrag: false, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := newConfig() + err := cfg.parse(tc.args) + if tc.expectErr { + if err == nil { + t.Fatal("expect parse error") + } + return + } + if err != nil { + t.Fatal(err) + } + for k, v := range tc.expectedFeatures { + if cfg.ec.ServerFeatureGate.Enabled(k) != v { + t.Errorf("expected feature gate %s=%v, got %v", k, v, cfg.ec.ServerFeatureGate.Enabled(k)) + } + } + }) + } +} + func mustCreateCfgFile(t *testing.T, b []byte) *os.File { tmpfile, err := os.CreateTemp("", "servercfg") if err != nil { diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index d9bb14525e5c..a245c6c5df29 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -103,6 +103,8 @@ Member: Read timeout set on each rafthttp connection --raft-write-timeout '` + rafthttp.DefaultConnWriteTimeout.String() + `' Write timeout set on each rafthttp connection + --feature-gates '' + A set of key=value pairs that describe feature gates for alpha/experimental features. Clustering: --initial-advertise-peer-urls 'http://localhost:2380' @@ -308,7 +310,7 @@ Experimental feature: --experimental-snapshot-catchup-entries Number of entries for a slow follower to catch up after compacting the raft storage entries. --experimental-stop-grpc-service-on-defrag - Enable etcd gRPC service to stop serving client requests on defragmentation. + Enable etcd gRPC service to stop serving client requests on defragmentation. TO BE DEPRECATED, use '--feature-gates=StopGRPCServiceOnDefrag=true' instead. Unsafe feature: --force-new-cluster 'false' diff --git a/server/etcdserver/api/v3rpc/health.go b/server/etcdserver/api/v3rpc/health.go index e87140d17432..fd683cdc21d9 100644 --- a/server/etcdserver/api/v3rpc/health.go +++ b/server/etcdserver/api/v3rpc/health.go @@ -20,6 +20,7 @@ import ( healthpb "google.golang.org/grpc/health/grpc_health_v1" "go.etcd.io/etcd/server/v3/etcdserver" + "go.etcd.io/etcd/server/v3/features" ) const ( @@ -35,7 +36,7 @@ func newHealthNotifier(hs *health.Server, s *etcdserver.EtcdServer) notifier { if hs == nil { panic("unexpected nil gRPC health server") } - hc := &healthNotifier{hs: hs, lg: s.Logger(), stopGRPCServiceOnDefrag: s.Cfg.ExperimentalStopGRPCServiceOnDefrag} + hc := &healthNotifier{hs: hs, lg: s.Logger(), stopGRPCServiceOnDefrag: s.Cfg.ServerFeatureGate.Enabled(features.StopGRPCServiceOnDefrag)} // set grpc health server as serving status blindly since // the grpc server will serve iff s.ReadyNotify() is closed. hc.startServe() diff --git a/server/features/etcd_features.go b/server/features/etcd_features.go new file mode 100644 index 000000000000..fd4640d7443d --- /dev/null +++ b/server/features/etcd_features.go @@ -0,0 +1,65 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package features + +import ( + "fmt" + + "go.uber.org/zap" + + "go.etcd.io/etcd/pkg/v3/featuregate" +) + +const ( + // Every feature gate should add method here following this template: + // + // // owner: @username + // // kep: https://kep.k8s.io/NNN (or issue: https://github.com/etcd-io/etcd/issues/NNN) + // // alpha: v3.X + // MyFeature featuregate.Feature = "MyFeature" + // + // Feature gates should be listed in alphabetical, case-sensitive + // (upper before any lower case character) order. This reduces the risk + // of code conflicts because changes are more likely to be scattered + // across the file. + + // DistributedTracing enables experimental distributed tracing using OpenTelemetry Tracing. + // alpha: v3.5 + // issue: https://github.com/etcd-io/etcd/issues/12460 + DistributedTracing featuregate.Feature = "DistributedTracing" + // StopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation. + // owner: @chaochn47 + // alpha: v3.6 + StopGRPCServiceOnDefrag featuregate.Feature = "StopGRPCServiceOnDefrag" +) + +var ( + DefaultEtcdServerFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + DistributedTracing: {Default: false, PreRelease: featuregate.Alpha}, + StopGRPCServiceOnDefrag: {Default: false, PreRelease: featuregate.Alpha}, + } + ExperimentalFlagToFeatureMap = map[string]featuregate.Feature{ + "experimental-enable-distributed-tracing": DistributedTracing, + "experimental-stop-grpc-service-on-defrag": StopGRPCServiceOnDefrag, + } +) + +func NewDefaultServerFeatureGate(name string, lg *zap.Logger) featuregate.FeatureGate { + fg := featuregate.New(fmt.Sprintf("%sServerFeatureGate", name), lg) + if err := fg.Add(DefaultEtcdServerFeatureGates); err != nil { + panic(err) + } + return fg +} diff --git a/tests/e2e/failover_test.go b/tests/e2e/failover_test.go index aec467fcc9cc..878603673485 100644 --- a/tests/e2e/failover_test.go +++ b/tests/e2e/failover_test.go @@ -93,6 +93,44 @@ func TestFailoverOnDefrag(t *testing.T) { expectedMinQPS: 20, expectedMinFailureRate: 0.25, }, + { + name: "defrag failover happy case with feature gate", + clusterOptions: []e2e.EPClusterOption{ + e2e.WithClusterSize(3), + e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true), + e2e.WithGoFailEnabled(true), + }, + gRPCDialOptions: []grpc.DialOption{ + grpc.WithDisableServiceConfig(), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), + }, + expectedMinQPS: 20, + expectedMaxFailureRate: 0.01, + }, + { + name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to false", + clusterOptions: []e2e.EPClusterOption{ + e2e.WithClusterSize(3), + e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", false), + e2e.WithGoFailEnabled(true), + }, + gRPCDialOptions: []grpc.DialOption{ + grpc.WithDisableServiceConfig(), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin", "healthCheckConfig": {"serviceName": ""}}`), + }, + expectedMinQPS: 20, + expectedMinFailureRate: 0.25, + }, + { + name: "defrag blocks one-third of requests with StopGRPCServiceOnDefrag feature gate set to true and client health check disabled", + clusterOptions: []e2e.EPClusterOption{ + e2e.WithClusterSize(3), + e2e.WithServerFeatureGate("StopGRPCServiceOnDefrag", true), + e2e.WithGoFailEnabled(true), + }, + expectedMinQPS: 20, + expectedMinFailureRate: 0.25, + }, } for _, tc := range tcs { diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index cb8b35d7fd85..23d422aa3130 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -32,6 +32,7 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/featuregate" "go.etcd.io/etcd/pkg/v3/proxy" "go.etcd.io/etcd/server/v3/embed" "go.etcd.io/etcd/server/v3/etcdserver" @@ -358,6 +359,14 @@ func WithExperimentalStopGRPCServiceOnDefrag(stopGRPCServiceOnDefrag bool) EPClu } } +func WithServerFeatureGate(featureName string, val bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { + if err := c.ServerConfig.ServerFeatureGate.(featuregate.MutableFeatureGate).Set(fmt.Sprintf("%s=%v", featureName, val)); err != nil { + panic(err) + } + } +} + func WithCompactionBatchLimit(limit int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.ServerConfig.ExperimentalCompactionBatchLimit = limit } } diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index e3ef2a448ad3..95b5c88d9f80 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -63,6 +63,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock" lockpb "go.etcd.io/etcd/server/v3/etcdserver/api/v3lock/v3lockpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc" + "go.etcd.io/etcd/server/v3/features" "go.etcd.io/etcd/server/v3/verify" framecfg "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/testutils" @@ -174,8 +175,6 @@ type ClusterConfig struct { ExperimentalMaxLearners int DisableStrictReconfigCheck bool CorruptCheckTime time.Duration - - ExperimentalStopGRPCServiceOnDefrag bool } type Cluster struct { @@ -266,34 +265,33 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member { m := MustNewMember(t, MemberConfig{ - Name: fmt.Sprintf("m%v", memberNumber), - MemberNumber: memberNumber, - AuthToken: c.Cfg.AuthToken, - PeerTLS: c.Cfg.PeerTLS, - ClientTLS: c.Cfg.ClientTLS, - QuotaBackendBytes: c.Cfg.QuotaBackendBytes, - BackendBatchInterval: c.Cfg.BackendBatchInterval, - MaxTxnOps: c.Cfg.MaxTxnOps, - MaxRequestBytes: c.Cfg.MaxRequestBytes, - SnapshotCount: c.Cfg.SnapshotCount, - SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries, - GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime, - GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval, - GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout, - GRPCAdditionalServerOptions: c.Cfg.GRPCAdditionalServerOptions, - ClientMaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize, - ClientMaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize, - UseIP: c.Cfg.UseIP, - UseBridge: c.Cfg.UseBridge, - UseTCP: c.Cfg.UseTCP, - EnableLeaseCheckpoint: c.Cfg.EnableLeaseCheckpoint, - LeaseCheckpointInterval: c.Cfg.LeaseCheckpointInterval, - LeaseCheckpointPersist: c.Cfg.LeaseCheckpointPersist, - WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval, - ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners, - DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck, - CorruptCheckTime: c.Cfg.CorruptCheckTime, - ExperimentalStopGRPCServiceOnDefrag: c.Cfg.ExperimentalStopGRPCServiceOnDefrag, + Name: fmt.Sprintf("m%v", memberNumber), + MemberNumber: memberNumber, + AuthToken: c.Cfg.AuthToken, + PeerTLS: c.Cfg.PeerTLS, + ClientTLS: c.Cfg.ClientTLS, + QuotaBackendBytes: c.Cfg.QuotaBackendBytes, + BackendBatchInterval: c.Cfg.BackendBatchInterval, + MaxTxnOps: c.Cfg.MaxTxnOps, + MaxRequestBytes: c.Cfg.MaxRequestBytes, + SnapshotCount: c.Cfg.SnapshotCount, + SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries, + GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime, + GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval, + GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout, + GRPCAdditionalServerOptions: c.Cfg.GRPCAdditionalServerOptions, + ClientMaxCallSendMsgSize: c.Cfg.ClientMaxCallSendMsgSize, + ClientMaxCallRecvMsgSize: c.Cfg.ClientMaxCallRecvMsgSize, + UseIP: c.Cfg.UseIP, + UseBridge: c.Cfg.UseBridge, + UseTCP: c.Cfg.UseTCP, + EnableLeaseCheckpoint: c.Cfg.EnableLeaseCheckpoint, + LeaseCheckpointInterval: c.Cfg.LeaseCheckpointInterval, + LeaseCheckpointPersist: c.Cfg.LeaseCheckpointPersist, + WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval, + ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners, + DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck, + CorruptCheckTime: c.Cfg.CorruptCheckTime, }) m.DiscoveryURL = c.Cfg.DiscoveryURL return m @@ -619,8 +617,6 @@ type MemberConfig struct { ExperimentalMaxLearners int DisableStrictReconfigCheck bool CorruptCheckTime time.Duration - - ExperimentalStopGRPCServiceOnDefrag bool } // MustNewMember return an inited member with the given name. If peerTLS is @@ -729,7 +725,6 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { if mcfg.CorruptCheckTime > time.Duration(0) { m.CorruptCheckTime = mcfg.CorruptCheckTime } - m.ExperimentalStopGRPCServiceOnDefrag = mcfg.ExperimentalStopGRPCServiceOnDefrag m.WarningApplyDuration = embed.DefaultWarningApplyDuration m.WarningUnaryRequestDuration = embed.DefaultWarningUnaryRequestDuration m.ExperimentalMaxLearners = membership.DefaultMaxLearners @@ -740,6 +735,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { m.GRPCServerRecorder = &grpctesting.GRPCRecorder{} m.Logger, m.LogObserver = memberLogger(t, mcfg.Name) + m.ServerFeatureGate = features.NewDefaultServerFeatureGate(m.Name, m.Logger) m.StrictReconfigCheck = !mcfg.DisableStrictReconfigCheck if err := m.listenGRPC(); err != nil {