From 0f9ebbed21d23ce6850ba75d2a0bc8feb730f104 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 19 Jun 2020 09:14:21 +0200 Subject: [PATCH] feat: Adding possibility to set default tags --- CHANGELOG.md | 1 + api/write/options.go | 20 ++++++- api/write/options_test.go | 6 +- api/write_test.go | 19 ++++++ .../write/{writeService.go => service.go} | 60 ++++++++++++++++++- internal/write/service_test.go | 52 ++++++++++++++++ options.go | 8 +++ options_test.go | 51 +++++++++++++++- 8 files changed, 211 insertions(+), 6 deletions(-) rename internal/write/{writeService.go => service.go} (78%) create mode 100644 internal/write/service_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e38255a..75e53d80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 1.3.0 [in progress] ### Features 1. [#131](https://github.com/influxdata/influxdb-client-go/pull/131) Labels API +1. [#136](https://github.com/influxdata/influxdb-client-go/pull/136) Possibility to specify default tags ### Bug fixes 1. [#132](https://github.com/influxdata/influxdb-client-go/pull/132) Handle unsupported write type as string instead of generating panic diff --git a/api/write/options.go b/api/write/options.go index fd3174b6..db14cd50 100644 --- a/api/write/options.go +++ b/api/write/options.go @@ -25,6 +25,8 @@ type Options struct { precision time.Duration // Whether to use GZip compression in requests. Default false useGZip bool + // Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged. + defaultTags map[string]string } // BatchSize returns size of batch @@ -104,7 +106,23 @@ func (o *Options) SetUseGZip(useGZip bool) *Options { return o } +// AddDefaultTag adds a default tag. DefaultTags are added to each written point. +// If a tag with the same key already exist it is overwritten. +// If a point already defines such a tag, it is left unchanged. +func (o *Options) AddDefaultTag(key, value string) *Options { + o.DefaultTags()[key] = value + return o +} + +// DefaultTags returns set of default tags +func (o *Options) DefaultTags() map[string]string { + if o.defaultTags == nil { + o.defaultTags = make(map[string]string) + } + return o.defaultTags +} + // DefaultOptions returns Options object with default values func DefaultOptions() *Options { - return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000} + return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000, defaultTags: make(map[string]string)} } diff --git a/api/write/options_test.go b/api/write/options_test.go index 974a744f..aa08d232 100644 --- a/api/write/options_test.go +++ b/api/write/options_test.go @@ -21,6 +21,7 @@ func TestDefaultOptions(t *testing.T) { assert.Equal(t, uint(10000), opts.RetryBufferLimit()) assert.Equal(t, uint(1000), opts.RetryInterval()) assert.Equal(t, uint(3), opts.MaxRetries()) + assert.Len(t, opts.DefaultTags(), 0) } func TestSettingsOptions(t *testing.T) { @@ -31,7 +32,9 @@ func TestSettingsOptions(t *testing.T) { SetPrecision(time.Millisecond). SetRetryBufferLimit(5). SetRetryInterval(5000). - SetMaxRetries(7) + SetMaxRetries(7). + AddDefaultTag("a", "1"). + AddDefaultTag("b", "2") assert.Equal(t, uint(5), opts.BatchSize()) assert.Equal(t, true, opts.UseGZip()) assert.Equal(t, uint(5000), opts.FlushInterval()) @@ -39,4 +42,5 @@ func TestSettingsOptions(t *testing.T) { assert.Equal(t, uint(5), opts.RetryBufferLimit()) assert.Equal(t, uint(5000), opts.RetryInterval()) assert.Equal(t, uint(7), opts.MaxRetries()) + assert.Len(t, opts.DefaultTags(), 2) } diff --git a/api/write_test.go b/api/write_test.go index 7ae0cab0..65e4db75 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -181,6 +181,25 @@ func genRecords(num int) []string { return lines } +func TestWriteApiWriteDefaultTag(t *testing.T) { + service := newTestService(t, "http://localhost:8888") + opts := write.DefaultOptions(). + SetBatchSize(1) + opts.AddDefaultTag("dft", "a") + writeApi := NewWriteApiImpl("my-org", "my-bucket", service, opts) + point := write.NewPoint("test", + map[string]string{ + "vendor": "AWS", + }, + map[string]interface{}{ + "mem_free": 1234567, + }, time.Unix(60, 60)) + writeApi.WritePoint(point) + writeApi.Close() + require.Len(t, service.Lines(), 1) + assert.Equal(t, "test,dft=a,vendor=AWS mem_free=1234567i 60000000060", service.Lines()[0]) +} + func TestWriteApiImpl_Write(t *testing.T) { service := newTestService(t, "http://localhost:8888") writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) diff --git a/internal/write/writeService.go b/internal/write/service.go similarity index 78% rename from internal/write/writeService.go rename to internal/write/service.go index 8f0ff76c..64ec8ffe 100644 --- a/internal/write/writeService.go +++ b/internal/write/service.go @@ -10,6 +10,7 @@ import ( "io" "net/http" "net/url" + "sort" "strings" "sync" "time" @@ -150,6 +151,50 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) error { return nil } +type pointWithDefaultTags struct { + point *write.Point + defaultTags map[string]string +} + +// Name returns the name of measurement of a point. +func (p *pointWithDefaultTags) Name() string { + return p.point.Name() +} + +// Time is the timestamp of a Point. +func (p *pointWithDefaultTags) Time() time.Time { + return p.point.Time() +} + +// FieldList returns a slice containing the fields of a Point. +func (p *pointWithDefaultTags) FieldList() []*lp.Field { + return p.point.FieldList() +} + +func (p *pointWithDefaultTags) TagList() []*lp.Tag { + tags := make([]*lp.Tag, 0, len(p.point.TagList())+len(p.defaultTags)) + tags = append(tags, p.point.TagList()...) + for k, v := range p.defaultTags { + if !existTag(p.point.TagList(), k) { + tags = append(tags, &lp.Tag{ + Key: k, + Value: v, + }) + } + } + sort.Slice(tags, func(i, j int) bool { return tags[i].Key < tags[j].Key }) + return tags +} + +func existTag(tags []*lp.Tag, key string) bool { + for _, tag := range tags { + if key == tag.Key { + return true + } + } + return false +} + func (w *Service) EncodePoints(points ...*write.Point) (string, error) { var buffer bytes.Buffer e := lp.NewEncoder(&buffer) @@ -157,7 +202,7 @@ func (w *Service) EncodePoints(points ...*write.Point) (string, error) { e.FailOnFieldErr(true) e.SetPrecision(w.writeOptions.Precision()) for _, point := range points { - _, err := e.Encode(point) + _, err := e.Encode(w.pointToEncode(point)) if err != nil { return "", err } @@ -165,6 +210,19 @@ func (w *Service) EncodePoints(points ...*write.Point) (string, error) { return buffer.String(), nil } +func (w *Service) pointToEncode(point *write.Point) lp.Metric { + var m lp.Metric + if len(w.writeOptions.DefaultTags()) > 0 { + m = &pointWithDefaultTags{ + point: point, + defaultTags: w.writeOptions.DefaultTags(), + } + } else { + m = point + } + return m +} + func (w *Service) WriteUrl() (string, error) { if w.url == "" { u, err := url.Parse(w.httpService.ServerApiUrl()) diff --git a/internal/write/service_test.go b/internal/write/service_test.go new file mode 100644 index 00000000..5783df6a --- /dev/null +++ b/internal/write/service_test.go @@ -0,0 +1,52 @@ +// Copyright 2020 InfluxData, Inc. All rights reserved. +// Use of this source code is governed by MIT +// license that can be found in the LICENSE file. + +package write + +import ( + "github.com/influxdata/influxdb-client-go/api/write" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestAddDefaultTags(t *testing.T) { + opts := write.DefaultOptions() + assert.Len(t, opts.DefaultTags(), 0) + + opts.AddDefaultTag("dt1", "val1") + opts.AddDefaultTag("zdt", "val2") + srv := NewService("org", "buc", nil, opts) + + p := write.NewPointWithMeasurement("test") + p.AddTag("id", "101") + + p.AddField("float32", float32(80.0)) + + s, err := srv.EncodePoints(p) + require.Nil(t, err) + assert.Equal(t, "test,dt1=val1,id=101,zdt=val2 float32=80\n", s) + assert.Len(t, p.TagList(), 1) + + p = write.NewPointWithMeasurement("x") + p.AddTag("xt", "1") + p.AddField("i", 1) + + s, err = srv.EncodePoints(p) + require.Nil(t, err) + assert.Equal(t, "x,dt1=val1,xt=1,zdt=val2 i=1i\n", s) + assert.Len(t, p.TagList(), 1) + + p = write.NewPointWithMeasurement("d") + p.AddTag("id", "1") + // do not overwrite point tag + p.AddTag("zdt", "val10") + p.AddField("i", -1) + + s, err = srv.EncodePoints(p) + require.Nil(t, err) + assert.Equal(t, "d,dt1=val1,id=1,zdt=val10 i=-1i\n", s) + + assert.Len(t, p.TagList(), 2) +} diff --git a/options.go b/options.go index 242c77d5..0ca2ad69 100644 --- a/options.go +++ b/options.go @@ -148,6 +148,14 @@ func (o *Options) HttpOptions() *http.Options { return o.httpOptions } +// AddDefaultTag adds a default tag. DefaultTags are added to each written point. +// If a tag with the same key already exist it is overwritten. +// If a point already defines such a tag, it is left unchanged +func (o *Options) AddDefaultTag(key, value string) *Options { + o.WriteOptions().AddDefaultTag(key, value) + return o +} + // DefaultOptions returns Options object with default values func DefaultOptions() *Options { return &Options{logLevel: 0, writeOptions: write.DefaultOptions(), httpOptions: http.DefaultOptions()} diff --git a/options_test.go b/options_test.go index 4ff4e4a3..b743b65d 100644 --- a/options_test.go +++ b/options_test.go @@ -1,7 +1,9 @@ -package influxdb2 +package influxdb2_test import ( "context" + "crypto/tls" + influxdb2 "github.com/influxdata/influxdb-client-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "net/http" @@ -11,6 +13,49 @@ import ( "time" ) +func TestDefaultOptions(t *testing.T) { + opts := influxdb2.DefaultOptions() + assert.Equal(t, uint(5000), opts.BatchSize()) + assert.Equal(t, false, opts.UseGZip()) + assert.Equal(t, uint(1000), opts.FlushInterval()) + assert.Equal(t, time.Nanosecond, opts.Precision()) + assert.Equal(t, uint(10000), opts.RetryBufferLimit()) + assert.Equal(t, uint(1000), opts.RetryInterval()) + assert.Equal(t, uint(3), opts.MaxRetries()) + assert.Equal(t, (*tls.Config)(nil), opts.TlsConfig()) + assert.Equal(t, uint(20), opts.HttpRequestTimeout()) + assert.Equal(t, uint(0), opts.LogLevel()) +} + +func TestSettingsOptions(t *testing.T) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: true, + } + opts := influxdb2.DefaultOptions(). + SetBatchSize(5). + SetUseGZip(true). + SetFlushInterval(5000). + SetPrecision(time.Millisecond). + SetRetryBufferLimit(5). + SetRetryInterval(5000). + SetMaxRetries(7). + SetTlsConfig(tlsConfig). + SetHttpRequestTimeout(50). + SetLogLevel(3). + AddDefaultTag("t", "a") + assert.Equal(t, uint(5), opts.BatchSize()) + assert.Equal(t, true, opts.UseGZip()) + assert.Equal(t, uint(5000), opts.FlushInterval()) + assert.Equal(t, time.Millisecond, opts.Precision()) + assert.Equal(t, uint(5), opts.RetryBufferLimit()) + assert.Equal(t, uint(5000), opts.RetryInterval()) + assert.Equal(t, uint(7), opts.MaxRetries()) + assert.Equal(t, tlsConfig, opts.TlsConfig()) + assert.Equal(t, uint(50), opts.HttpRequestTimeout()) + assert.Equal(t, uint(3), opts.LogLevel()) + assert.Len(t, opts.WriteOptions().DefaultTags(), 1) +} + func TestTimeout(t *testing.T) { response := `,result,table,_start,_stop,_time,_value,_field,_measurement,a,b, ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf @@ -32,14 +77,14 @@ func TestTimeout(t *testing.T) { } })) defer server.Close() - client := NewClientWithOptions(server.URL, "a", DefaultOptions().SetHttpRequestTimeout(1)) + client := influxdb2.NewClientWithOptions(server.URL, "a", influxdb2.DefaultOptions().SetHttpRequestTimeout(1)) queryApi := client.QueryApi("org") _, err := queryApi.QueryRaw(context.Background(), "flux", nil) require.NotNil(t, err) assert.True(t, strings.Contains(err.Error(), "Client.Timeout exceeded")) - client = NewClientWithOptions(server.URL, "a", DefaultOptions().SetHttpRequestTimeout(5)) + client = influxdb2.NewClientWithOptions(server.URL, "a", influxdb2.DefaultOptions().SetHttpRequestTimeout(5)) queryApi = client.QueryApi("org") result, err := queryApi.QueryRaw(context.Background(), "flux", nil)