Skip to content

Commit

Permalink
Allow to export configurable set of topic configuration keys
Browse files Browse the repository at this point in the history
Added a new configuration parameter to allow defining the
set of topic configuration keys that should be exported as label pairs
in kafka_topic_info

By default only `cleanup.policy` configuration is exported

See https://kafka.apache.org/documentation/#topicconfigs
for the list of topic level configs that can be defined
  • Loading branch information
amuraru committed Jun 22, 2021
1 parent 1a777bf commit 77249c3
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 13 deletions.
4 changes: 4 additions & 0 deletions charts/kminion/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ kminion:
# # IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
# # take precedence over allowed topics.
# ignoredTopics: [ ]
# # infoMetric is a configuration object for the kminion_kafka_topic_info metric
# infoMetric:
# # ConfigKeys are set of strings of Topic configs that you want to have exported as part of the metric
# configKeys: ["cleanup.policy"]
# logDirs:
# # Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
# # to version 1.0.0 as describing log dirs was not supported back then.
Expand Down
4 changes: 4 additions & 0 deletions docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ minion:
# IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
# take precedence over allowed topics.
ignoredTopics: [ ]
# infoMetric is a configuration object for the kminion_kafka_topic_info metric
infoMetric:
# ConfigKeys are set of strings of Topic configs that you want to have exported as part of the metric
configKeys: ["cleanup.policy"]
logDirs:
# Enabled specifies whether log dirs shall be scraped and exported or not. This should be disabled for clusters prior
# to version 1.0.0 as describing log dirs was not supported back then.
Expand Down
15 changes: 14 additions & 1 deletion minion/config_topic_config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package minion

import "fmt"
import (
"fmt"
)

const (
TopicGranularityTopic string = "topic"
Expand All @@ -18,6 +20,16 @@ type TopicConfig struct {
// IgnoredTopics are regex strings of topic names that shall be ignored/skipped when exporting metrics. Ignored topics
// take precedence over allowed topics.
IgnoredTopics []string `koanf:"ignoredTopics"`

// InfoMetric configures how the kafka_topic_info metric is populated
InfoMetric InfoMetricConfig `koanf:"exporter"`
}

type InfoMetricConfig struct {
// ConfigKeys configures optional topic configuration keys that should be exported
// as prometheus metric labels.
// By default "topic_name", "partition_count", "replication_factor" and "cleanup.policy" are exported
ConfigKeys []string `koanf:"infoMetric"`
}

// Validate if provided TopicConfig is valid.
Expand Down Expand Up @@ -50,4 +62,5 @@ func (c *TopicConfig) Validate() error {
func (c *TopicConfig) SetDefaults() {
c.Granularity = TopicGranularityPartition
c.AllowedTopics = []string{"/.*/"}
c.InfoMetric = InfoMetricConfig{ConfigKeys: []string{"cleanup.policy"}}
}
1 change: 0 additions & 1 deletion minion/describe_topic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ func (s *Service) GetTopicConfigs(ctx context.Context) (*kmsg.DescribeConfigsRes
resourceReq := kmsg.NewDescribeConfigsRequestResource()
resourceReq.ResourceType = kmsg.ConfigResourceTypeTopic
resourceReq.ResourceName = topic.Topic
resourceReq.ConfigNames = []string{"cleanup.policy"}
req.Resources = append(req.Resources, resourceReq)
}

Expand Down
27 changes: 17 additions & 10 deletions prometheus/collect_topic_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package prometheus

import (
"context"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kerr"
"go.uber.org/zap"
"strconv"
)

func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Metric) bool {
Expand Down Expand Up @@ -55,7 +56,7 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
e.logger.Warn("failed to get metadata of a specific topic",
zap.String("topic_name", topic.Topic),
zap.Error(typedErr))
return false
continue
}
partitionCount := len(topic.Partitions)
replicationFactor := -1
Expand All @@ -64,20 +65,26 @@ func (e *Exporter) collectTopicInfo(ctx context.Context, ch chan<- prometheus.Me
replicationFactor = len(topic.Partitions[0].Replicas)
}

cleanupPolicy, exists := configsByTopic[topic.Topic]["cleanup.policy"]
if !exists {
cleanupPolicy = "N/A"
var labelsValues []string
labelsValues = append(labelsValues, topic.Topic)
labelsValues = append(labelsValues, strconv.Itoa(partitionCount))
labelsValues = append(labelsValues, strconv.Itoa(replicationFactor))
for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys {
labelsValues = append(labelsValues, getOrDefault(configsByTopic[topic.Topic], key, "N/A"))
}

ch <- prometheus.MustNewConstMetric(
e.topicInfo,
prometheus.GaugeValue,
float64(1),
topic.Topic,
strconv.Itoa(partitionCount),
strconv.Itoa(replicationFactor),
cleanupPolicy,
labelsValues...,
)
}
return isOk
}

func getOrDefault(m map[string]string, key string, defaultValue string) string {
if value, exists := m[key]; exists {
return value
}
return defaultValue
}
8 changes: 7 additions & 1 deletion prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package prometheus
import (
"context"
"os"
"strings"
"time"

"github.com/cloudhut/kminion/v2/minion"
Expand Down Expand Up @@ -98,10 +99,15 @@ func (e *Exporter) InitializeMetrics() {

// Topic / Partition metrics
// Topic info
var labels = []string{"topic_name", "partition_count", "replication_factor"}
for _, key := range e.minionSvc.Cfg.Topics.InfoMetric.ConfigKeys {
// prometheus does not allow . in label keys
labels = append(labels, strings.ReplaceAll(key, ".", "_"))
}
e.topicInfo = prometheus.NewDesc(
prometheus.BuildFQName(e.cfg.Namespace, "kafka", "topic_info"),
"Info labels for a given topic",
[]string{"topic_name", "partition_count", "replication_factor", "cleanup_policy"},
labels,
nil,
)
// Partition Low Water Mark
Expand Down

0 comments on commit 77249c3

Please sign in to comment.