Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add implicit batching to writeAPIBlocking #350

Merged
merged 2 commits into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
## [unreleased]
### Features
- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise.
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Added support for implicit batching to `WriteAPIBlocking`. It's off by default, enabled by `EnableBatching()`.

### Bug fixes
- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error).

### Breaking change
- [#350](https://github.com/influxdata/influxdb-client-go/pull/350) Interface `WriteAPIBlocking` is extend with `EnableBatching()` and `Flush()`.

## 2.9.2 [2022-07-29]
### Bug fixes
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ func main() {
```

### Blocking write client
Blocking write client writes given point(s) synchronously. It doesn't have implicit batching. Batch is created from given set of points.
Blocking write client writes given point(s) synchronously. It doesn't do implicit batching. Batch is created from given set of points.
Implicit batching can be enabled with `WriteAPIBlocking.EnableBatching()`.

```go
package main
Expand Down
98 changes: 63 additions & 35 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,73 +7,90 @@ package api
import (
"context"
"strings"
"sync"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
)

// WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.
// It doesn't implicitly create batches of points. It is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.
// It doesn't implicitly create batches of points by default. Batches are created from array of points/records.
//
// WriteAPIBlocking can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
//
// To add implicit batching, use a wrapper, such as:
// type writer struct {
// batch []*write.Point
// writeAPI api.WriteAPIBlocking
// batchSize int
// }
// Implicit batching is enabled with EnableBatching(). In this mode, each call to WritePoint or WriteRecord adds a line
// to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server
// and the result of the operation is returned.
// When a point is written to the buffer, nil error is always returned.
// Flush() can be used to trigger sending of batch when it doesn't have the batch-size.
//
// func (w *writer) CurrentBatch() []*write.Point {
// return w.batch
// }
//
// func newWriter(writeAPI api.WriteAPIBlocking, batchSize int) *writer {
// return &writer{
// batch: make([]*write.Point, 0, batchSize),
// writeAPI: writeAPI,
// batchSize: batchSize,
// }
// }
// Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.

//
// func (w *writer) write(ctx context.Context, p *write.Point) error {
// w.batch = append(w.batch, p)
// if len(w.batch) == w.batchSize {
// err := w.writeAPI.WritePoint(ctx, w.batch...)
// if err != nil {
// return err
// }
// w.batch = w.batch[:0]
// }
// return nil
// }
// WriteAPIBlocking can be used concurrently.
// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
type WriteAPIBlocking interface {
// WriteRecord writes line protocol record(s) into bucket.
// WriteRecord writes without implicit batching. Batch is created from given number of records.
// WriteRecord writes lines without implicit batching by default, batch is created from given number of records.
// Automatic batching can be enabled by EnableBatching()
// Individual arguments can also be batches (multiple records separated by newline).
// Non-blocking alternative is available in the WriteAPI interface
WriteRecord(ctx context.Context, line ...string) error
// WritePoint data point into bucket.
// WritePoint writes without implicit batching. Batch is created from given number of points
// WriteRecord writes points without implicit batching by default, batch is created from given number of points.
// Automatic batching can be enabled by EnableBatching().
// Non-blocking alternative is available in the WriteAPI interface
WritePoint(ctx context.Context, point ...*write.Point) error
// EnableBatching turns on implicit batching
// Batch size is controlled via write.Options
EnableBatching()
// Flush forces write of buffer if batching is enabled, even buffer doesn't have the batch-size.
Flush(ctx context.Context) error
}

// writeAPIBlocking implements WriteAPIBlocking interface
type writeAPIBlocking struct {
service *iwrite.Service
writeOptions *write.Options
batching bool
batch []string
mu sync.Mutex
}

// NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org
func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
return &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
}

// NewWriteAPIBlockingWithBatching creates new instance of blocking write client for writing data to bucket belonging to org with batching enabled
func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
api := &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
api.EnableBatching()
return api
}

func (w *writeAPIBlocking) EnableBatching() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.batching {
w.batching = true
w.batch = make([]string, 0, w.writeOptions.BatchSize())
}
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
w.mu.Lock()
defer w.mu.Unlock()
body := line
if w.batching {
w.batch = append(w.batch, line)
if len(w.batch) == int(w.writeOptions.BatchSize()) {
body = strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
} else {
return nil
}
}
err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
if err != nil {
return err
}
Expand All @@ -94,3 +111,14 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
}
return w.write(ctx, line)
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.batching && len(w.batch) > 0 {
body := strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
}
return nil
}
40 changes: 39 additions & 1 deletion api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,20 @@ func TestWriteRecord(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := test.GenRecords(10)
for _, line := range lines {
err := writeAPI.WriteRecord(context.Background(), line)
require.Nil(t, err)
}
require.Len(t, service.Lines(), 10)
require.Equal(t, 10, service.Requests())
for i, l := range lines {
assert.Equal(t, l, service.Lines()[i])
}
service.Close()

err := writeAPI.WriteRecord(context.Background(), lines...)
require.Nil(t, err)
require.Len(t, service.Lines(), 10)
require.Equal(t, 1, service.Requests())
for i, l := range lines {
assert.Equal(t, l, service.Lines()[i])
}
Expand Down Expand Up @@ -120,3 +131,30 @@ func TestWriteErrors(t *testing.T) {
require.Equal(t, 10, errors)

}

func TestWriteBatchIng(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlockingWithBatching("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := test.GenRecords(10)
for i, line := range lines {
err := writeAPI.WriteRecord(context.Background(), line)
require.Nil(t, err)
if i == 4 || i == 9 {
assert.Equal(t, 1, service.Requests())
require.Len(t, service.Lines(), 5)

service.Close()
}
}

for i := 0; i < 4; i++ {
err := writeAPI.WriteRecord(context.Background(), lines[i])
require.Nil(t, err)
}
assert.Equal(t, 0, service.Requests())
require.Len(t, service.Lines(), 0)
err := writeAPI.Flush(context.Background())
require.Nil(t, err)
assert.Equal(t, 1, service.Requests())
require.Len(t, service.Lines(), 4)
}
8 changes: 8 additions & 0 deletions internal/test/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type HTTPService struct {
requestHandler func(url string, body io.Reader) error
replyError *http2.Error
lock sync.Mutex
requests int
}

// WasGzip returns true of request was in GZip format
Expand Down Expand Up @@ -67,6 +68,11 @@ func (t *HTTPService) HTTPClient() *http.Client {
return nil
}

// Requests returns number of requests
func (t *HTTPService) Requests() int {
return t.requests
}

// Close clears instance
func (t *HTTPService) Close() {
t.lock.Lock()
Expand All @@ -76,6 +82,7 @@ func (t *HTTPService) Close() {
t.wasGzip = false
t.replyError = nil
t.requestHandler = nil
t.requests = 0
t.lock.Unlock()
}

Expand Down Expand Up @@ -116,6 +123,7 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request
// DoPostRequest reads http request, validates URL and stores data in the request
func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error {
req, err := http.NewRequest("POST", url, nil)
t.requests++
if err != nil {
return http2.NewError(err)
}
Expand Down