Skip to content

Commit

Permalink
Add "feature-gates" flag and field in config file and add feature gat…
Browse files Browse the repository at this point in the history
…e for StopGRPCServiceOnDefrag

Signed-off-by: Siyuan Zhang <[email protected]>
  • Loading branch information
siyuanfoundation committed Jun 26, 2024
1 parent d80d0f0 commit 306c4f5
Show file tree
Hide file tree
Showing 15 changed files with 444 additions and 47 deletions.
3 changes: 3 additions & 0 deletions etcd.conf.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,6 @@ cipher-suites: [
# Limit etcd to specific TLS protocol versions
tls-min-version: 'TLS1.2'
tls-max-version: 'TLS1.3'

# Comma-separated list of feature gate enablement in the format of feature=true|false
feature-gates: StopGRPCServiceOnDefrag=false,DistributedTracing=false
21 changes: 17 additions & 4 deletions pkg/featuregate/feature_gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package featuregate

import (
"flag"
"fmt"
"sort"
"strconv"
Expand All @@ -30,7 +31,7 @@ import (
type Feature string

const (
flagName = "feature-gates"
FlagName = "feature-gates"

// allAlphaGate is a global toggle for alpha features. Per-feature key
// values override the default set by allAlphaGate. Examples:
Expand Down Expand Up @@ -98,7 +99,7 @@ type MutableFeatureGate interface {
FeatureGate

// AddFlag adds a flag for setting global feature gates to the specified FlagSet.
AddFlag(fs *pflag.FlagSet)
AddFlag(fs *flag.FlagSet)
// Set parses and stores flag gates for known features
// from a string like feature1=true,feature2=false,...
Set(value string) error
Expand All @@ -121,6 +122,8 @@ type MutableFeatureGate interface {
// overriding its default to true for a limited number of components without simultaneously
// changing its default for all consuming components.
OverrideDefault(name Feature, override bool) error
// SetLogger replaces the logger with the provided logger.
SetLogger(lg *zap.Logger)
}

// featureGate implements FeatureGate as well as pflag.Value for flag parsing.
Expand Down Expand Up @@ -165,6 +168,9 @@ func setUnsetBetaGates(known map[Feature]FeatureSpec, enabled map[Feature]bool,
var _ pflag.Value = &featureGate{}

func New(name string, lg *zap.Logger) *featureGate {
if lg == nil {
lg = zap.NewNop()
}
known := map[Feature]FeatureSpec{}
for k, v := range defaultFeatures {
known[k] = v
Expand Down Expand Up @@ -349,7 +355,7 @@ func (f *featureGate) Enabled(key Feature) bool {
}

// AddFlag adds a flag for setting global feature gates to the specified FlagSet.
func (f *featureGate) AddFlag(fs *pflag.FlagSet) {
func (f *featureGate) AddFlag(fs *flag.FlagSet) {
f.lock.Lock()
// TODO(mtaufen): Shouldn't we just close it on the first Set/SetFromMap instead?
// Not all components expose a feature gates flag using this AddFlag method, and
Expand All @@ -359,7 +365,7 @@ func (f *featureGate) AddFlag(fs *pflag.FlagSet) {
f.lock.Unlock()

known := f.KnownFeatures()
fs.Var(f, flagName, ""+
fs.Var(f, FlagName, ""+
"A set of key=value pairs that describe feature gates for alpha/experimental features. "+
"Options are:\n"+strings.Join(known, "\n"))
}
Expand Down Expand Up @@ -409,3 +415,10 @@ func (f *featureGate) DeepCopy() MutableFeatureGate {

return fg
}

// SetLogger replaces the logger with the provided logger.
func (f *featureGate) SetLogger(lg *zap.Logger) {
f.lock.Lock()
defer f.lock.Unlock()
f.lg = lg
}
8 changes: 4 additions & 4 deletions pkg/featuregate/feature_gate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
package featuregate

import (
"flag"
"fmt"
"strings"
"testing"

"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
)
Expand Down Expand Up @@ -203,15 +203,15 @@ func TestFeatureGateFlag(t *testing.T) {
}
for i, test := range tests {
t.Run(test.arg, func(t *testing.T) {
fs := pflag.NewFlagSet("testfeaturegateflag", pflag.ContinueOnError)
fs := flag.NewFlagSet("testfeaturegateflag", flag.ContinueOnError)
f := New("test", zaptest.NewLogger(t))
f.Add(map[Feature]FeatureSpec{
testAlphaGate: {Default: false, PreRelease: Alpha},
testBetaGate: {Default: false, PreRelease: Beta},
})
f.AddFlag(fs)

err := fs.Parse([]string{fmt.Sprintf("--%s=%s", flagName, test.arg)})
err := fs.Parse([]string{fmt.Sprintf("--%s=%s", FlagName, test.arg)})
if test.parseError != "" {
if !strings.Contains(err.Error(), test.parseError) {
t.Errorf("%d: Parse() Expected %v, Got %v", i, test.parseError, err)
Expand Down Expand Up @@ -603,7 +603,7 @@ func TestFeatureGateOverrideDefault(t *testing.T) {

t.Run("returns error if already added to flag set", func(t *testing.T) {
f := New("test", zaptest.NewLogger(t))
fs := pflag.NewFlagSet("test", pflag.ContinueOnError)
fs := flag.NewFlagSet("test", flag.ContinueOnError)
f.AddFlag(fs)

if err := f.OverrideDefault("TestFeature", true); err == nil {
Expand Down
3 changes: 0 additions & 3 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ type ServerConfig struct {
// a shared buffer in its readonly check operations.
ExperimentalTxnModeWriteWithSharedBuffer bool `json:"experimental-txn-mode-write-with-shared-buffer"`

// ExperimentalStopGRPCServiceOnDefrag enables etcd gRPC service to stop serving client requests on defragmentation.
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`

// ExperimentalBootstrapDefragThresholdMegabytes is the minimum number of megabytes needed to be freed for etcd server to
// consider running defrag during bootstrap. Needs to be set to non-zero value to take effect.
ExperimentalBootstrapDefragThresholdMegabytes uint `json:"experimental-bootstrap-defrag-threshold-megabytes"`
Expand Down
51 changes: 51 additions & 0 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/featuregate"
"go.etcd.io/etcd/pkg/v3/flags"
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/server/v3/config"
Expand All @@ -50,6 +51,7 @@ import (
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery"
"go.etcd.io/etcd/server/v3/features"
)

const (
Expand Down Expand Up @@ -455,6 +457,9 @@ type Config struct {

// V2Deprecation describes phase of API & Storage V2 support
V2Deprecation config.V2DeprecationEnum `json:"v2-deprecation"`

// ServerFeatureGate is a server level feature gate
ServerFeatureGate featuregate.FeatureGate
}

// configYAML holds the config suitable for yaml parsing
Expand All @@ -476,6 +481,8 @@ type configJSON struct {

ClientSecurityJSON securityConfig `json:"client-transport-security"`
PeerSecurityJSON securityConfig `json:"peer-transport-security"`

FeatureGatesJSON string `json:"feature-gates"`
}

type securityConfig struct {
Expand Down Expand Up @@ -576,6 +583,7 @@ func NewConfig() *Config {
},

AutoCompactionMode: DefaultAutoCompactionMode,
ServerFeatureGate: features.NewDefaultServerFeatureGate(DefaultName, nil),
}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
return cfg
Expand Down Expand Up @@ -762,6 +770,9 @@ func (cfg *Config) AddFlags(fs *flag.FlagSet) {
// unsafe
fs.BoolVar(&cfg.UnsafeNoFsync, "unsafe-no-fsync", false, "Disables fsync, unsafe, will cause data loss.")
fs.BoolVar(&cfg.ForceNewCluster, "force-new-cluster", false, "Force to create a new one member cluster.")

// featuregate
cfg.ServerFeatureGate.(featuregate.MutableFeatureGate).AddFlag(fs)
}

func ConfigFromFile(path string) (*Config, error) {
Expand All @@ -785,6 +796,26 @@ func (cfg *configYAML) configFromFile(path string) error {
return err
}

if cfg.configJSON.FeatureGatesJSON != "" {
err = cfg.Config.ServerFeatureGate.(featuregate.MutableFeatureGate).Set(cfg.configJSON.FeatureGatesJSON)
if err != nil {
return err
}
}
var cfgMap map[string]interface{}
err = yaml.Unmarshal(b, &cfgMap)
if err != nil {
return err
}
isExperimentalFlagSet := func(expFlag string) bool {
_, ok := cfgMap[expFlag]
return ok
}
err = cfg.SetFeatureGatesFromExperimentalFlags(isExperimentalFlagSet, cfg.configJSON.FeatureGatesJSON)
if err != nil {
return err
}

if cfg.configJSON.ListenPeerURLs != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenPeerURLs, ","))
if err != nil {
Expand Down Expand Up @@ -877,6 +908,25 @@ func (cfg *configYAML) configFromFile(path string) error {
return cfg.Validate()
}

// SetFeatureGatesFromExperimentalFlags sets the feature gate values if their corresponding experimental flags are
// explicitly set.
func (cfg *Config) SetFeatureGatesFromExperimentalFlags(isExperimentalFlagSet func(string) bool, featureGatesVal string) error {
// verify that the feature gate and its experimental flag are not both set at the same time.
for expFlagName, featureName := range features.ExperimentalFlagToFeatureMap {
if isExperimentalFlagSet(expFlagName) && strings.Contains(featureGatesVal, string(featureName)) {
return fmt.Errorf("cannot specified both flags: --%s=(true|false) and --%s=%s=(true|false) at the same time, please just use --%s=%s=(true|false)",
expFlagName, featuregate.FlagName, featureName, featuregate.FlagName, featureName)
}
}

m := make(map[string]bool)
defaultEc := NewConfig()
if cfg.ExperimentalStopGRPCServiceOnDefrag != defaultEc.ExperimentalStopGRPCServiceOnDefrag {
m[string(features.StopGRPCServiceOnDefrag)] = cfg.ExperimentalStopGRPCServiceOnDefrag
}
return cfg.ServerFeatureGate.(featuregate.MutableFeatureGate).SetFromMap(m)
}

func updateCipherSuites(tls *transport.TLSInfo, ss []string) error {
if len(tls.CipherSuites) > 0 && len(ss) > 0 {
return fmt.Errorf("TLSInfo.CipherSuites is already specified (given %v)", ss)
Expand Down Expand Up @@ -907,6 +957,7 @@ func (cfg *Config) Validate() error {
if err := cfg.setupLogging(); err != nil {
return err
}
cfg.ServerFeatureGate.(featuregate.MutableFeatureGate).SetLogger(cfg.logger)
if err := checkBindURLs(cfg.ListenPeerUrls); err != nil {
return err
}
Expand Down
113 changes: 113 additions & 0 deletions server/embed/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"go.etcd.io/etcd/client/pkg/v3/srv"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/featuregate"
"go.etcd.io/etcd/server/v3/features"
)

func notFoundErr(service, domain string) error {
Expand Down Expand Up @@ -89,6 +91,117 @@ func TestConfigFileOtherFields(t *testing.T) {
assert.Equal(t, false, cfg.SocketOpts.ReuseAddress, "ReuseAddress does not match")
}

func TestConfigFileFeatureGates(t *testing.T) {
testCases := []struct {
name string
featureGatesJSON string
experimentalStopGRPCServiceOnDefrag bool
setExperimentalStopGRPCServiceOnDefrag bool
expectErr bool
expectedFeatures map[featuregate.Feature]bool
}{
{
name: "default",
expectedFeatures: map[featuregate.Feature]bool{
features.DistributedTracing: false,
features.StopGRPCServiceOnDefrag: false,
},
},
{
name: "cannot set both experimental flag and feature gate flag",
featureGatesJSON: "StopGRPCServiceOnDefrag=true",
experimentalStopGRPCServiceOnDefrag: false,
setExperimentalStopGRPCServiceOnDefrag: true,
expectErr: true,
},
{
name: "ok to set different experimental flag and feature gate flag",
featureGatesJSON: "DistributedTracing=false",
experimentalStopGRPCServiceOnDefrag: true,
setExperimentalStopGRPCServiceOnDefrag: true,
expectedFeatures: map[featuregate.Feature]bool{
features.DistributedTracing: false,
features.StopGRPCServiceOnDefrag: true,
},
},
{
name: "can set feature gate to true from experimental flag",
experimentalStopGRPCServiceOnDefrag: true,
setExperimentalStopGRPCServiceOnDefrag: true,
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: true,
},
},
{
name: "can set feature gate to false from experimental flag",
experimentalStopGRPCServiceOnDefrag: false,
setExperimentalStopGRPCServiceOnDefrag: true,
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
},
},
{
name: "can set feature gate to true from feature gate flag",
featureGatesJSON: "StopGRPCServiceOnDefrag=true",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: true,
},
},
{
name: "can set feature gate to false from feature gate flag",
featureGatesJSON: "StopGRPCServiceOnDefrag=false",
expectedFeatures: map[featuregate.Feature]bool{
features.StopGRPCServiceOnDefrag: false,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var b []byte
var err error
if tc.setExperimentalStopGRPCServiceOnDefrag {
yc := struct {
ExperimentalStopGRPCServiceOnDefrag bool `json:"experimental-stop-grpc-service-on-defrag"`
FeatureGatesJSON string `json:"feature-gates"`
}{
tc.experimentalStopGRPCServiceOnDefrag,
tc.featureGatesJSON,
}
b, err = yaml.Marshal(&yc)
} else {
yc := struct {
FeatureGatesJSON string `json:"feature-gates"`
}{

tc.featureGatesJSON,
}
b, err = yaml.Marshal(&yc)
}
if err != nil {
t.Fatal(err)
}

tmpfile := mustCreateCfgFile(t, b)
defer os.Remove(tmpfile.Name())

cfg, err := ConfigFromFile(tmpfile.Name())
if tc.expectErr {
if err == nil {
if err == nil {
t.Fatal("expect parse error")
}
}
return
}
for k, v := range tc.expectedFeatures {
if cfg.ServerFeatureGate.Enabled(k) != v {
t.Errorf("expected feature gate %s=%v, got %v", k, v, cfg.ServerFeatureGate.Enabled(k))
}
}
})
}
}

// TestUpdateDefaultClusterFromName ensures that etcd can start with 'etcd --name=abc'.
func TestUpdateDefaultClusterFromName(t *testing.T) {
cfg := NewConfig()
Expand Down
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
WarningUnaryRequestDuration: cfg.WarningUnaryRequestDuration,
ExperimentalMemoryMlock: cfg.ExperimentalMemoryMlock,
ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
ExperimentalStopGRPCServiceOnDefrag: cfg.ExperimentalStopGRPCServiceOnDefrag,
ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
ExperimentalMaxLearners: cfg.ExperimentalMaxLearners,
V2Deprecation: cfg.V2DeprecationEffective(),
ExperimentalLocalAddress: cfg.InferLocalAddr(),
ServerFeatureGate: cfg.ServerFeatureGate,
}

if srvcfg.ExperimentalEnableDistributedTracing {
Expand Down
Loading

0 comments on commit 306c4f5

Please sign in to comment.