Skip to content

Commit

Permalink
[receiver/receiver_creator] Add support for logs' hints
Browse files Browse the repository at this point in the history
Signed-off-by: ChrsMark <[email protected]>
  • Loading branch information
ChrsMark committed Dec 10, 2024
1 parent 040cb47 commit 9a21436
Show file tree
Hide file tree
Showing 8 changed files with 656 additions and 15 deletions.
27 changes: 27 additions & 0 deletions .chloggen/f_hints_logs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receivercreator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for starting logs' collection based on provided k8s annotations' hints

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34427]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
103 changes: 100 additions & 3 deletions receiver/receivercreator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,8 @@ receiver_creator/metrics:
# ignore_receivers: []
```

Find bellow the supported annotations that user can define to automatically enable receivers to start collecting metrics signals from the target Pods/containers.
Find bellow the supported annotations that user can define to automatically enable receivers to start
collecting metrics signals from the target Pods/containers.

### Supported metrics annotations

Expand Down Expand Up @@ -511,11 +512,82 @@ The current implementation relies on the implementation of `k8sobserver` extensi
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
The hints are evaluated per container by extracting the annotations from each [`Port` endpoint](#Port) that is emitted.

### Supported logs annotations

This feature enables `filelog` receiver along with the `container` parser in order to collect logs from the discovered
Pods.

#### Enable/disable discovery

`io.opentelemetry.discovery.logs/enabled` (Required. Example: `"true"`)

By default `"false"`.

The following configuration can be used:

```yaml
receiver_creator/logs:
watch_observers: [ k8s_observer ]
discovery:
enabled: true
```
#### Define configuration
The default configuration for the `filelog` receiver is the following:

```yaml
include:
- /var/log/pods/`pod.namespace`_`pod.name`_`pod.uid`/`container_name`/*.log
include_file_name: false
include_file_path: true
operators:
- id: container-parser
type: container
```
This default can be extended using the respective annotation:
`io.opentelemetry.discovery.logs/config`

**Example:**

```yaml
io.opentelemetry.discovery.logs/config: |
include_file_name: true
max_log_size: "2MiB"
operators:
- type: regex_parser
regex: "^(?P<time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<sev>[A-Z]*) (?P<msg>.*)$"
```

Note that individual settings are overridden by the configuration provided by the hints while the operators list
is extended keeping first the `container` parser.


#### Support multiple target containers

Users can target the annotation to a specific container by suffixing it with the name of that container:
`io.opentelemetry.discovery.logs.<container_name>/endpoint`.
For example:
```yaml
io.opentelemetry.discovery.logs.busybox/config: |
max_log_size: "3MiB"
operators:
- id: some
type: add
field: attributes.tag
value: hints
```
where `busybox` is the name of the target container.

If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and
the Pod level hints are used as a fallback (see detailed example bellow).

The current implementation relies on the implementation of `k8sobserver` extension and specifically
the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go).
The hints are evaluated per container by extracting the annotations from each [`Pod Container` endpoint](#Pod Container) that is emitted.


### Examples

#### Metrics example
#### Metrics and Logs example

Collector's configuration:
```yaml
Expand All @@ -525,12 +597,22 @@ receivers:
discovery:
enabled: true
receivers:
receiver_creator/logs:
watch_observers: [ k8s_observer ]
discovery:
enabled: true
receivers:
service:
extensions: [ k8s_observer]
pipelines:
metrics:
receivers: [ receiver_creator ]
receivers: [ receiver_creator/metrics ]
processors: []
exporters: [ debug ]
logs:
receivers: [ receiver_creator/logs ]
processors: []
exporters: [ debug ]
```
Expand Down Expand Up @@ -600,6 +682,21 @@ spec:
endpoint: "http://`endpoint`/nginx_status"
collection_interval: "30s"
timeout: "20s"

# redis pod container logs hints
io.opentelemetry.discovery.logs.redis/enabled: "true"
io.opentelemetry.discovery.logs.redis/config: |
max_log_size: "4MiB"
operators:
- id: some
type: add
field: attributes.tag
value: logs_hints
# nginx pod container logs hints
io.opentelemetry.discovery.logs.webserver/enabled: "true"
io.opentelemetry.discovery.logs.webserver/config: |
max_log_size: "3MiB"
spec:
volumes:
- name: nginx-conf
Expand Down
37 changes: 37 additions & 0 deletions receiver/receivercreator/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,40 @@ func (*nopWithoutEndpointFactory) CreateTraces(
cfg: cfg,
}, nil
}

type nopWithFilelogConfig struct {
Include []string `mapstructure:"include"`
IncludeFileName bool `mapstructure:"include_file_name"`
IncludeFilePath bool `mapstructure:"include_file_path"`
Operators []any `mapstructure:"operators"`
}

type nopWithFilelogFactory struct {
rcvr.Factory
}

type nopWithFilelogReceiver struct {
mockComponent
consumer.Logs
consumer.Metrics
consumer.Traces
rcvr.Settings
cfg component.Config
}

func (*nopWithFilelogFactory) CreateDefaultConfig() component.Config {
return &nopWithFilelogConfig{}
}

func (*nopWithFilelogFactory) CreateLogs(
_ context.Context,
rcs rcvr.Settings,
cfg component.Config,
nextConsumer consumer.Logs,
) (rcvr.Logs, error) {
return &nopWithEndpointReceiver{
Logs: nextConsumer,
Settings: rcs,
cfg: cfg,
}, nil
}
103 changes: 99 additions & 4 deletions receiver/receivercreator/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@ const (

// hint suffix for metrics
otelMetricsHints = otelHints + ".metrics"
otelLogsHints = otelHints + ".logs"

// hints definitions
discoveryEnabledHint = "enabled"
scraperHint = "scraper"
configHint = "config"

logsReceiver = "filelog"
defaultLogPathPattern = "/var/log/pods/%s_%s_%s/%s/*.log"
filelogOperatorsConfigKey = "operators"
)

// k8sHintsBuilder creates configurations from hints provided as Pod's annotations.
Expand Down Expand Up @@ -57,7 +62,7 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
return nil, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env))
}

if endpointType != string(observer.PortType) {
if endpointType != string(observer.PortType) && endpointType != string(observer.PodContainerType) {
return nil, nil
}

Expand All @@ -72,7 +77,14 @@ func (builder *k8sHintsBuilder) createReceiverTemplateFromHints(env observer.End
return nil, nil
}

return builder.createScraper(pod.Annotations, env)
switch endpointType {
case string(observer.PortType):
return builder.createScraper(pod.Annotations, env)
case string(observer.PodContainerType):
return builder.createLogsReceiver(pod.Annotations, env)
default:
return nil, nil
}
}

func (builder *k8sHintsBuilder) createScraper(
Expand All @@ -91,7 +103,7 @@ func (builder *k8sHintsBuilder) createScraper(
port = p.Port
pod := p.Pod

if !discoveryMetricsEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
if !discoveryEnabled(annotations, otelMetricsHints, fmt.Sprint(port)) {
return nil, nil
}

Expand All @@ -118,6 +130,48 @@ func (builder *k8sHintsBuilder) createScraper(
return &recTemplate, err
}

func (builder *k8sHintsBuilder) createLogsReceiver(
annotations map[string]string,
env observer.EndpointEnv,
) (*receiverTemplate, error) {
if _, ok := builder.ignoreReceivers[logsReceiver]; ok {
// receiver is ignored
return nil, nil
}

var containerName string
var c observer.PodContainer
err := mapstructure.Decode(env, &c)
if err != nil {
return nil, fmt.Errorf("could not extract pod's container: %v", zap.Any("env", env))
}
if c.Name == "" {
return nil, fmt.Errorf("could not extract container name: %v", zap.Any("container", c))
}
containerName = c.Name
pod := c.Pod

if !discoveryEnabled(annotations, otelLogsHints, containerName) {
return nil, nil
}

subreceiverKey := logsReceiver
builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey))

userConfMap := createLogsConfig(
annotations,
containerName,
pod.UID,
pod.Name,
pod.Namespace,
builder.logger)

recTemplate, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, pod.UID, containerName), userConfMap)
recTemplate.signals = receiverSignals{false, true, false}

return &recTemplate, err
}

func getScraperConfFromAnnotations(
annotations map[string]string,
defaultEndpoint, scopeSuffix string,
Expand Down Expand Up @@ -149,6 +203,47 @@ func getScraperConfFromAnnotations(
return conf, nil
}

func createLogsConfig(
annotations map[string]string,
containerName, podUID, podName, namespace string,
logger *zap.Logger,
) userConfigMap {
scopeSuffix := containerName
logPath := fmt.Sprintf(defaultLogPathPattern, namespace, podName, podUID, containerName)
cont := []any{map[string]any{"id": "container-parser", "type": "container"}}
defaultConfMap := userConfigMap{
"include": []string{logPath},
"include_file_path": true,
"include_file_name": false,
filelogOperatorsConfigKey: cont,
}

configStr, found := getHintAnnotation(annotations, otelLogsHints, configHint, scopeSuffix)
if !found || configStr == "" {
return defaultConfMap
}

userConf := make(map[string]any)
if err := yaml.Unmarshal([]byte(configStr), &userConf); err != nil {
logger.Debug("could not unmarshal configuration from hint", zap.Error(err))
}

for k, v := range userConf {
if k == filelogOperatorsConfigKey {
vlist, ok := v.([]any)
if !ok {
logger.Debug("could not parse operators configuration from hint", zap.Any("config", userConf))
}
vlist = append(cont, vlist...)
defaultConfMap[k] = vlist
} else {
defaultConfMap[k] = v
}
}

return defaultConfMap
}

func getHintAnnotation(annotations map[string]string, hintBase string, hintKey string, suffix string) (string, bool) {
// try to scope the hint more on container level by suffixing
// with .<port> in case of Port event or # TODO: .<container_name> in case of Pod Container event
Expand All @@ -162,7 +257,7 @@ func getHintAnnotation(annotations map[string]string, hintBase string, hintKey s
return podLevelHint, ok
}

func discoveryMetricsEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
func discoveryEnabled(annotations map[string]string, hintBase string, scopeSuffix string) bool {
enabledHint, found := getHintAnnotation(annotations, hintBase, discoveryEnabledHint, scopeSuffix)
if !found {
return false
Expand Down
Loading

0 comments on commit 9a21436

Please sign in to comment.