Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/receiver_creator] Add support for enabling logs' collecting from K8s hints #36581

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/f_hints_logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receivercreator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for starting logs' collection based on provided k8s annotations' hints

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34427]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
99 changes: 96 additions & 3 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ receiver_creator/metrics:
# ignore_receivers: []
```

Find bellow the supported annotations that user can define to automatically enable receivers to start collecting metrics signals from the target Pods/containers.
Find bellow the supported annotations that user can define to automatically enable receivers to start
collecting metrics and logs signals from the target Pods/containers.

### Supported metrics annotations

Expand Down Expand Up @@ -511,11 +512,76 @@ The current implementation relies on the implementation of `k8sobserver` extensi
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
The hints are evaluated per container by extracting the annotations from each [`Port` endpoint](#Port) that is emitted.

### Supported logs annotations

This feature enables `filelog` receiver in order to collect logs from the discovered Pods.

#### Enable/disable discovery

`io.opentelemetry.discovery.logs/enabled` (Required. Example: `"true"`)

By default `"false"`.

#### Define configuration

The default configuration for the `filelog` receiver is the following:

```yaml
include:
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
include_file_name: false
include_file_path: true
operators:
- id: container-parser
type: container
```
This default can be extended or overridden using the respective annotation:
`io.opentelemetry.discovery.logs/config`

**Example:**

```yaml
io.opentelemetry.discovery.logs/config: |
include_file_name: true
max_log_size: "2MiB"
operators:
- type: container
id: container-parser
- type: regex_parser
regex: "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"
```

`include` cannot be overridden and is fixed to discovered container's log file path.

#### Support multiple target containers

Users can target the annotation to a specific container by suffixing it with the name of that container:
`io.opentelemetry.discovery.logs.<container_name>/endpoint`.
For example:
```yaml
io.opentelemetry.discovery.logs.busybox/config: |
max_log_size: "3MiB"
operators:
- type: container
id: container-parser
- id: some
type: add
field: attributes.tag
value: hints
```
where `busybox` is the name of the target container.

If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and
the Pod level hints are used as a fallback (see detailed example bellow).

The current implementation relies on the implementation of `k8sobserver` extension and specifically
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
The hints are evaluated per container by extracting the annotations from each [`Pod Container` endpoint](#Pod Container) that is emitted.


### Examples

#### Metrics example
#### Metrics and Logs example

Collector's configuration:
```yaml
Expand All @@ -525,12 +591,22 @@ receivers:
discovery:
enabled: true
receivers:

receiver_creator/logs:
watch_observers: [ k8s_observer ]
discovery:
enabled: true
receivers:

service:
extensions: [ k8s_observer]
pipelines:
metrics:
receivers: [ receiver_creator ]
receivers: [ receiver_creator/metrics ]
processors: []
exporters: [ debug ]
logs:
receivers: [ receiver_creator/logs ]
processors: []
exporters: [ debug ]
```
Expand Down Expand Up @@ -600,6 +676,23 @@ spec:
endpoint: "http://`endpoint`/nginx_status"
collection_interval: "30s"
timeout: "20s"

# redis pod container logs hints
io.opentelemetry.discovery.logs.redis/enabled: "true"
io.opentelemetry.discovery.logs.redis/config: |
max_log_size: "4MiB"
operators:
- type: container
id: container-parser
- id: some
type: add
field: attributes.tag
value: logs_hints

# nginx pod container logs hints
io.opentelemetry.discovery.logs.webserver/enabled: "true"
io.opentelemetry.discovery.logs.webserver/config: |
max_log_size: "3MiB"
spec:
volumes:
- name: nginx-conf
Expand Down
37 changes: 37 additions & 0 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,40 @@ func (*nopWithoutEndpointFactory) CreateTraces(
cfg: cfg,
}, nil
}

type nopWithFilelogConfig struct {
Include []string `mapstructure:"include"`
IncludeFileName bool `mapstructure:"include_file_name"`
IncludeFilePath bool `mapstructure:"include_file_path"`
Operators []any `mapstructure:"operators"`
}

type nopWithFilelogFactory struct {
rcvr.Factory
}

type nopWithFilelogReceiver struct {
mockComponent
consumer.Logs
consumer.Metrics
consumer.Traces
rcvr.Settings
cfg component.Config
}

func (*nopWithFilelogFactory) CreateDefaultConfig() component.Config {
return &nopWithFilelogConfig{}
}

func (*nopWithFilelogFactory) CreateLogs(
_ context.Context,
rcs rcvr.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (rcvr.Logs, error) {
return &nopWithEndpointReceiver{
Logs: nextConsumer,
Settings: rcs,
cfg: cfg,
}, nil
}
98 changes: 94 additions & 4 deletions receiver/receivercreator/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ const (

// hint suffix for metrics
otelMetricsHints = otelHints + ".metrics"
otelLogsHints = otelHints + ".logs"

// hints definitions
discoveryEnabledHint = "enabled"
scraperHint = "scraper"
configHint = "config"

logsReceiver = "filelog"
defaultLogPathPattern = "/var/log/pods/%s_%s_%s/%s/*.log"
)

// k8sHintsBuilder creates configurations from hints provided as Pod's annotations.
Expand Down Expand Up @@ -57,7 +61,7 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
return nil, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env))
}

if endpointType != string(observer.PortType) {
if endpointType != string(observer.PortType) && endpointType != string(observer.PodContainerType) {
return nil, nil
}

Expand All @@ -72,7 +76,14 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
return nil, nil
}

return builder.createScraper(pod.Annotations, env)
switch endpointType {
case string(observer.PortType):
return builder.createScraper(pod.Annotations, env)
case string(observer.PodContainerType):
return builder.createLogsReceiver(pod.Annotations, env)
default:
return nil, nil
}
}

func (builder *k8sHintsBuilder) createScraper(
Expand All @@ -91,7 +102,7 @@ func (builder *k8sHintsBuilder) createScraper(
port = p.Port
pod := p.Pod

if !discoveryMetricsEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
if !discoveryEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
return nil, nil
}

Expand All @@ -118,6 +129,48 @@ func (builder *k8sHintsBuilder) createScraper(
return &recTemplate, err
}

func (builder *k8sHintsBuilder) createLogsReceiver(
annotations map[string]string,
env observer.EndpointEnv,
) (*receiverTemplate, error) {
if _, ok := builder.ignoreReceivers[logsReceiver]; ok {
// receiver is ignored
return nil, nil
}

var containerName string
var c observer.PodContainer
err := mapstructure.Decode(env, &c)
if err != nil {
return nil, fmt.Errorf("could not extract pod's container: %v", zap.Any("env", env))
}
if c.Name == "" {
return nil, fmt.Errorf("could not extract container name: %v", zap.Any("container", c))
}
containerName = c.Name
pod := c.Pod

if !discoveryEnabled(annotations, otelLogsHints, containerName) {
return nil, nil
}

subreceiverKey := logsReceiver
builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey))

userConfMap := createLogsConfig(
annotations,
containerName,
pod.UID,
pod.Name,
pod.Namespace,
builder.logger)

recTemplate, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, pod.UID, containerName), userConfMap)
recTemplate.signals = receiverSignals{false, true, false}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid using positional values in struct initialization. It's easy to break by changing the struct's fields and unclear what false, true, false is. I see it's done for metrics as well. Please file a separate PR to fix metrics


return &recTemplate, err
}

func getScraperConfFromAnnotations(
annotations map[string]string,
defaultEndpoint, scopeSuffix string,
Expand Down Expand Up @@ -149,6 +202,43 @@ func getScraperConfFromAnnotations(
return conf, nil
}

func createLogsConfig(
annotations map[string]string,
containerName, podUID, podName, namespace string,
logger *zap.Logger,
) userConfigMap {
scopeSuffix := containerName
logPath := fmt.Sprintf(defaultLogPathPattern, namespace, podName, podUID, containerName)
cont := []any{map[string]any{"id": "container-parser", "type": "container"}}
defaultConfMap := userConfigMap{
"include": []string{logPath},
"include_file_path": true,
"include_file_name": false,
"operators": cont,
}

configStr, found := getHintAnnotation(annotations, otelLogsHints, configHint, scopeSuffix)
if !found || configStr == "" {
return defaultConfMap
}

userConf := make(map[string]any)
if err := yaml.Unmarshal([]byte(configStr), &userConf); err != nil {
logger.Debug("could not unmarshal configuration from hint", zap.Error(err))
}

for k, v := range userConf {
if k == "include" {
// path cannot be other than the one of the target container
continue
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log here to warn users?

} else {
defaultConfMap[k] = v
dmitryax marked this conversation as resolved.
Show resolved Hide resolved
}
}

return defaultConfMap
}

func getHintAnnotation(annotations map[string]string, hintBase string, hintKey string, suffix string) (string, bool) {
// try to scope the hint more on container level by suffixing
// with .<port> in case of Port event or # TODO: .<container_name> in case of Pod Container event
Expand All @@ -162,7 +252,7 @@ func getHintAnnotation(annotations map[string]string, hintBase string, hintKey s
return podLevelHint, ok
}

func discoveryMetricsEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
func discoveryEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
enabledHint, found := getHintAnnotation(annotations, hintBase, discoveryEnabledHint, scopeSuffix)
if !found {
return false
Expand Down
Loading
Loading