Skip to content

Commit

Permalink
Merge pull request #348 from bonitoo-io/feat/influx_enterpise
Browse files Browse the repository at this point in the history
feat: added consistency params support
  • Loading branch information
vlastahajek authored Aug 25, 2022
2 parents d369c06 + eb1a2ab commit 38ada92
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## [unreleased]
### Features
- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise.

## 2.9.2 [2022-07-29]
### Bug fixes
Expand Down
30 changes: 30 additions & 0 deletions api/write/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,27 @@ type Options struct {
maxRetryTime uint
// The base for the exponential retry delay
exponentialBase uint
// InfluxDB Enterprise write consistency as explained in https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency
consistency Consistency
}

const (
// ConsistencyOne requires at least one data node acknowledged a write.
ConsistencyOne Consistency = "one"

// ConsistencyAll requires all data nodes to acknowledge a write.
ConsistencyAll Consistency = "all"

// ConsistencyQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyQuorum Consistency = "quorum"

// ConsistencyAny allows for hinted hand off, potentially no write happened yet.
ConsistencyAny Consistency = "any"
)

// Consistency defines enum for allows consistency values for InfluxDB Enterprise, as explained https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency
type Consistency string

// BatchSize returns size of batch
func (o *Options) BatchSize() uint {
return o.batchSize
Expand Down Expand Up @@ -162,6 +181,17 @@ func (o *Options) DefaultTags() map[string]string {
return o.defaultTags
}

// Consistency returns consistency for param value
func (o *Options) Consistency() Consistency {
return o.consistency
}

// SetConsistency allows setting InfluxDB Enterprise write consistency, as explained in https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency */
func (o *Options) SetConsistency(consistency Consistency) *Options {
o.consistency = consistency
return o
}

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{batchSize: 5_000, flushInterval: 1_000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 50_000, defaultTags: make(map[string]string),
Expand Down
5 changes: 4 additions & 1 deletion api/write/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func TestDefaultOptions(t *testing.T) {
assert.EqualValues(t, 125_000, opts.MaxRetryInterval())
assert.EqualValues(t, 180_000, opts.MaxRetryTime())
assert.EqualValues(t, 2, opts.ExponentialBase())
assert.EqualValues(t, "", opts.Consistency())
assert.Len(t, opts.DefaultTags(), 0)
}

Expand All @@ -40,7 +41,8 @@ func TestSettingsOptions(t *testing.T) {
SetExponentialBase(3).
SetMaxRetryTime(200_000).
AddDefaultTag("a", "1").
AddDefaultTag("b", "2")
AddDefaultTag("b", "2").
SetConsistency(write.ConsistencyOne)
assert.EqualValues(t, 5, opts.BatchSize())
assert.EqualValues(t, true, opts.UseGZip())
assert.EqualValues(t, 5000, opts.FlushInterval())
Expand All @@ -51,5 +53,6 @@ func TestSettingsOptions(t *testing.T) {
assert.EqualValues(t, 150_000, opts.MaxRetryInterval())
assert.EqualValues(t, 200_000, opts.MaxRetryTime())
assert.EqualValues(t, 3, opts.ExponentialBase())
assert.EqualValues(t, "one", opts.Consistency())
assert.Len(t, opts.DefaultTags(), 2)
}
3 changes: 3 additions & 0 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func NewService(org string, bucket string, httpService http2.Service, options *w
params.Set("org", org)
params.Set("bucket", bucket)
params.Set("precision", precisionToString(options.Precision()))
if options.Consistency() != "" {
params.Set("consistency", string(options.Consistency()))
}
u.RawQuery = params.Encode()
writeURL := u.String()
return &Service{
Expand Down
12 changes: 12 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,3 +647,15 @@ func TestFlush(t *testing.T) {
assert.Len(t, hs.Lines(), 5)
assert.Equal(t, 0, srv.retryQueue.list.Len())
}

func TestConsistencyParam(t *testing.T) {
hs := test.NewTestService(t, "http://localhost:8888")
opts := write.DefaultOptions().SetConsistency(write.ConsistencyQuorum)
srv := NewService("org", "buc", hs, opts)

require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&consistency=quorum&org=org&precision=ns", srv.WriteURL())
opts = write.DefaultOptions()
srv = NewService("org", "buc", hs, opts)

require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL())
}

0 comments on commit 38ada92

Please sign in to comment.