Skip to content

Commit

Permalink
WIP: fluentd metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
erhudy committed Jun 26, 2023
1 parent ab72254 commit 09aceaa
Show file tree
Hide file tree
Showing 18 changed files with 297 additions and 100 deletions.
9 changes: 9 additions & 0 deletions apis/fluentd/v1alpha1/fluentd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ type FluentdSpec struct {
SecurityContext *corev1.PodSecurityContext `json:"securityContext,omitempty"`
// SchedulerName represents the desired scheduler for fluentd pods.
SchedulerName string `json:"schedulerName,omitempty"`
// EnablePrometheusMetrics will enable Prometheus metrics from fluentd.
EnablePrometheusMetrics bool `json:"enablePrometheusMetrics,omitempty"`
// MetricsPort is the port that the Prometheus metrics listener will be exposed on if
// metrics are enabled, default is 2021
// +kubebuilder:validation:Minimum:=1
// +kubebuilder:validation:Maximum:=65535
MetricsPort *int32 `json:"metricsPort,omitempty"`
// MetricsBind is the host for metrics to listen on, default is "0.0.0.0"
MetricsBind *string `json:"metricsBind,omitempty"`
}

// FluentDService the service of the FluentD
Expand Down
48 changes: 44 additions & 4 deletions apis/fluentd/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/output"
"github.com/fluent/fluent-operator/v2/apis/fluentd/v1alpha1/plugins/params"
fluentdRouter "github.com/fluent/fluent-operator/v2/pkg/fluentd/router"

"github.com/fluent/fluent-operator/v2/pkg/constants"
)

// +kubebuilder:object:generate=false
Expand All @@ -30,6 +32,7 @@ type Renderer interface {
type PluginResources struct {
InputPlugins []params.PluginStore
MainRouterPlugins params.PluginStore
MonitoringFilters []params.PluginStore
LabelPluginResources []params.PluginStore
}

Expand All @@ -45,13 +48,39 @@ type CfgResources struct {
}

// NewGlobalPluginResources represents a combined global fluentd resources
func NewGlobalPluginResources(globalId string) *PluginResources {
func NewGlobalPluginResources(globalId string, enablePrometheusMonitoring bool, metricsPort *int32, metricsBind *string) *PluginResources {
globalMainRouter := fluentdRouter.NewGlobalRouter(globalId)
return &PluginResources{
pluginResources := &PluginResources{
InputPlugins: make([]params.PluginStore, 0),
MainRouterPlugins: *globalMainRouter,
MonitoringFilters: make([]params.PluginStore, 0),
LabelPluginResources: make([]params.PluginStore, 0),
}
if enablePrometheusMonitoring {
incomingMonitoringFilter := fluentdRouter.NewIncomingMonitoringFilter()
outgoingMonitoringMatch := fluentdRouter.NewOutgoingMonitoringMatch()

var determinedMetricsPort int32
if metricsPort == nil {
determinedMetricsPort = constants.DefaultMetricsPort
} else {
determinedMetricsPort = *metricsPort
}
var determinedMetricsBind string
if metricsBind == nil {
determinedMetricsBind = constants.DefaultBind
} else {
determinedMetricsBind = *metricsBind
}

pluginResources.MonitoringFilters = append(pluginResources.MonitoringFilters, *incomingMonitoringFilter, *outgoingMonitoringMatch)

outputSources := fluentdRouter.NewMetricsExposeSources(determinedMetricsPort, determinedMetricsBind)
for _, s := range outputSources {
pluginResources.MonitoringFilters = append(pluginResources.MonitoringFilters, *s)
}
}
return pluginResources
}

func NewCfgResources() *CfgResources {
Expand Down Expand Up @@ -222,7 +251,7 @@ func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResourc
}

cfgLabelPlugin := params.NewPluginStore("label")
cfgLabelPlugin.InsertPairs("tag", cfgRouteLabel)
cfgLabelPlugin.Tag = cfgRouteLabel

// insert filter plugins of this fluentd config
for _, filter := range r.FilterPlugins {
Expand All @@ -240,7 +269,7 @@ func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResourc
return nil
}

func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool) (string, error) {
func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool, enablePrometheusMetrics bool) (string, error) {
if len(pgr.InputPlugins) == 0 && len(pgr.LabelPluginResources) == 0 {
return "", fmt.Errorf("no plugins detect")
}
Expand All @@ -256,6 +285,17 @@ func (pgr *PluginResources) RenderMainConfig(enableMultiWorkers bool) (string, e
buf.WriteString(pluginStore.String())
}

// sort monitoring plugins
if enablePrometheusMetrics {
monitoringPlugins := ByHashcode(pgr.MonitoringFilters)
for _, pluginStore := range monitoringPlugins {
if enableMultiWorkers {
pluginStore.SetIgnorePath()
}
buf.WriteString(pluginStore.String())
}
}

// sort main routers
childRouters := ByRouteLabelsPointers(pgr.MainRouterPlugins.Childs)
pgr.MainRouterPlugins.Childs = childRouters
Expand Down
2 changes: 1 addition & 1 deletion apis/fluentd/v1alpha1/plugins/common/buffer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (b *Buffer) Params(_ plugins.SecretLoader) (*params.PluginStore, error) {
ps.InsertPairs("timekey_wait", fmt.Sprint(*b.TimeKeyWait))
}

ps.InsertPairs("tag", b.Tag)
ps.Tag = b.Tag

if b.ChunkLimitSize != nil {
ps.InsertPairs("chunk_limit_size", *b.ChunkLimitSize)
Expand Down
11 changes: 6 additions & 5 deletions apis/fluentd/v1alpha1/plugins/filter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ func (f *Filter) Name() string {
}

func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
ps := params.NewPluginStore(f.Name())
var ps *params.PluginStore
if f.Tag != nil {
ps = params.NewPluginStoreWithTag(f.Name(), fmt.Sprint(*f.Tag))
} else {
ps = params.NewPluginStore(f.Name())
}

if f.Id != nil {
ps.InsertPairs("@id", fmt.Sprint(*f.Id))
Expand All @@ -64,10 +69,6 @@ func (f *Filter) Params(loader plugins.SecretLoader) (*params.PluginStore, error
ps.InsertPairs("@log_level", fmt.Sprint(*f.LogLevel))
}

if f.Tag != nil {
ps.InsertPairs("tag", fmt.Sprint(*f.Tag))
}

if f.Grep != nil {
ps.InsertType(string(params.GrepFilterType))
return f.grepPlugin(ps, loader), nil
Expand Down
4 changes: 2 additions & 2 deletions apis/fluentd/v1alpha1/plugins/input/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (i *Input) tailPlugin(parent *params.PluginStore, loader plugins.SecretLoad
parent.InsertChilds(childs...)

if tailModel.Tag != "" {
parent.InsertPairs("tag", fmt.Sprint(tailModel.Tag))
parent.Tag = fmt.Sprint(tailModel.Tag)
}

if tailModel.Path != "" {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (i *Input) forwardPlugin(parent *params.PluginStore, loader plugins.SecretL
}

if forwardModel.Tag != nil {
parent.InsertPairs("tag", fmt.Sprint(*forwardModel.Tag))
parent.Tag = fmt.Sprint(*forwardModel.Tag)
}

if forwardModel.AddTagPrefix != nil {
Expand Down
13 changes: 8 additions & 5 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ func (o *Output) Name() string {
}

func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error) {
ps := params.NewPluginStore(o.Name())
var ps *params.PluginStore

if o.Tag != nil {
ps = params.NewPluginStoreWithTag(o.Name(), fmt.Sprint(*o.Tag))
} else {
ps = params.NewPluginStore(o.Name())
}

childs := make([]*params.PluginStore, 0)

ps.InsertPairs("@id", fmt.Sprint(*o.Id))
Expand All @@ -85,10 +92,6 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
ps.InsertPairs("@label", fmt.Sprint(*o.Label))
}

if o.Tag != nil {
ps.InsertPairs("tag", fmt.Sprint(*o.Tag))
}

if o.BufferSection.Buffer != nil {
child, _ := o.BufferSection.Buffer.Params(loader)
childs = append(childs, child)
Expand Down
31 changes: 19 additions & 12 deletions apis/fluentd/v1alpha1/plugins/params/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
type PluginStore struct {
// The plugin name
Name string
// Plugin routing tag
Tag string
// The key-value pairs
Store map[string]string
// The child plugins mounted here
Expand All @@ -30,6 +32,15 @@ func NewPluginStore(name string) *PluginStore {
}
}

func NewPluginStoreWithTag(name string, tag string) *PluginStore {
return &PluginStore{
Name: name,
Tag: tag,
Store: make(map[string]string),
Childs: make([]*PluginStore, 0),
}
}

func (ps *PluginStore) InsertPairs(key, value string) {
ps.Store[key] = value
}
Expand Down Expand Up @@ -60,10 +71,10 @@ func (ps *PluginStore) InsertChilds(childs ...*PluginStore) {

// The total hash string for this plugin store.
func (ps *PluginStore) Hash() string {
c := NewPluginStore(ps.Name)
c := NewPluginStoreWithTag(ps.Name, ps.Tag)

for k, v := range ps.Store {
if k == "@id" || k == "tag" {
if k == "@id" {
continue
}
c.Store[k] = v
Expand All @@ -75,7 +86,7 @@ func (ps *PluginStore) Hash() string {

// Returns tag value
func (ps *PluginStore) GetTag() string {
return ps.Store["tag"]
return ps.Tag
}

// Returns the @label value string of this plugin store.
Expand Down Expand Up @@ -131,18 +142,18 @@ func (ps *PluginStore) processHead(buf *bytes.Buffer) {
var head string
switch PluginName(ps.Name) {
case BufferPlugin:
tag, ok := ps.Store[BufferTag]
if ok {
tag := ps.Tag
if tag != "" {
head = ps.headFmtSprintf(tag)
}
case MatchPlugin:
head = ps.headFmtSprintf(ps.Store[MatchTag])
head = ps.headFmtSprintf(ps.Tag)
case FilterPlugin:
head = ps.headFmtSprintf(ps.Store[FilterTag])
head = ps.headFmtSprintf(ps.Tag)
case TransportPlugin:
head = ps.headFmtSprintf(ps.Store[ProtocolName])
case LabelPlugin:
head = ps.headFmtSprintf(ps.Store[LabelTag])
head = ps.headFmtSprintf(ps.Tag)
default:
head = fmt.Sprintf("%s<%s>\n", ps.PrefixWhitespaces, ps.Name)
}
Expand All @@ -155,10 +166,6 @@ func (ps *PluginStore) processBody(buf *bytes.Buffer) {

keys := make([]string, 0, len(ps.Store))
for k := range ps.Store {
// Don't add tag unless it is an input plugin
if k == "tag" && ps.Name != "source" {
continue
}
if ps.Name == string(BufferPlugin) && ps.IgnorePath {
continue
}
Expand Down
Loading

0 comments on commit 09aceaa

Please sign in to comment.