Skip to content

Commit

Permalink
Merge pull request #768 from ksdpmx/kinesis
Browse files Browse the repository at this point in the history
feat: add AWS Kinesis Data Streams output plugin for Fluent Bit
  • Loading branch information
benjaminhuo authored Jun 2, 2023
2 parents 473a7d5 + 2f9ce89 commit 60cf1e8
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 6 deletions.
2 changes: 2 additions & 0 deletions apis/fluentbit/v1alpha2/clusteroutput_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type OutputSpec struct {
DataDog *output.DataDog `json:"datadog,omitempty"`
// Firehose defines Firehose Output configuration.
Firehose *output.Firehose `json:"firehose,omitempty"`
// Kinesis defines Kinesis Output configuration.
Kinesis *output.Kinesis `json:"kinesis,omitempty"`
// Stackdriver defines Stackdriver Output Configuration
Stackdriver *output.Stackdriver `json:"stackdriver,omitempty"`
// Splunk defines Splunk Output Configuration
Expand Down
78 changes: 78 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/kinesis_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package output

import (
"fmt"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
)

// +kubebuilder:object:generate:=true

// The Kinesis output plugin, allows to ingest your records into AWS Kinesis. <br />
// It uses the new high performance and highly efficient kinesis plugin is called kinesis_streams instead of the older Golang Fluent Bit plugin released in 2019.
// https://docs.fluentbit.io/manual/pipeline/outputs/kinesis <br />
// https://github.com/aws/amazon-kinesis-streams-for-fluent-bit <br />
type Kinesis struct {
// The AWS region.
Region string `json:"region"`
// The name of the Kinesis Streams Delivery stream that you want log records sent to.
Stream string `json:"stream"`
// Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
TimeKey string `json:"timeKey,omitempty"`
// strftime compliant format string for the timestamp; for example, the default is '%Y-%m-%dT%H:%M:%S'. Supports millisecond precision with '%3N' and supports nanosecond precision with '%9N' and '%L'; for example, adding '%3N' to support millisecond '%Y-%m-%dT%H:%M:%S.%3N'. This option is used with time_key.
TimeKeyFormat string `json:"timeKeyFormat,omitempty"`
// By default, the whole log record will be sent to Kinesis. If you specify a key name with this option, then only the value of that key will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify log_key log and only the log message will be sent to Kinesis.
LogKey string `json:"logKey,omitempty"`
// ARN of an IAM role to assume (for cross account access).
RoleARN string `json:"roleARN,omitempty"`
// Specify a custom endpoint for the Kinesis API.
Endpoint string `json:"endpoint,omitempty"`
// Custom endpoint for the STS API.
STSEndpoint string `json:"stsEndpoint,omitempty"`
// Immediately retry failed requests to AWS services once. This option does not affect the normal Fluent Bit retry mechanism with backoff. Instead, it enables an immediate retry with no delay for networking errors, which may help improve throughput when there are transient/random networking issues. This option defaults to true.
AutoRetryRequests *bool `json:"autoRetryRequests,omitempty"`
// Specify an external ID for the STS API, can be used with the role_arn parameter if your role requires an external ID.
ExternalID string `json:"externalID,omitempty"`
}

// Name implement Section() method
func (*Kinesis) Name() string {
return "kinesis_streams"
}

// Params implement Section() method
func (k *Kinesis) Params(_ plugins.SecretLoader) (*params.KVs, error) {
kvs := params.NewKVs()
if k.Region != "" {
kvs.Insert("region", k.Region)
}
if k.Stream != "" {
kvs.Insert("stream", k.Stream)
}
if k.TimeKey != "" {
kvs.Insert("time_key", k.TimeKey)
}
if k.TimeKeyFormat != "" {
kvs.Insert("time_key_format", k.TimeKeyFormat)
}
if k.LogKey != "" {
kvs.Insert("log_key", k.LogKey)
}
if k.RoleARN != "" {
kvs.Insert("role_arn", k.RoleARN)
}
if k.Endpoint != "" {
kvs.Insert("endpoint", k.Endpoint)
}
if k.STSEndpoint != "" {
kvs.Insert("sts_endpoint", k.STSEndpoint)
}
if k.AutoRetryRequests != nil {
kvs.Insert("auto_retry_requests", fmt.Sprint(*k.AutoRetryRequests))
}
if k.ExternalID != "" {
kvs.Insert("external_id", k.ExternalID)
}

return kvs, nil
}
43 changes: 43 additions & 0 deletions apis/fluentbit/v1alpha2/plugins/output/kinesis_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package output

import (
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins"
"github.com/fluent/fluent-operator/v2/apis/fluentbit/v1alpha2/plugins/params"
"github.com/onsi/gomega"
"testing"
)

func TestOutput_Kinesis_Params(t *testing.T) {
g := gomega.NewWithT(t)

sl := plugins.NewSecretLoader(nil, "test namespace")

ki := Kinesis{
Region: "us-east-1",
Stream: "test_stream",
TimeKey: "test_time_key",
TimeKeyFormat: "%Y-%m-%dT%H:%M:%S.%3N",
LogKey: "test_time_key",
RoleARN: "arn:aws:iam:test",
Endpoint: "test_endpoint",
STSEndpoint: "test_sts_endpoint",
AutoRetryRequests: ptrBool(true),
ExternalID: "test_external_id",
}

expected := params.NewKVs()
expected.Insert("region", "us-east-1")
expected.Insert("stream", "test_stream")
expected.Insert("time_key", "test_time_key")
expected.Insert("time_key_format", "%Y-%m-%dT%H:%M:%S.%3N")
expected.Insert("log_key", "test_time_key")
expected.Insert("role_arn", "arn:aws:iam:test")
expected.Insert("endpoint", "test_endpoint")
expected.Insert("sts_endpoint", "test_sts_endpoint")
expected.Insert("auto_retry_requests", "true")
expected.Insert("external_id", "test_external_id")

kvs, err := ki.Params(sl)
g.Expect(err).NotTo(gomega.HaveOccurred())
g.Expect(kvs).To(gomega.Equal(expected))
}
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,61 @@ spec:
will be used.
type: string
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
properties:
autoRetryRequests:
description: Immediately retry failed requests to AWS services
once. This option does not affect the normal Fluent Bit retry
mechanism with backoff. Instead, it enables an immediate retry
with no delay for networking errors, which may help improve
throughput when there are transient/random networking issues.
This option defaults to true.
type: boolean
endpoint:
description: Specify a custom endpoint for the Kinesis API.
type: string
externalID:
description: Specify an external ID for the STS API, can be used
with the role_arn parameter if your role requires an external
ID.
type: string
logKey:
description: By default, the whole log record will be sent to
Kinesis. If you specify a key name with this option, then only
the value of that key will be sent to Kinesis. For example,
if you are using the Fluentd Docker log driver, you can specify
log_key log and only the log message will be sent to Kinesis.
type: string
region:
description: The AWS region.
type: string
roleARN:
description: ARN of an IAM role to assume (for cross account access).
type: string
stream:
description: The name of the Kinesis Streams Delivery stream that
you want log records sent to.
type: string
stsEndpoint:
description: Custom endpoint for the STS API.
type: string
timeKey:
description: Add the timestamp to the record under this key. By
default the timestamp from Fluent Bit will not be added to records
sent to Kinesis.
type: string
timeKeyFormat:
description: strftime compliant format string for the timestamp;
for example, the default is '%Y-%m-%dT%H:%M:%S'. Supports millisecond
precision with '%3N' and supports nanosecond precision with
'%9N' and '%L'; for example, adding '%3N' to support millisecond
'%Y-%m-%dT%H:%M:%S.%3N'. This option is used with time_key.
type: string
required:
- region
- stream
type: object
logLevel:
description: 'Set the plugin''s logging verbosity level. Allowed values
are: off, error, warn, info, debug and trace, Defaults to the SERVICE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,61 @@ spec:
will be used.
type: string
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
properties:
autoRetryRequests:
description: Immediately retry failed requests to AWS services
once. This option does not affect the normal Fluent Bit retry
mechanism with backoff. Instead, it enables an immediate retry
with no delay for networking errors, which may help improve
throughput when there are transient/random networking issues.
This option defaults to true.
type: boolean
endpoint:
description: Specify a custom endpoint for the Kinesis API.
type: string
externalID:
description: Specify an external ID for the STS API, can be used
with the role_arn parameter if your role requires an external
ID.
type: string
logKey:
description: By default, the whole log record will be sent to
Kinesis. If you specify a key name with this option, then only
the value of that key will be sent to Kinesis. For example,
if you are using the Fluentd Docker log driver, you can specify
log_key log and only the log message will be sent to Kinesis.
type: string
region:
description: The AWS region.
type: string
roleARN:
description: ARN of an IAM role to assume (for cross account access).
type: string
stream:
description: The name of the Kinesis Streams Delivery stream that
you want log records sent to.
type: string
stsEndpoint:
description: Custom endpoint for the STS API.
type: string
timeKey:
description: Add the timestamp to the record under this key. By
default the timestamp from Fluent Bit will not be added to records
sent to Kinesis.
type: string
timeKeyFormat:
description: strftime compliant format string for the timestamp;
for example, the default is '%Y-%m-%dT%H:%M:%S'. Supports millisecond
precision with '%3N' and supports nanosecond precision with
'%9N' and '%L'; for example, adding '%3N' to support millisecond
'%Y-%m-%dT%H:%M:%S.%3N'. This option is used with time_key.
type: string
required:
- region
- stream
type: object
logLevel:
description: 'Set the plugin''s logging verbosity level. Allowed values
are: off, error, warn, info, debug and trace, Defaults to the SERVICE
Expand Down
12 changes: 6 additions & 6 deletions cmd/doc-gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type DocumentsLocation struct {

// Inspired by coreos/prometheus-operator: https://github.com/coreos/prometheus-operator
func main() {
var pluginsLoactions = []DocumentsLocation{
var pluginsLocations = []DocumentsLocation{
{
path: fluentbitPluginPath,
name: "fluentbit",
Expand All @@ -65,9 +65,9 @@ func main() {
name: "fluentd",
},
}
plugins(pluginsLoactions)
plugins(pluginsLocations)

var crdsLoactions = []DocumentsLocation{
var crdsLocations = []DocumentsLocation{
{
path: fluentbitCrdsPath,
name: "fluentbit",
Expand All @@ -77,7 +77,7 @@ func main() {
name: "fluentd",
},
}
crds(crdsLoactions)
crds(crdsLocations)
}

func plugins(docsLocations []DocumentsLocation) {
Expand Down Expand Up @@ -140,8 +140,8 @@ func plugins(docsLocations []DocumentsLocation) {
}
}

func crds(docsLoactions []DocumentsLocation) {
for _, dl := range docsLoactions {
func crds(docsLocations []DocumentsLocation) {
for _, dl := range docsLocations {
var srcs []string
err := filepath.Walk(dl.path, func(path string, info os.FileInfo, err error) error {
if err != nil {
Expand Down
55 changes: 55 additions & 0 deletions config/crd/bases/fluentbit.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,61 @@ spec:
will be used.
type: string
type: object
kinesis:
description: Kinesis defines Kinesis Output configuration.
properties:
autoRetryRequests:
description: Immediately retry failed requests to AWS services
once. This option does not affect the normal Fluent Bit retry
mechanism with backoff. Instead, it enables an immediate retry
with no delay for networking errors, which may help improve
throughput when there are transient/random networking issues.
This option defaults to true.
type: boolean
endpoint:
description: Specify a custom endpoint for the Kinesis API.
type: string
externalID:
description: Specify an external ID for the STS API, can be used
with the role_arn parameter if your role requires an external
ID.
type: string
logKey:
description: By default, the whole log record will be sent to
Kinesis. If you specify a key name with this option, then only
the value of that key will be sent to Kinesis. For example,
if you are using the Fluentd Docker log driver, you can specify
log_key log and only the log message will be sent to Kinesis.
type: string
region:
description: The AWS region.
type: string
roleARN:
description: ARN of an IAM role to assume (for cross account access).
type: string
stream:
description: The name of the Kinesis Streams Delivery stream that
you want log records sent to.
type: string
stsEndpoint:
description: Custom endpoint for the STS API.
type: string
timeKey:
description: Add the timestamp to the record under this key. By
default the timestamp from Fluent Bit will not be added to records
sent to Kinesis.
type: string
timeKeyFormat:
description: strftime compliant format string for the timestamp;
for example, the default is '%Y-%m-%dT%H:%M:%S'. Supports millisecond
precision with '%3N' and supports nanosecond precision with
'%9N' and '%L'; for example, adding '%3N' to support millisecond
'%Y-%m-%dT%H:%M:%S.%3N'. This option is used with time_key.
type: string
required:
- region
- stream
type: object
logLevel:
description: 'Set the plugin''s logging verbosity level. Allowed values
are: off, error, warn, info, debug and trace, Defaults to the SERVICE
Expand Down
Loading

0 comments on commit 60cf1e8

Please sign in to comment.