Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: backport configurable keep-alives
Browse files Browse the repository at this point in the history
This is a manual backport of elastic#34014 tailored to 7.17.

Keep-alive options are provided for both v1 and v2 HTTPJSON, so to
simplify the change the keep-alive logic is put in v2 and exported for
use by v1.
  • Loading branch information
efd6 committed Mar 6, 2023
1 parent 3dbd8fb commit b6b90d1
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Allow user configuration of keep-alive behaviour for HTTPJSON input. {issue}33951[33951] {pull}34014[34014] {pull}34743[34743]

*Heartbeat*

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,26 @@ filebeat.inputs:
request.proxy_url: http://proxy.example:8080
----

[float]
==== `request.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `request.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `request.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `request.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `request.retry.max_attempts`

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/httpjson/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
v2 "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson/internal/v2"
)

// config contains information about httpjson configuration
Expand All @@ -32,6 +33,7 @@ type config struct {
NoHTTPBody bool `config:"no_http_body"`
Pagination *paginationConfig `config:"pagination"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive v2.KeepAlive `config:"keep_alive"`
RetryMax int `config:"retry.max_attempts"`
RetryWaitMin time.Duration `config:"retry.wait_min"`
RetryWaitMax time.Duration `config:"retry.wait_max"`
Expand Down
11 changes: 5 additions & 6 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,11 @@ func run(
func newHTTPClient(ctx context.Context, config config) (*http.Client, error) {
config.Transport.Timeout = config.HTTPClientTimeout

httpClient, err :=
config.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
httpClient, err := config.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
config.KeepAlive.Settings(),
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
if err != nil {
return nil, err
}
Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/httpjson/internal/v2/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,38 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type KeepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c KeepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c KeepAlive) Settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type urlConfig struct {
*url.URL
}
Expand All @@ -94,6 +126,7 @@ type requestConfig struct {
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive KeepAlive `config:"keep_alive"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Expand Down
66 changes: 62 additions & 4 deletions x-pack/filebeat/input/httpjson/internal/v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package v2
import (
"context"
"errors"
"fmt"
"os"
"testing"

Expand All @@ -15,6 +16,7 @@ import (
"golang.org/x/oauth2/google"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand All @@ -39,7 +41,7 @@ func TestIsEnabled(t *testing.T) {
t.Fatal("OAuth2 should be enabled by default")
}

var enabled = false
enabled := false
oauth2.Enabled = &enabled

assert.False(t, oauth2.isEnabled())
Expand Down Expand Up @@ -69,19 +71,19 @@ func TestGetTokenURLWithAzure(t *testing.T) {
}

func TestGetEndpointParams(t *testing.T) {
var expected = map[string][]string{"foo": {"bar"}}
expected := map[string][]string{"foo": {"bar"}}
oauth2 := oAuth2Config{EndpointParams: map[string][]string{"foo": {"bar"}}}
assert.Equal(t, expected, oauth2.getEndpointParams())
}

func TestGetEndpointParamsWithAzure(t *testing.T) {
var expectedWithoutResource = map[string][]string{"foo": {"bar"}}
expectedWithoutResource := map[string][]string{"foo": {"bar"}}
oauth2 := oAuth2Config{Provider: "azure", EndpointParams: map[string][]string{"foo": {"bar"}}}

assert.Equal(t, expectedWithoutResource, oauth2.getEndpointParams())

oauth2.AzureResource = "baz"
var expectedWithResource = map[string][]string{"foo": {"bar"}, "resource": {"baz"}}
expectedWithResource := map[string][]string{"foo": {"bar"}, "resource": {"baz"}}

assert.Equal(t, expectedWithResource, oauth2.getEndpointParams())
}
Expand Down Expand Up @@ -392,3 +394,59 @@ func TestCursorEntryConfig(t *testing.T) {
assert.True(t, conf["entry3"].mustIgnoreEmptyValue())
assert.True(t, conf["entry4"].mustIgnoreEmptyValue())
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"request.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"request.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"request.keep_alive.disable": false,
"request.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["request.url"] = "localhost"
cfg := common.MustNewConfigFrom(test.input)
conf := defaultConfig()
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Request.KeepAlive.Settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
13 changes: 9 additions & 4 deletions x-pack/filebeat/input/httpjson/internal/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,7 @@ func run(

func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := config.Request.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
)
netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.Settings())...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -168,6 +165,14 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC
return &httpClient{client: client.StandardClient(), limiter: limiter}, nil
}

// clientOptions returns constructed client configuration options.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
keepalive,
}
}

func checkRedirect(config *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
return func(req *http.Request, via []*http.Request) error {
log.Debug("http client: checking redirect")
Expand Down

0 comments on commit b6b90d1

Please sign in to comment.