Skip to content

Commit

Permalink
[processor/logdedup] feat: add ottl condition to logdedup processor (#…
Browse files Browse the repository at this point in the history
…35443)

**Description:**

Adds OTTL Condition field to Deduplicate Logs Processor

**Link to tracking Issue:** Closes #35440 

**Testing:** 
- Tested functionality with BindPlane
- Added unit tests for the condition logic

**Documentation:** Added documentation to the logdedup processor README
about the condition field and an example configuration with a condition.

---------

Co-authored-by: Mike Goldsmith <[email protected]>
  • Loading branch information
colelaven and MikeGoldsmith authored Oct 8, 2024
1 parent 679374a commit a4b60c7
Show file tree
Hide file tree
Showing 25 changed files with 881 additions and 67 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_add-condition-to-logdedup-processor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: logdedupprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a `condition` field to the Log DeDuplication Processor.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35440]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
8 changes: 8 additions & 0 deletions pkg/pdatatest/plogtest/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ func TestCompareLogs(t *testing.T) {
withoutOptions: errors.New(`resource "map[]": scope "collector": log record "map[]": timestamp doesn't match expected: 11651379494838206465, actual: 11651379494838206464`),
withOptions: nil,
},
{
name: "ignore-log-record-attribute-value",
compareOptions: []CompareLogsOption{
IgnoreLogRecordAttributeValue("Key1"),
},
withoutOptions: errors.New(`resource "map[]": scope "": missing expected log record: map[Key1:Val2]; resource "map[]": scope "": unexpected log record: map[Key1:Val1]`),
withOptions: nil,
},
}

for _, tc := range tcs {
Expand Down
38 changes: 36 additions & 2 deletions pkg/pdatatest/plogtest/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,47 @@ func (opt ignoreResourceAttributeValue) applyOnLogs(expected, actual plog.Logs)
opt.maskLogsResourceAttributeValue(actual)
}

func (opt ignoreResourceAttributeValue) maskLogsResourceAttributeValue(metrics plog.Logs) {
rls := metrics.ResourceLogs()
func (opt ignoreResourceAttributeValue) maskLogsResourceAttributeValue(logs plog.Logs) {
rls := logs.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
internal.MaskResourceAttributeValue(rls.At(i).Resource(), opt.attributeName)
}
}

// IgnoreLogRecordAttributeValue is a CompareLogsOption that sets the value of an attribute
// to empty bytes for every log record
func IgnoreLogRecordAttributeValue(attributeName string) CompareLogsOption {
return ignoreLogRecordAttributeValue{
attributeName: attributeName,
}
}

type ignoreLogRecordAttributeValue struct {
attributeName string
}

func (opt ignoreLogRecordAttributeValue) applyOnLogs(expected, actual plog.Logs) {
opt.maskLogRecordAttributeValue(expected)
opt.maskLogRecordAttributeValue(actual)
}

func (opt ignoreLogRecordAttributeValue) maskLogRecordAttributeValue(logs plog.Logs) {
rls := logs.ResourceLogs()
for i := 0; i < logs.ResourceLogs().Len(); i++ {
sls := rls.At(i).ScopeLogs()
for j := 0; j < sls.Len(); j++ {
lrs := sls.At(j).LogRecords()
for k := 0; k < lrs.Len(); k++ {
lr := lrs.At(k)
val, exists := lr.Attributes().Get(opt.attributeName)
if exists {
val.SetEmptyBytes()
}
}
}
}
}

func IgnoreTimestamp() CompareLogsOption {
return compareLogsOptionFunc(func(expected, actual plog.Logs) {
now := pcommon.NewTimestampFromTime(time.Now())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
resourceLogs:
- resource: {}
scopeLogs:
- logRecords:
- attributes:
- key: Key1
value:
stringValue: Val1
body: {}
spanId: ""
traceId: ""
scope: {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
resourceLogs:
- resource: {}
scopeLogs:
- logRecords:
- attributes:
- key: Key1
value:
stringValue: Val2
body: {}
spanId: ""
traceId: ""
scope: {}
45 changes: 38 additions & 7 deletions processor/logdedupprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ This processor is used to deduplicate logs by detecting identical logs over a ra

## How It Works
1. The user configures the log deduplication processor in the desired logs pipeline.
2. All logs sent to the processor and aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes.
2. If the processor does not provide `conditions`, all logs are considered eligible for aggregation. If the processor does have configured `conditions`, all log entries where at least one of the `conditions` evaluates `true` are considered eligible for aggregation. Eligible identical logs are aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes. Logs that do not match any condition in `conditions` are passed onward in the pipeline without aggregating.
3. After the interval, the processor emits a single log with the count of logs that were deduplicated. The emitted log will have the same body, resource attributes, severity, and log attributes as the original log. The emitted log will also have the following new attributes:

- `log_count`: The count of logs that were deduplicated over the interval. The name of the attribute is configurable via the `log_count_attribute` parameter.
Expand All @@ -25,13 +25,17 @@ This processor is used to deduplicate logs by detecting identical logs over a ra
**Note**: The `ObservedTimestamp` and `Timestamp` of the emitted log will be the time that the aggregated log was emitted and will not be the same as the `ObservedTimestamp` and `Timestamp` of the original logs.

## Configuration
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. |
| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. |
| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. The available locations depend on the local IANA Time Zone database. [This page](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) contains many examples, such as `America/New_York`. |
| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. |
| conditions | []string | `[]` | A slice of [OTTL] expressions used to evaluate which log records are deduped. All paths in the [log context] are available to reference. All [converters] are available to use. |
| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. |
| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. The available locations depend on the local IANA Time Zone database. [This page](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones) contains many examples, such as `America/New_York`. |
| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |

[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.109.0/pkg/ottl#readme
[converters]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/ottlfuncs/README.md#converters
[log context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/contexts/ottllog/README.md

### Example Config
The following config is an example configuration for the log deduplication processor. It is configured with an aggregation interval of `60 seconds`, a timezone of `America/Los_Angeles`, and a log count attribute of `dedup_count`. It has no fields being excluded.
Expand Down Expand Up @@ -82,3 +86,30 @@ service:
processors: [logdedup]
exporters: [googlecloud]
```


### Example Config with Conditions
The following config is an example configuration that only performs the deduping process on telemetry where Attribute `ID` equals `1` OR where Resource Attribute `service.name` equals `my-service`:

```yaml
receivers:
filelog:
include: [./example/*.log]
processors:
logdedup:
conditions:
- attributes["ID"] == 1
- resource.attributes["service.name"] == "my-service"
interval: 60s
log_count_attribute: dedup_count
timezone: 'America/Los_Angeles'
exporters:
googlecloud:
service:
pipelines:
logs:
receivers: [filelog]
processors: [logdedup]
exporters: [googlecloud]
```
2 changes: 2 additions & 0 deletions processor/logdedupprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Config struct {
Interval time.Duration `mapstructure:"interval"`
Timezone string `mapstructure:"timezone"`
ExcludeFields []string `mapstructure:"exclude_fields"`
Conditions []string `mapstructure:"conditions"`
}

// createDefaultConfig returns the default config for the processor.
Expand All @@ -53,6 +54,7 @@ func createDefaultConfig() component.Config {
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{},
Conditions: []string{},
}
}

Expand Down
1 change: 1 addition & 0 deletions processor/logdedupprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func TestValidateConfig(t *testing.T) {
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
Conditions: []string{},
ExcludeFields: []string{"body.thing", "attributes.otherthing"},
},
expectedErr: nil,
Expand Down
24 changes: 23 additions & 1 deletion processor/logdedupprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor/internal/metadata"
)

Expand All @@ -30,5 +32,25 @@ func createLogsProcessor(_ context.Context, settings processor.Settings, cfg com
return nil, fmt.Errorf("invalid config type: %+v", cfg)
}

return newProcessor(processorCfg, consumer, settings)
processor, err := newProcessor(processorCfg, consumer, settings)
if err != nil {
return nil, fmt.Errorf("error creating processor: %w", err)
}

if len(processorCfg.Conditions) == 0 {
processor.conditions = nil
} else {
conditions, err := filterottl.NewBoolExprForLog(
processorCfg.Conditions,
filterottl.StandardLogFuncs(),
ottl.PropagateError,
settings.TelemetrySettings,
)
if err != nil {
return nil, fmt.Errorf("invalid condition: %w", err)
}
processor.conditions = conditions
}

return processor, nil
}
31 changes: 31 additions & 0 deletions processor/logdedupprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,37 @@ func TestCreateLogs(t *testing.T) {
cfg: nil,
expectedErr: "invalid config type",
},
{
name: "valid custom condition",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{},
Conditions: []string{"false"},
},
},
{
name: "valid multiple conditions",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{},
Conditions: []string{"false", `(attributes["ID"] == 1)`},
},
},
{
name: "invalid condition",
cfg: &Config{
LogCountAttribute: defaultLogCountAttribute,
Interval: defaultInterval,
Timezone: defaultTimezone,
ExcludeFields: []string{},
Conditions: []string{"x"},
},
expectedErr: "invalid condition",
},
}

for _, tc := range testCases {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 33 additions & 2 deletions processor/logdedupprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/logde
go 1.22.0

require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.111.0
Expand All @@ -26,36 +30,63 @@ require (
)

require (
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/antchfx/xmlquery v1.4.1 // indirect
github.com/antchfx/xpath v1.3.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/elastic/go-grok v0.3.1 // indirect
github.com/elastic/lunes v0.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-viper/mapstructure/v2 v2.1.0 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 // indirect
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.111.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.111.0 // indirect
go.opentelemetry.io/collector/internal/globalsignal v0.111.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.111.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.111.0 // indirect
go.opentelemetry.io/collector/pipeline v0.111.0 // indirect
go.opentelemetry.io/collector/processor/processorprofiles v0.111.0 // indirect
go.opentelemetry.io/collector/semconv v0.111.0 // indirect
go.opentelemetry.io/otel v1.30.0 // indirect
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.67.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl => ../../pkg/ottl

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter => ../../internal/filter

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal
Loading

0 comments on commit a4b60c7

Please sign in to comment.