Skip to content

Commit

Permalink
feat: Adding possibility to set default tags
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Jun 19, 2020
1 parent 6214dcb commit 0f9ebbe
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 1.3.0 [in progress]
### Features
1. [#131](https://github.com/influxdata/influxdb-client-go/pull/131) Labels API
1. [#136](https://github.com/influxdata/influxdb-client-go/pull/136) Possibility to specify default tags

### Bug fixes
1. [#132](https://github.com/influxdata/influxdb-client-go/pull/132) Handle unsupported write type as string instead of generating panic
Expand Down
20 changes: 19 additions & 1 deletion api/write/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Options struct {
precision time.Duration
// Whether to use GZip compression in requests. Default false
useGZip bool
// Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged.
defaultTags map[string]string
}

// BatchSize returns size of batch
Expand Down Expand Up @@ -104,7 +106,23 @@ func (o *Options) SetUseGZip(useGZip bool) *Options {
return o
}

// AddDefaultTag adds a default tag. DefaultTags are added to each written point.
// If a tag with the same key already exist it is overwritten.
// If a point already defines such a tag, it is left unchanged.
func (o *Options) AddDefaultTag(key, value string) *Options {
o.DefaultTags()[key] = value
return o
}

// DefaultTags returns set of default tags
func (o *Options) DefaultTags() map[string]string {
if o.defaultTags == nil {
o.defaultTags = make(map[string]string)
}
return o.defaultTags
}

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000}
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000, defaultTags: make(map[string]string)}
}
6 changes: 5 additions & 1 deletion api/write/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestDefaultOptions(t *testing.T) {
assert.Equal(t, uint(10000), opts.RetryBufferLimit())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(3), opts.MaxRetries())
assert.Len(t, opts.DefaultTags(), 0)
}

func TestSettingsOptions(t *testing.T) {
Expand All @@ -31,12 +32,15 @@ func TestSettingsOptions(t *testing.T) {
SetPrecision(time.Millisecond).
SetRetryBufferLimit(5).
SetRetryInterval(5000).
SetMaxRetries(7)
SetMaxRetries(7).
AddDefaultTag("a", "1").
AddDefaultTag("b", "2")
assert.Equal(t, uint(5), opts.BatchSize())
assert.Equal(t, true, opts.UseGZip())
assert.Equal(t, uint(5000), opts.FlushInterval())
assert.Equal(t, time.Millisecond, opts.Precision())
assert.Equal(t, uint(5), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(7), opts.MaxRetries())
assert.Len(t, opts.DefaultTags(), 2)
}
19 changes: 19 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,25 @@ func genRecords(num int) []string {
return lines
}

func TestWriteApiWriteDefaultTag(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
opts := write.DefaultOptions().
SetBatchSize(1)
opts.AddDefaultTag("dft", "a")
writeApi := NewWriteApiImpl("my-org", "my-bucket", service, opts)
point := write.NewPoint("test",
map[string]string{
"vendor": "AWS",
},
map[string]interface{}{
"mem_free": 1234567,
}, time.Unix(60, 60))
writeApi.WritePoint(point)
writeApi.Close()
require.Len(t, service.Lines(), 1)
assert.Equal(t, "test,dft=a,vendor=AWS mem_free=1234567i 60000000060", service.Lines()[0])
}

func TestWriteApiImpl_Write(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
Expand Down
60 changes: 59 additions & 1 deletion internal/write/writeService.go → internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -150,21 +151,78 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) error {
return nil
}

type pointWithDefaultTags struct {
point *write.Point
defaultTags map[string]string
}

// Name returns the name of measurement of a point.
func (p *pointWithDefaultTags) Name() string {
return p.point.Name()
}

// Time is the timestamp of a Point.
func (p *pointWithDefaultTags) Time() time.Time {
return p.point.Time()
}

// FieldList returns a slice containing the fields of a Point.
func (p *pointWithDefaultTags) FieldList() []*lp.Field {
return p.point.FieldList()
}

func (p *pointWithDefaultTags) TagList() []*lp.Tag {
tags := make([]*lp.Tag, 0, len(p.point.TagList())+len(p.defaultTags))
tags = append(tags, p.point.TagList()...)
for k, v := range p.defaultTags {
if !existTag(p.point.TagList(), k) {
tags = append(tags, &lp.Tag{
Key: k,
Value: v,
})
}
}
sort.Slice(tags, func(i, j int) bool { return tags[i].Key < tags[j].Key })
return tags
}

func existTag(tags []*lp.Tag, key string) bool {
for _, tag := range tags {
if key == tag.Key {
return true
}
}
return false
}

func (w *Service) EncodePoints(points ...*write.Point) (string, error) {
var buffer bytes.Buffer
e := lp.NewEncoder(&buffer)
e.SetFieldTypeSupport(lp.UintSupport)
e.FailOnFieldErr(true)
e.SetPrecision(w.writeOptions.Precision())
for _, point := range points {
_, err := e.Encode(point)
_, err := e.Encode(w.pointToEncode(point))
if err != nil {
return "", err
}
}
return buffer.String(), nil
}

func (w *Service) pointToEncode(point *write.Point) lp.Metric {
var m lp.Metric
if len(w.writeOptions.DefaultTags()) > 0 {
m = &pointWithDefaultTags{
point: point,
defaultTags: w.writeOptions.DefaultTags(),
}
} else {
m = point
}
return m
}

func (w *Service) WriteUrl() (string, error) {
if w.url == "" {
u, err := url.Parse(w.httpService.ServerApiUrl())
Expand Down
52 changes: 52 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package write

import (
"github.com/influxdata/influxdb-client-go/api/write"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

func TestAddDefaultTags(t *testing.T) {
opts := write.DefaultOptions()
assert.Len(t, opts.DefaultTags(), 0)

opts.AddDefaultTag("dt1", "val1")
opts.AddDefaultTag("zdt", "val2")
srv := NewService("org", "buc", nil, opts)

p := write.NewPointWithMeasurement("test")
p.AddTag("id", "101")

p.AddField("float32", float32(80.0))

s, err := srv.EncodePoints(p)
require.Nil(t, err)
assert.Equal(t, "test,dt1=val1,id=101,zdt=val2 float32=80\n", s)
assert.Len(t, p.TagList(), 1)

p = write.NewPointWithMeasurement("x")
p.AddTag("xt", "1")
p.AddField("i", 1)

s, err = srv.EncodePoints(p)
require.Nil(t, err)
assert.Equal(t, "x,dt1=val1,xt=1,zdt=val2 i=1i\n", s)
assert.Len(t, p.TagList(), 1)

p = write.NewPointWithMeasurement("d")
p.AddTag("id", "1")
// do not overwrite point tag
p.AddTag("zdt", "val10")
p.AddField("i", -1)

s, err = srv.EncodePoints(p)
require.Nil(t, err)
assert.Equal(t, "d,dt1=val1,id=1,zdt=val10 i=-1i\n", s)

assert.Len(t, p.TagList(), 2)
}
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func (o *Options) HttpOptions() *http.Options {
return o.httpOptions
}

// AddDefaultTag adds a default tag. DefaultTags are added to each written point.
// If a tag with the same key already exist it is overwritten.
// If a point already defines such a tag, it is left unchanged
func (o *Options) AddDefaultTag(key, value string) *Options {
o.WriteOptions().AddDefaultTag(key, value)
return o
}

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{logLevel: 0, writeOptions: write.DefaultOptions(), httpOptions: http.DefaultOptions()}
Expand Down
51 changes: 48 additions & 3 deletions options_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package influxdb2
package influxdb2_test

import (
"context"
"crypto/tls"
influxdb2 "github.com/influxdata/influxdb-client-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
Expand All @@ -11,6 +13,49 @@ import (
"time"
)

func TestDefaultOptions(t *testing.T) {
opts := influxdb2.DefaultOptions()
assert.Equal(t, uint(5000), opts.BatchSize())
assert.Equal(t, false, opts.UseGZip())
assert.Equal(t, uint(1000), opts.FlushInterval())
assert.Equal(t, time.Nanosecond, opts.Precision())
assert.Equal(t, uint(10000), opts.RetryBufferLimit())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(3), opts.MaxRetries())
assert.Equal(t, (*tls.Config)(nil), opts.TlsConfig())
assert.Equal(t, uint(20), opts.HttpRequestTimeout())
assert.Equal(t, uint(0), opts.LogLevel())
}

func TestSettingsOptions(t *testing.T) {
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
opts := influxdb2.DefaultOptions().
SetBatchSize(5).
SetUseGZip(true).
SetFlushInterval(5000).
SetPrecision(time.Millisecond).
SetRetryBufferLimit(5).
SetRetryInterval(5000).
SetMaxRetries(7).
SetTlsConfig(tlsConfig).
SetHttpRequestTimeout(50).
SetLogLevel(3).
AddDefaultTag("t", "a")
assert.Equal(t, uint(5), opts.BatchSize())
assert.Equal(t, true, opts.UseGZip())
assert.Equal(t, uint(5000), opts.FlushInterval())
assert.Equal(t, time.Millisecond, opts.Precision())
assert.Equal(t, uint(5), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(7), opts.MaxRetries())
assert.Equal(t, tlsConfig, opts.TlsConfig())
assert.Equal(t, uint(50), opts.HttpRequestTimeout())
assert.Equal(t, uint(3), opts.LogLevel())
assert.Len(t, opts.WriteOptions().DefaultTags(), 1)
}

func TestTimeout(t *testing.T) {
response := `,result,table,_start,_stop,_time,_value,_field,_measurement,a,b,
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
Expand All @@ -32,14 +77,14 @@ func TestTimeout(t *testing.T) {
}
}))
defer server.Close()
client := NewClientWithOptions(server.URL, "a", DefaultOptions().SetHttpRequestTimeout(1))
client := influxdb2.NewClientWithOptions(server.URL, "a", influxdb2.DefaultOptions().SetHttpRequestTimeout(1))
queryApi := client.QueryApi("org")

_, err := queryApi.QueryRaw(context.Background(), "flux", nil)
require.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "Client.Timeout exceeded"))

client = NewClientWithOptions(server.URL, "a", DefaultOptions().SetHttpRequestTimeout(5))
client = influxdb2.NewClientWithOptions(server.URL, "a", influxdb2.DefaultOptions().SetHttpRequestTimeout(5))
queryApi = client.QueryApi("org")

result, err := queryApi.QueryRaw(context.Background(), "flux", nil)
Expand Down

0 comments on commit 0f9ebbe

Please sign in to comment.