diff --git a/leaderelection/leader_election.go b/leaderelection/leader_election.go index f3065c5a..a4e5c661 100644 --- a/leaderelection/leader_election.go +++ b/leaderelection/leader_election.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io/ioutil" + "net/http" "os" "regexp" "strings" @@ -39,6 +40,14 @@ const ( defaultLeaseDuration = 15 * time.Second defaultRenewDeadline = 10 * time.Second defaultRetryPeriod = 5 * time.Second + + DefaultHealthCheckTimeout = 20 * time.Second + + // HealthCheckerAddress is the address at which the leader election health + // checker reports status. + // The caller sidecar should document this address in appropriate flag + // descriptions. + HealthCheckerAddress = "/healthz/leader-election" ) // leaderElection is a convenience wrapper around client-go's leader election library. @@ -55,6 +64,9 @@ type leaderElection struct { // valid options are resourcelock.LeasesResourceLock, resourcelock.EndpointsResourceLock, // and resourcelock.ConfigMapsResourceLock resourceLock string + // healthCheck reports unhealthy if leader election fails to renew leadership + // within a timeout period. + healthCheck *leaderelection.HealthzAdaptor leaseDuration time.Duration renewDeadline time.Duration @@ -71,6 +83,8 @@ func NewLeaderElection(clientset kubernetes.Interface, lockName string, runFunc } // NewLeaderElectionWithLeases returns an implementation of leader election using Leases +// Note: this constructor takes no healthCheckTimeout; that timeout (the max duration beyond +// lease expiration allowed before reporting unhealthy) is passed to PrepareHealthCheck. 
func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { return &leaderElection{ runFunc: runFunc, @@ -84,6 +98,8 @@ func NewLeaderElectionWithLeases(clientset kubernetes.Interface, lockName string } // NewLeaderElectionWithEndpoints returns an implementation of leader election using Endpoints +// Note: this constructor takes no healthCheckTimeout; that timeout (the max duration beyond +// lease expiration allowed before reporting unhealthy) is passed to PrepareHealthCheck. func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { return &leaderElection{ runFunc: runFunc, @@ -97,6 +113,8 @@ func NewLeaderElectionWithEndpoints(clientset kubernetes.Interface, lockName str } // NewLeaderElectionWithConfigMaps returns an implementation of leader election using ConfigMaps +// Note: this constructor takes no healthCheckTimeout; that timeout (the max duration beyond +// lease expiration allowed before reporting unhealthy) is passed to PrepareHealthCheck. func NewLeaderElectionWithConfigMaps(clientset kubernetes.Interface, lockName string, runFunc func(ctx context.Context)) *leaderElection { return &leaderElection{ runFunc: runFunc, @@ -134,6 +152,27 @@ func (l *leaderElection) WithContext(ctx context.Context) { l.ctx = ctx } +// Server represents any type that could serve HTTP requests for the leader +// election health check endpoint. +type Server interface { + Handle(pattern string, handler http.Handler) +} + +// PrepareHealthCheck creates a health check for this leader election object +// with the given healthCheckTimeout and registers its HTTP handler to the given +// server at the path specified by the constant "HealthCheckerAddress". +// healthCheckTimeout determines the max duration beyond lease expiration +// allowed before reporting unhealthy. +// The caller sidecar should document the handler address in appropriate flag +// descriptions. 
+func (l *leaderElection) PrepareHealthCheck( + s Server, + healthCheckTimeout time.Duration) { + + l.healthCheck = leaderelection.NewLeaderHealthzAdaptor(healthCheckTimeout) + s.Handle(HealthCheckerAddress, adaptCheckToHandler(l.healthCheck.Check)) +} + func (l *leaderElection) Run() error { if l.identity == "" { id, err := defaultLeaderElectionIdentity() @@ -179,6 +218,7 @@ func (l *leaderElection) Run() error { klog.V(3).Infof("new leader detected, current leader: %s", identity) }, }, + WatchDog: l.healthCheck, } ctx := l.ctx @@ -220,3 +260,15 @@ func inClusterNamespace() string { return "default" } + +// adaptCheckToHandler returns an http.HandlerFunc that serves the provided check. +func adaptCheckToHandler(c func(r *http.Request) error) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + err := c(r) + if err != nil { + http.Error(w, fmt.Sprintf("internal server error: %v", err), http.StatusInternalServerError) + } else { + fmt.Fprint(w, "ok") + } + }) +} diff --git a/metrics/metrics.go b/metrics/metrics.go index 6ca2b9ae..733e6923 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/component-base/metrics" - "k8s.io/klog/v2" ) const ( @@ -90,10 +89,15 @@ type CSIMetricsManager interface { // driverName - Name of the CSI driver against which this operation was executed. SetDriverName(driverName string) - // StartMetricsEndpoint starts the metrics endpoint at the specified address/path - // for this metrics manager. - // If the metricsAddress is an empty string, this will be a no op. - StartMetricsEndpoint(metricsAddress, metricsPath string) + // RegisterToServer registers an HTTP handler for this metrics manager to the + // given server at the specified path. + RegisterToServer(s Server, metricsPath string) +} + +// Server represents any type that could serve HTTP requests for the metrics +// endpoint. 
+type Server interface { + Handle(pattern string, handler http.Handler) } // MetricsManagerOption is used to pass optional configuration to a @@ -325,27 +329,13 @@ func (cmm *csiMetricsManager) SetDriverName(driverName string) { } } -// StartMetricsEndpoint starts the metrics endpoint at the specified address/path -// for this metrics manager on a new go routine. -// If the metricsAddress is an empty string, this will be a no op. -func (cmm *csiMetricsManager) StartMetricsEndpoint(metricsAddress, metricsPath string) { - if metricsAddress == "" { - klog.Warningf("metrics endpoint will not be started because `metrics-address` was not specified.") - return - } - - http.Handle(metricsPath, metrics.HandlerFor( +// RegisterToServer registers an HTTP handler for this metrics manager to the +// given server at the specified path. +func (cmm *csiMetricsManager) RegisterToServer(s Server, metricsPath string) { + s.Handle(metricsPath, metrics.HandlerFor( cmm.GetRegistry(), metrics.HandlerOpts{ ErrorHandling: metrics.ContinueOnError})) - - // Spawn a new go routine to listen on specified endpoint - go func() { - err := http.ListenAndServe(metricsAddress, nil) - if err != nil { - klog.Fatalf("Failed to start prometheus metrics endpoint on specified address (%q) and path (%q): %s", metricsAddress, metricsPath, err) - } - }() } // VerifyMetricsMatch is a helper function that verifies that the expected and diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go index 18233ec8..d6e6d4b0 100644 --- a/metrics/metrics_test.go +++ b/metrics/metrics_test.go @@ -19,6 +19,7 @@ package metrics import ( "io/ioutil" "net/http" + "net/http/httptest" "strings" "testing" "time" @@ -481,29 +482,26 @@ func TestRecordMetrics_Negative(t *testing.T) { } } -func TestStartMetricsEndPoint_Noop(t *testing.T) { +func TestRegisterToServer_Noop(t *testing.T) { // Arrange cmm := NewCSIMetricsManagerForSidecar( "fake.csi.driver.io" /* driverName */) operationDuration, _ := 
time.ParseDuration("20s") + mux := http.NewServeMux() // Act - cmm.StartMetricsEndpoint(":8080", "/metrics") + cmm.RegisterToServer(mux, "/metrics") cmm.RecordMetrics( "/csi.v1.Controller/ControllerGetCapabilities", /* operationName */ nil, /* operationErr */ operationDuration /* operationDuration */) // Assert - request, err := http.NewRequest("GET", "http://localhost:8080/metrics", strings.NewReader("")) - if err != nil { - t.Fatalf("Creating request for metrics endpoint failed: %v", err) - } - client := &http.Client{} - resp, err := client.Do(request) - if err != nil { - t.Fatalf("Failed to GET metrics. Error: %v", err) - } + request := httptest.NewRequest("GET", "/metrics", strings.NewReader("")) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, request) + resp := rec.Result() + if resp.StatusCode != 200 { t.Fatalf("/metrics response status not 200. Response was: %+v", resp) }