Use server-side filtering when retrieving Cloud Foundry logs #33456

Merged: 11 commits, Nov 8, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Expose Cloud Foundry filtering options into user configuration. {pull}33456[33456]

*Auditbeat*

14 changes: 14 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc
@@ -66,6 +66,20 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
==== `firehose_filter`

The kind of filtering to apply to events from Cloud Foundry's firehose.

Possible values:
* `all`: {beatname_uc} doesn't apply any filter; all events in the firehose are requested.
* `all-logs`: {beatname_uc} requests all events in the firehose and selects the
ones that can be handled as logs, that is, the ones whose envelope type is log message,
error, or HTTP start/stop. This is the default.
* `logs`: {beatname_uc} requests only log messages from the firehose and selects the ones
whose envelope type is log message, error, or HTTP start/stop.
* `metrics`: {beatname_uc} requests only metrics messages from the firehose.

[float]
==== `client_id`

5 changes: 3 additions & 2 deletions x-pack/filebeat/input/cloudfoundry/v1.go
@@ -8,7 +8,7 @@
package cloudfoundry

import (
-"github.com/pkg/errors"
+"fmt"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
@@ -41,6 +41,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {
defer log.Info("Stopped cloudfoundry input")

callbacks := cloudfoundry.DopplerCallbacks{
Filter: i.config.FirehoseFilter,
Log: func(evt cloudfoundry.Event) {
publisher.Publish(createEvent(evt))
},
@@ -51,7 +52,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {

consumer, err := hub.DopplerConsumer(callbacks)
if err != nil {
-return errors.Wrapf(err, "initializing doppler consumer")
+return fmt.Errorf("initializing doppler consumer: %w", err)
}

stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() {
18 changes: 18 additions & 0 deletions x-pack/libbeat/common/cloudfoundry/config.go
@@ -17,6 +17,21 @@ const (
ConsumerVersionV2 = "v2"
)

const (
// FirehoseFilterAll doesn't apply any filter.
FirehoseFilterAll = "all"

// FirehoseFilterAllLogs filters on client side, for log-like events (logs, errors and http events).
FirehoseFilterAllLogs = "all-logs"

// FirehoseFilterLogs filters on server side for log events, adding the parameter "filter-type=logs"
// and then filters on client side, for log-like events (logs, errors and http events).
FirehoseFilterLogs = "logs"

// FirehoseFilterMetrics filters on server side for metric events, adding the parameter "filter-type=metrics".
FirehoseFilterMetrics = "metrics"
)
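The comments above distinguish server-side filtering from the client-side selection of "log-like" events (logs, errors and HTTP events). A minimal sketch of such a client-side predicate follows; `envelopeType` and its constants are hypothetical stand-ins for the envelope event types provided by the Cloud Foundry client libraries, and `isLogLike` is not the PR's `filterLogs` function.

```go
package main

import "fmt"

// envelopeType is a hypothetical stand-in for the event type carried by a
// firehose envelope. The real input relies on types from external libraries.
type envelopeType int

const (
	typeLogMessage envelopeType = iota
	typeError
	typeHTTPStartStop
	typeValueMetric
	typeCounterEvent
	typeContainerMetric
)

// isLogLike reports whether an envelope of the given type would be handled
// as a log: log messages, errors, and HTTP start/stop events.
func isLogLike(t envelopeType) bool {
	switch t {
	case typeLogMessage, typeError, typeHTTPStartStop:
		return true
	default:
		return false
	}
}

func main() {
	types := []envelopeType{typeLogMessage, typeError, typeHTTPStartStop, typeValueMetric}
	for _, t := range types {
		fmt.Printf("type=%d logLike=%t\n", t, isLogLike(t))
	}
}
```

With `all-logs`, a predicate like this is the only filtering applied; with `logs`, it runs on a stream the server has already narrowed.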

type Config struct {
// Version of the consumer to use, it can be v1 or v2, defaults to v1
Version string `config:"version"`
@@ -31,6 +46,9 @@ type Config struct {
UaaAddress string `config:"uaa_address"`
RlpAddress string `config:"rlp_address"`

// Override default firehose filtering
FirehoseFilter string `config:"firehose_filter"`

// ShardID when retrieving events from loggregator, sharing this ID across
// multiple filebeats will shard the load of receiving and sending events.
ShardID string `config:"shard_id" validate:"required"`
47 changes: 37 additions & 10 deletions x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go
@@ -19,6 +19,7 @@ import (
)

type DopplerCallbacks struct {
Filter string
Log func(evt Event)
Metric func(evt Event)
Error func(evt EventError)
@@ -80,23 +81,49 @@ func (c *DopplerConsumer) Run() {
}

func (c *DopplerConsumer) logsFirehose() {
-c.firehose(c.callbacks.Log, consumer.LogMessages)
+filter := FirehoseFilterAllLogs

Review comment (Member, Author):

If we are sure that FirehoseFilterAllLogs and FirehoseFilterLogs collect the same logs, but FirehoseFilterLogs is much more efficient, what do you think about using FirehoseFilterLogs here and not exposing the option to the user?

Suggested change:
-filter := FirehoseFilterAllLogs
+filter := FirehoseFilterLogs

if c.callbacks.Filter != "" {
filter = c.callbacks.Filter
}
filterFn, envelopeFilter := selectFilter(filter)
c.firehose(c.callbacks.Log, filterFn, envelopeFilter)
}

func (c *DopplerConsumer) metricsFirehose() {
-c.firehose(c.callbacks.Metric, consumer.Metrics)
+filter := FirehoseFilterMetrics
if c.callbacks.Filter != "" {
filter = c.callbacks.Filter
}
filterFn, envelopeFilter := selectFilter(filter)
c.firehose(c.callbacks.Metric, filterFn, envelopeFilter)
}

func selectFilter(firehoseFilter string) (func(*events.Envelope) bool, consumer.EnvelopeFilter) {
switch firehoseFilter {
case FirehoseFilterAll:
return filterNoFilter, -1
case FirehoseFilterAllLogs:
// Requests all events, and selects log-like events.
return filterLogs, -1
case FirehoseFilterLogs:
// Uses filter-type=logs in requests to the firehose.
return filterLogs, consumer.LogMessages

Review comment (Member, Author):

LGTM to use both filters here.

case FirehoseFilterMetrics:
// Uses filter-type=metrics in requests to the firehose.
return filterNoFilter, consumer.Metrics
default:
// TODO: Handle unknown filters.
return filterNoFilter, -1

Review comment (Member, Author):

We should decide whether we want some kind of error handling here, or whether we just treat `all` as the default in this case.

}
}
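The TODO and the review comment above leave open how unknown filter values should be handled. One option is to reject them up front instead of silently falling back to no filtering. The sketch below is a self-contained illustration of that idea under stated assumptions: `plan` and `planFor` are hypothetical names, and the string `serverSide` field stands in for the `consumer.EnvelopeFilter` value used in the PR.

```go
package main

import "fmt"

// These values mirror the firehose_filter options documented in this PR.
const (
	filterAll     = "all"
	filterAllLogs = "all-logs"
	filterLogs    = "logs"
	filterMetrics = "metrics"
)

// plan is a hypothetical description of what a filter value implies.
type plan struct {
	serverSide string // filter-type sent to the firehose; empty means none
	clientSide bool   // whether log-like envelopes are selected after receipt
}

// planFor maps a filter value to its plan and returns an error for unknown
// values, rather than defaulting to no filtering.
func planFor(filter string) (plan, error) {
	switch filter {
	case filterAll:
		return plan{}, nil
	case filterAllLogs:
		return plan{clientSide: true}, nil
	case filterLogs:
		return plan{serverSide: "logs", clientSide: true}, nil
	case filterMetrics:
		return plan{serverSide: "metrics"}, nil
	default:
		return plan{}, fmt.Errorf("unknown firehose_filter %q", filter)
	}
}

func main() {
	for _, f := range []string{"all", "all-logs", "logs", "metrics", "bogus"} {
		p, err := planFor(f)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%-8s server=%q client=%t\n", f, p.serverSide, p.clientSide)
	}
}
```

Note that this sketch also rejects the empty string, whereas the PR treats an empty `Filter` as "use the consumer's default"; a real validation step would need to allow that case.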

-func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeFilter) {
+func (c *DopplerConsumer) firehose(cb func(evt Event), filterFn func(*events.Envelope) bool, filter consumer.EnvelopeFilter) {
var msgChan <-chan *events.Envelope
var errChan <-chan error
-filterFn := filterNoFilter
-if filter == consumer.LogMessages {
-// We are interested in more envelopes than the ones obtained when filtering
-// by log messages, retrieve them all and filter later.
-// If this causes performance or other problems, we will have to investigate
-// if it is possible to pass different filters to the firehose url.
-filterFn = filterLogs
+if filterFn == nil {
+filterFn = filterNoFilter
+}
+if filter == consumer.LogMessages || filter == consumer.Metrics {
msgChan, errChan = c.consumer.Firehose(c.subscriptionID, "")
} else {
msgChan, errChan = c.consumer.FilteredFirehose(c.subscriptionID, "", filter)