diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 774ff44a693f..6647cc8e3587 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -77,6 +77,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* +- Allow user configuration of keep-alive behaviour for HTTPJSON input. {issue}33951[33951] {pull}34014[34014] {pull}34743[34743] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 39f414e600a1..f0c4ab8f26c2 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -419,6 +419,26 @@ filebeat.inputs: request.proxy_url: http://proxy.example:8080 ---- +[float] +==== `request.keep_alive.disable` + +This specifies whether to disable keep-alives for HTTP end-points. Default: `true`. + +[float] +==== `request.keep_alive.max_idle_connections` + +The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`. + +[float] +==== `request.keep_alive.max_idle_connections_per_host` + +The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`. + +[float] +==== `request.keep_alive.idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`. + [float] ==== `request.retry.max_attempts` diff --git a/x-pack/filebeat/input/httpjson/config.go b/x-pack/filebeat/input/httpjson/config.go index a2965ace4ef3..b690f8c6866a 100644 --- a/x-pack/filebeat/input/httpjson/config.go +++ b/x-pack/filebeat/input/httpjson/config.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" + v2 "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/internal/v2" ) // config contains information about httpjson configuration @@ -32,6 +33,7 @@ type config struct { NoHTTPBody bool `config:"no_http_body"` Pagination *paginationConfig `config:"pagination"` RateLimit *rateLimitConfig `config:"rate_limit"` + KeepAlive v2.KeepAlive `config:"keep_alive"` RetryMax int `config:"retry.max_attempts"` RetryWaitMin time.Duration `config:"retry.wait_min"` RetryWaitMax time.Duration `config:"retry.wait_max"` diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index ba0bae85dd03..bcd5e6efdb34 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -160,12 +160,11 @@ func run( func newHTTPClient(ctx context.Context, config config) (*http.Client, error) { config.Transport.Timeout = config.HTTPClientTimeout - httpClient, err := - config.Transport.Client( - httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, - httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}), - ) + httpClient, err := config.Transport.Client( + httpcommon.WithAPMHTTPInstrumentation(), + config.KeepAlive.Settings(), + httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}), + ) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/httpjson/internal/v2/config_request.go b/x-pack/filebeat/input/httpjson/internal/v2/config_request.go index adf744519da8..f448407a5ca2 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/config_request.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/config_request.go @@ -69,6 +69,38 @@ func (c rateLimitConfig) Validate() error { return nil } +type KeepAlive struct { + Disable *bool `config:"disable"` + MaxIdleConns int `config:"max_idle_connections"` + MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport. + IdleConnTimeout time.Duration `config:"idle_connection_timeout"` +} + +func (c KeepAlive) Validate() error { + if c.Disable == nil || *c.Disable { + return nil + } + if c.MaxIdleConns < 0 { + return errors.New("max_idle_connections must not be negative") + } + if c.MaxIdleConnsPerHost < 0 { + return errors.New("max_idle_connections_per_host must not be negative") + } + if c.IdleConnTimeout < 0 { + return errors.New("idle_connection_timeout must not be negative") + } + return nil +} + +func (c KeepAlive) Settings() httpcommon.WithKeepaliveSettings { + return httpcommon.WithKeepaliveSettings{ + Disable: c.Disable == nil || *c.Disable, + MaxIdleConns: c.MaxIdleConns, + MaxIdleConnsPerHost: c.MaxIdleConnsPerHost, + IdleConnTimeout: c.IdleConnTimeout, + } +} + type urlConfig struct { *url.URL } @@ -94,6 +126,7 @@ type requestConfig struct { RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` RedirectMaxRedirects int `config:"redirect.max_redirects"` RateLimit *rateLimitConfig `config:"rate_limit"` + KeepAlive KeepAlive `config:"keep_alive"` Transforms transformsConfig `config:"transforms"` Transport httpcommon.HTTPTransportSettings `config:",inline"` diff --git a/x-pack/filebeat/input/httpjson/internal/v2/config_test.go b/x-pack/filebeat/input/httpjson/internal/v2/config_test.go index 4110430b25e8..306e2ce18f0e 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/config_test.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/config_test.go @@ -7,6 +7,7 @@ package v2 import ( "context" "errors" + "fmt" "os" "testing" @@ -15,6 +16,7 @@ import ( "golang.org/x/oauth2/google" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/transport/httpcommon" ) func TestProviderCanonical(t *testing.T) { @@ -39,7 +41,7 @@ func TestIsEnabled(t *testing.T) { t.Fatal("OAuth2 should be enabled by default") } - var enabled = false + enabled := false oauth2.Enabled = &enabled assert.False(t, oauth2.isEnabled()) @@ -69,19 +71,19 @@ func TestGetTokenURLWithAzure(t *testing.T) { } func TestGetEndpointParams(t *testing.T) { - var expected = map[string][]string{"foo": {"bar"}} + expected := map[string][]string{"foo": {"bar"}} oauth2 := oAuth2Config{EndpointParams: map[string][]string{"foo": {"bar"}}} assert.Equal(t, expected, oauth2.getEndpointParams()) } func TestGetEndpointParamsWithAzure(t *testing.T) { - var expectedWithoutResource = map[string][]string{"foo": {"bar"}} + expectedWithoutResource := map[string][]string{"foo": {"bar"}} oauth2 := oAuth2Config{Provider: "azure", EndpointParams: map[string][]string{"foo": {"bar"}}} assert.Equal(t, expectedWithoutResource, oauth2.getEndpointParams()) oauth2.AzureResource = "baz" - var expectedWithResource = map[string][]string{"foo": {"bar"}, "resource": {"baz"}} + expectedWithResource := map[string][]string{"foo": {"bar"}, "resource": {"baz"}} assert.Equal(t, expectedWithResource, oauth2.getEndpointParams()) } @@ -392,3 +394,59 @@ func TestCursorEntryConfig(t *testing.T) { assert.True(t, conf["entry3"].mustIgnoreEmptyValue()) assert.True(t, conf["entry4"].mustIgnoreEmptyValue()) } + +var keepAliveTests = []struct { + name string + input map[string]interface{} + want httpcommon.WithKeepaliveSettings + wantErr error +}{ + { + name: "keep_alive_none", // Default to the old behaviour of true. + input: map[string]interface{}{}, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_true", + input: map[string]interface{}{ + "request.keep_alive.disable": true, + }, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_false", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + }, + want: httpcommon.WithKeepaliveSettings{Disable: false}, + }, + { + name: "keep_alive_invalid_max", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + "request.keep_alive.max_idle_connections": -1, + }, + wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"), + }, +} + +func TestKeepAliveSetting(t *testing.T) { + for _, test := range keepAliveTests { + t.Run(test.name, func(t *testing.T) { + test.input["request.url"] = "localhost" + cfg := common.MustNewConfigFrom(test.input) + conf := defaultConfig() + err := cfg.Unpack(&conf) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr) + } + if err != nil { + return + } + got := conf.Request.KeepAlive.Settings() + if got != test.want { + t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want) + } + }) + } +} diff --git a/x-pack/filebeat/input/httpjson/internal/v2/input.go b/x-pack/filebeat/input/httpjson/internal/v2/input.go index b8f32655c2b8..84f26757d814 100644 --- a/x-pack/filebeat/input/httpjson/internal/v2/input.go +++ b/x-pack/filebeat/input/httpjson/internal/v2/input.go @@ -135,10 +135,7 @@ func run( func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) { // Make retryable HTTP client - netHTTPClient, err := config.Request.Transport.Client( - httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, - ) + netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.Settings())...) if err != nil { return nil, err } @@ -168,6 +165,14 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC return &httpClient{client: client.StandardClient(), limiter: limiter}, nil } +// clientOptions returns constructed client configuration options. +func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption { + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + keepalive, + } +} + func checkRedirect(config *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error { return func(req *http.Request, via []*http.Request) error { log.Debug("http client: checking redirect")