From 971dd5d835456fd9c86303e2d52d3da1f4f99f96 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 25 Oct 2022 17:31:50 +0200 Subject: [PATCH 1/8] Expose Cloud Foundry filtering into user configuration --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-cloudfoundry.asciidoc | 13 +++++ x-pack/filebeat/input/cloudfoundry/v1.go | 1 + x-pack/libbeat/common/cloudfoundry/config.go | 17 +++++++ .../common/cloudfoundry/dopplerconsumer.go | 47 +++++++++++++++---- 5 files changed, 69 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2300269289b..17f99cb94fe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -162,6 +162,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620] - Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] +- Expose Cloud Foundry filtering options into user configuration. {pull}[] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc index 1c9eb17f2b2..0ac6e6e1b7c 100644 --- a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -66,6 +66,19 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". +[float] +==== `firehose_filter` + +The kind of filtering to apply for events from Cloud Foundry's firehose. + +Possible values: +* `all`: {beatname_uc} doesn't apply any filter, all events in the firehose are requested. +* `all-logs`: {beatname_uc} requests all events in the firehose, and selects the + ones that can be handled as logs, that is the ones whose envelope type is log message, + error, or http start/stop. +* `logs`: {beatname_uc} requests to the firehose log messages only. +* `metrics`: {beatname_uc} requests to the firehose metrics messages only. + [float] ==== `client_id` diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index 6b0b5077b13..ac0bd53b8eb 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -41,6 +41,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { defer log.Info("Stopped cloudfoundry input") callbacks := cloudfoundry.DopplerCallbacks{ + Filter: i.config.FirehoseFilter, Log: func(evt cloudfoundry.Event) { publisher.Publish(createEvent(evt)) }, diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index cfb012a0911..4a8d0742f46 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -17,6 +17,20 @@ const ( ConsumerVersionV2 = "v2" ) +const ( + // FirehoseFilterAll doesn't apply any filter. + FirehoseFilterAll = "all" + + // FirehoseFilterAllLogs filters on client side, for log-like events (logs, errors and http events). + FirehoseFilterAllLogs = "all-logs" + + // FirehoseFilterLogs filters on server side for log events, adding the parameter "file-type=logs". + FirehoseFilterLogs = "logs" + + // FirehoseFilterMetrics filters on server side for metric events, adding the parameter "file-type=metrics". + FirehoseFilterMetrics = "metrics" +) + type Config struct { // Version of the consumer to use, it can be v1 or v2, defaults to v1 Version string `config:"version"` @@ -31,6 +45,9 @@ type Config struct { UaaAddress string `config:"uaa_address"` RlpAddress string `config:"rlp_address"` + // Override default firehose filtering + FirehoseFilter string `config:"firehose_filter"` + // ShardID when retrieving events from loggregator, sharing this ID across // multiple filebeats will shard the load of receiving and sending events. ShardID string `config:"shard_id" validate:"required"` diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index a2ec852b66c..c9136665210 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -19,6 +19,7 @@ import ( ) type DopplerCallbacks struct { + Filter string Log func(evt Event) Metric func(evt Event) Error func(evt EventError) @@ -80,23 +81,49 @@ func (c *DopplerConsumer) Run() { } func (c *DopplerConsumer) logsFirehose() { - c.firehose(c.callbacks.Log, consumer.LogMessages) + filter := FirehoseFilterAllLogs + if c.callbacks.Filter != "" { + filter = c.callbacks.Filter + } + filterFn, envelopeFilter := selectFilter(filter) + c.firehose(c.callbacks.Log, filterFn, envelopeFilter) } func (c *DopplerConsumer) metricsFirehose() { - c.firehose(c.callbacks.Metric, consumer.Metrics) + filter := FirehoseFilterMetrics + if c.callbacks.Filter != "" { + filter = c.callbacks.Filter + } + filterFn, envelopeFilter := selectFilter(filter) + c.firehose(c.callbacks.Metric, filterFn, envelopeFilter) +} + +func selectFilter(firehoseFilter string) (func(*events.Envelope) bool, consumer.EnvelopeFilter) { + switch firehoseFilter { + case FirehoseFilterAll: + return filterNoFilter, -1 + case FirehoseFilterAllLogs: + // Requests all events, and selects log-like events. + return filterLogs, -1 + case FirehoseFilterLogs: + // Uses filter-type=logs in requests to the firehose. + return filterNoFilter, consumer.LogMessages + case FirehoseFilterMetrics: + // Uses filter-type=metrics in requests to the firehose. + return filterNoFilter, consumer.Metrics + default: + // TODO: Handle unknown filters. + return filterNoFilter, -1 + } } -func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeFilter) { +func (c *DopplerConsumer) firehose(cb func(evt Event), filterFn func(*events.Envelope) bool, filter consumer.EnvelopeFilter) { var msgChan <-chan *events.Envelope var errChan <-chan error - filterFn := filterNoFilter - if filter == consumer.LogMessages { - // We are interested in more envelopes than the ones obtained when filtering - // by log messages, retrieve them all and filter later. - // If this causes performance or other problems, we will have to investigate - // if it is possible to pass different filters to the firehose url. - filterFn = filterLogs + if filterFn == nil { + filterFn = filterNoFilter + } + if filter == consumer.LogMessages || filter == consumer.Metrics { msgChan, errChan = c.consumer.Firehose(c.subscriptionID, "") } else { msgChan, errChan = c.consumer.FilteredFirehose(c.subscriptionID, "", filter) From 20c7ddcfccfe6de4ee2ec51e7c3f365bb5cc0827 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 25 Oct 2022 17:37:51 +0200 Subject: [PATCH 2/8] Fix changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 17f99cb94fe..036bcacd717 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -162,7 +162,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620] - Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] -- Expose Cloud Foundry filtering options into user configuration. {pull}[] +- Expose Cloud Foundry filtering options into user configuration. {pull}33456[33456] *Auditbeat* From 7fc340db84bafa511ccda96586696fa0f97c624a Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 25 Oct 2022 17:49:12 +0200 Subject: [PATCH 3/8] Linting and fix on docs --- x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc | 2 +- x-pack/filebeat/input/cloudfoundry/v1.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc index 0ac6e6e1b7c..244b8939855 100644 --- a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -75,7 +75,7 @@ Possible values: * `all`: {beatname_uc} doesn't apply any filter, all events in the firehose are requested. * `all-logs`: {beatname_uc} requests all events in the firehose, and selects the ones that can be handled as logs, that is the ones whose envelope type is log message, - error, or http start/stop. + error, or http start/stop. This is the default. * `logs`: {beatname_uc} requests to the firehose log messages only. * `metrics`: {beatname_uc} requests to the firehose metrics messages only. diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index ac0bd53b8eb..1f6aa5f6e7a 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -8,7 +8,7 @@ package cloudfoundry import ( - "github.com/pkg/errors" + "fmt" v2 "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" @@ -52,7 +52,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { consumer, err := hub.DopplerConsumer(callbacks) if err != nil { - return errors.Wrapf(err, "initializing doppler consumer") + return fmt.Errorf("initializing doppler consumer: %w", err) } stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { From fae0b0a25e43acfbc9de505f8e01967dfc6d92cc Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Mon, 7 Nov 2022 12:21:15 +0200 Subject: [PATCH 4/8] Add both server and client side filtering --- x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc | 3 ++- x-pack/libbeat/common/cloudfoundry/config.go | 3 ++- x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc index 244b8939855..3c17859b3fe 100644 --- a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -76,7 +76,8 @@ Possible values: * `all-logs`: {beatname_uc} requests all events in the firehose, and selects the ones that can be handled as logs, that is the ones whose envelope type is log message, error, or http start/stop. This is the default. -* `logs`: {beatname_uc} requests to the firehose log messages only. +* `logs`: {beatname_uc} requests to the firehose log messages only and selects the ones + whose envelope type is log message, error, or http start/stop. * `metrics`: {beatname_uc} requests to the firehose metrics messages only. [float] diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index 4a8d0742f46..3b4c4c8bc2c 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -24,7 +24,8 @@ const ( // FirehoseFilterAllLogs filters on client side, for log-like events (logs, errors and http events). FirehoseFilterAllLogs = "all-logs" - // FirehoseFilterLogs filters on server side for log events, adding the parameter "file-type=logs". + // FirehoseFilterLogs filters on server side for log events, adding the parameter "file-type=logs" + // and then filters on client side, for log-like events (logs, errors and http events). FirehoseFilterLogs = "logs" // FirehoseFilterMetrics filters on server side for metric events, adding the parameter "file-type=metrics". diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index c9136665210..8d45bb26763 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -107,7 +107,7 @@ func selectFilter(firehoseFilter string) (func(*events.Envelope) bool, consumer. return filterLogs, -1 case FirehoseFilterLogs: // Uses filter-type=logs in requests to the firehose. - return filterNoFilter, consumer.LogMessages + return filterLogs, consumer.LogMessages case FirehoseFilterMetrics: // Uses filter-type=metrics in requests to the firehose. return filterNoFilter, consumer.Metrics From 989dfded7eb205687cf83ff639ee565a0759c631 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Mon, 7 Nov 2022 12:25:42 +0200 Subject: [PATCH 5/8] Edit changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c79e62a8c7a..6e091a7bb5d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -147,9 +147,9 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620] - Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] -- Expose Cloud Foundry filtering options into user configuration. {pull}33456[33456] - Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] - Improve httpjson documentation for split processor. {pull}33473[33473] +- Expose Cloud Foundry filtering options into user configuration. {pull}33456[33456] *Auditbeat* From 17d262bcd4ea771bf67b0c6545bdb64882aab977 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 7 Nov 2022 19:46:35 +0100 Subject: [PATCH 6/8] Remove all-logs option --- x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc | 3 --- x-pack/libbeat/common/cloudfoundry/config.go | 3 --- x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go | 9 +++------ 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc index 3c17859b3fe..c0a72d15349 100644 --- a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -73,9 +73,6 @@ The kind of filtering to apply for events from Cloud Foundry's firehose. Possible values: * `all`: {beatname_uc} doesn't apply any filter, all events in the firehose are requested. -* `all-logs`: {beatname_uc} requests all events in the firehose, and selects the - ones that can be handled as logs, that is the ones whose envelope type is log message, - error, or http start/stop. This is the default. * `logs`: {beatname_uc} requests to the firehose log messages only and selects the ones whose envelope type is log message, error, or http start/stop. * `metrics`: {beatname_uc} requests to the firehose metrics messages only. diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index 3b4c4c8bc2c..dfb6563b7be 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -21,9 +21,6 @@ const ( // FirehoseFilterAll doesn't apply any filter. FirehoseFilterAll = "all" - // FirehoseFilterAllLogs filters on client side, for log-like events (logs, errors and http events). - FirehoseFilterAllLogs = "all-logs" - // FirehoseFilterLogs filters on server side for log events, adding the parameter "file-type=logs" // and then filters on client side, for log-like events (logs, errors and http events). FirehoseFilterLogs = "logs" diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index 8d45bb26763..cab81082f47 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -81,7 +81,7 @@ func (c *DopplerConsumer) Run() { } func (c *DopplerConsumer) logsFirehose() { - filter := FirehoseFilterAllLogs + filter := FirehoseFilterLogs if c.callbacks.Filter != "" { filter = c.callbacks.Filter } @@ -102,17 +102,14 @@ func selectFilter(firehoseFilter string) (func(*events.Envelope) bool, consumer. switch firehoseFilter { case FirehoseFilterAll: return filterNoFilter, -1 - case FirehoseFilterAllLogs: - // Requests all events, and selects log-like events. - return filterLogs, -1 case FirehoseFilterLogs: - // Uses filter-type=logs in requests to the firehose. + // Uses filter-type=logs in requests to the firehose and selects log-like events. return filterLogs, consumer.LogMessages case FirehoseFilterMetrics: // Uses filter-type=metrics in requests to the firehose. return filterNoFilter, consumer.Metrics default: - // TODO: Handle unknown filters. + // No filter. return filterNoFilter, -1 } } From 3deb848d577e14dc6413150075c9954609aa100d Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 7 Nov 2022 19:53:00 +0100 Subject: [PATCH 7/8] Remove option, simplify --- CHANGELOG.next.asciidoc | 2 +- .../docs/inputs/input-cloudfoundry.asciidoc | 11 ----- x-pack/filebeat/input/cloudfoundry/v1.go | 1 - x-pack/libbeat/common/cloudfoundry/config.go | 15 ------- .../common/cloudfoundry/dopplerconsumer.go | 42 ++----------------- 5 files changed, 4 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6e091a7bb5d..1518f112f28 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -149,7 +149,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] - Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] - Improve httpjson documentation for split processor. {pull}33473[33473] -- Expose Cloud Foundry filtering options into user configuration. {pull}33456[33456] +- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc index c0a72d15349..1c9eb17f2b2 100644 --- a/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc @@ -66,17 +66,6 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)". -[float] -==== `firehose_filter` - -The kind of filtering to apply for events from Cloud Foundry's firehose. - -Possible values: -* `all`: {beatname_uc} doesn't apply any filter, all events in the firehose are requested. -* `logs`: {beatname_uc} requests to the firehose log messages only and selects the ones - whose envelope type is log message, error, or http start/stop. -* `metrics`: {beatname_uc} requests to the firehose metrics messages only. - [float] ==== `client_id` diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index 1f6aa5f6e7a..b80fba36b1b 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -41,7 +41,6 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { defer log.Info("Stopped cloudfoundry input") callbacks := cloudfoundry.DopplerCallbacks{ - Filter: i.config.FirehoseFilter, Log: func(evt cloudfoundry.Event) { publisher.Publish(createEvent(evt)) }, diff --git a/x-pack/libbeat/common/cloudfoundry/config.go b/x-pack/libbeat/common/cloudfoundry/config.go index dfb6563b7be..cfb012a0911 100644 --- a/x-pack/libbeat/common/cloudfoundry/config.go +++ b/x-pack/libbeat/common/cloudfoundry/config.go @@ -17,18 +17,6 @@ const ( ConsumerVersionV2 = "v2" ) -const ( - // FirehoseFilterAll doesn't apply any filter. - FirehoseFilterAll = "all" - - // FirehoseFilterLogs filters on server side for log events, adding the parameter "file-type=logs" - // and then filters on client side, for log-like events (logs, errors and http events). - FirehoseFilterLogs = "logs" - - // FirehoseFilterMetrics filters on server side for metric events, adding the parameter "file-type=metrics". - FirehoseFilterMetrics = "metrics" -) - type Config struct { // Version of the consumer to use, it can be v1 or v2, defaults to v1 Version string `config:"version"` @@ -43,9 +31,6 @@ type Config struct { UaaAddress string `config:"uaa_address"` RlpAddress string `config:"rlp_address"` - // Override default firehose filtering - FirehoseFilter string `config:"firehose_filter"` - // ShardID when retrieving events from loggregator, sharing this ID across // multiple filebeats will shard the load of receiving and sending events. ShardID string `config:"shard_id" validate:"required"` diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index cab81082f47..f33a2f1f746 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -19,7 +19,6 @@ import ( ) type DopplerCallbacks struct { - Filter string Log func(evt Event) Metric func(evt Event) Error func(evt EventError) @@ -81,50 +80,15 @@ func (c *DopplerConsumer) Run() { } func (c *DopplerConsumer) logsFirehose() { - filter := FirehoseFilterLogs - if c.callbacks.Filter != "" { - filter = c.callbacks.Filter - } - filterFn, envelopeFilter := selectFilter(filter) - c.firehose(c.callbacks.Log, filterFn, envelopeFilter) + c.firehose(c.callbacks.Log, filterLogs, consumer.LogMessages) } func (c *DopplerConsumer) metricsFirehose() { - filter := FirehoseFilterMetrics - if c.callbacks.Filter != "" { - filter = c.callbacks.Filter - } - filterFn, envelopeFilter := selectFilter(filter) - c.firehose(c.callbacks.Metric, filterFn, envelopeFilter) -} - -func selectFilter(firehoseFilter string) (func(*events.Envelope) bool, consumer.EnvelopeFilter) { - switch firehoseFilter { - case FirehoseFilterAll: - return filterNoFilter, -1 - case FirehoseFilterLogs: - // Uses filter-type=logs in requests to the firehose and selects log-like events. - return filterLogs, consumer.LogMessages - case FirehoseFilterMetrics: - // Uses filter-type=metrics in requests to the firehose. - return filterNoFilter, consumer.Metrics - default: - // No filter. - return filterNoFilter, -1 - } + c.firehose(c.callbacks.Metric, filterNoFilter, consumer.Metrics) } func (c *DopplerConsumer) firehose(cb func(evt Event), filterFn func(*events.Envelope) bool, filter consumer.EnvelopeFilter) { - var msgChan <-chan *events.Envelope - var errChan <-chan error - if filterFn == nil { - filterFn = filterNoFilter - } - if filter == consumer.LogMessages || filter == consumer.Metrics { - msgChan, errChan = c.consumer.Firehose(c.subscriptionID, "") - } else { - msgChan, errChan = c.consumer.FilteredFirehose(c.subscriptionID, "", filter) - } + msgChan, errChan := c.consumer.FilteredFirehose(c.subscriptionID, "", filter) for { select { case env := <-msgChan: From f0ca944bf0581fc962496d8a101e6e825d98b9b1 Mon Sep 17 00:00:00 2001 From: MichaelKatsoulis Date: Tue, 8 Nov 2022 11:01:46 +0200 Subject: [PATCH 8/8] Merge upstream --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 448a9c4712c..2b8b161430b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -149,8 +149,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] - Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] - Improve httpjson documentation for split processor. {pull}33473[33473] -- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456] - Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499] +- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456] *Auditbeat*