Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: WriteAPIs management #175

Merged
merged 4 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
### Breaking changes
1. [#173](https://github.com/influxdata/influxdb-client-go/pull/173) Removed deprecated API.
1. [#174](https://github.com/influxdata/influxdb-client-go/pull/174) Removed orgs labels API cause [it has been removed from the server API](https://github.com/influxdata/influxdb/pull/19104)
1. [#175](https://github.com/influxdata/influxdb-client-go/pull/175) Removed WriteAPI.Close()

### Features
1. [#165](https://github.com/influxdata/influxdb-client-go/pull/165) Allow overriding the http.Client for the http service.

### Bug fixes
1. [#175](https://github.com/influxdata/influxdb-client-go/pull/175) Fixed WriteAPIs management. Keeping single instance for each org and bucket pair.

## 1.4.0 [2020-07-17]
### Breaking changes
1. [#156](https://github.com/influxdata/influxdb-client-go/pull/156) Fixing Go naming and code style violations:
Expand Down
34 changes: 13 additions & 21 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,14 @@ type WriteAPI interface {
WritePoint(point *write.Point)
// Flush forces all pending writes from the buffer to be sent
Flush()
// Flushes all pending writes and stop async processes. After this the Write client cannot be used
Close()
// Errors returns a channel for reading errors which occurs during async writes.
// Must be called before performing any writes for errors to be collected.
// The chan is unbuffered and must be drained or the writer will block.
Errors() <-chan error
}

// writeAPI provides main implementation for WriteAPI
type writeAPI struct {
// WriteAPIImpl provides main implementation for WriteAPI
type WriteAPIImpl struct {
service *iwrite.Service
writeBuffer []string

Expand All @@ -56,8 +54,8 @@ type writeBuffInfoReq struct {
writeBuffLen int
}

func NewWriteAPI(org string, bucket string, service http.Service, writeOptions *write.Options) *writeAPI {
w := &writeAPI{
func NewWriteAPI(org string, bucket string, service http.Service, writeOptions *write.Options) *WriteAPIImpl {
w := &WriteAPIImpl{
service: iwrite.NewService(org, bucket, service, writeOptions),
writeBuffer: make([]string, 0, writeOptions.BatchSize()+1),
writeCh: make(chan *iwrite.Batch),
Expand All @@ -77,19 +75,19 @@ func NewWriteAPI(org string, bucket string, service http.Service, writeOptions *
return w
}

func (w *writeAPI) Errors() <-chan error {
func (w *WriteAPIImpl) Errors() <-chan error {
if w.errCh == nil {
w.errCh = make(chan error)
}
return w.errCh
}

func (w *writeAPI) Flush() {
func (w *WriteAPIImpl) Flush() {
w.bufferFlush <- struct{}{}
w.waitForFlushing()
}

func (w *writeAPI) waitForFlushing() {
func (w *WriteAPIImpl) waitForFlushing() {
for {
w.bufferInfoCh <- writeBuffInfoReq{}
writeBuffInfo := <-w.bufferInfoCh
Expand All @@ -108,10 +106,9 @@ func (w *writeAPI) waitForFlushing() {
log.Log.Info("Waiting buffer is flushed")
time.Sleep(time.Millisecond)
}
//time.Sleep(time.Millisecond)
}

func (w *writeAPI) bufferProc() {
func (w *WriteAPIImpl) bufferProc() {
log.Log.Info("Buffer proc started")
ticker := time.NewTicker(time.Duration(w.writeOptions.FlushInterval()) * time.Millisecond)
x:
Expand Down Expand Up @@ -139,20 +136,16 @@ x:
w.doneCh <- struct{}{}
}

func (w *writeAPI) flushBuffer() {
func (w *WriteAPIImpl) flushBuffer() {
if len(w.writeBuffer) > 0 {
//go func(lines []string) {
log.Log.Info("sending batch")
batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.RetryInterval())
w.writeCh <- batch
// lines = lines[:0]
//}(w.writeBuffer)
//w.writeBuffer = make([]string,0, w.service.clientImpl.Options.BatchSize+1)
w.writeBuffer = w.writeBuffer[:0]
}
}

func (w *writeAPI) writeProc() {
func (w *WriteAPIImpl) writeProc() {
log.Log.Info("Write proc started")
x:
for {
Expand All @@ -174,7 +167,7 @@ x:
w.doneCh <- struct{}{}
}

func (w *writeAPI) Close() {
func (w *WriteAPIImpl) Close() {
if w.writeCh != nil {
// Flush outstanding metrics
w.Flush()
Expand Down Expand Up @@ -203,14 +196,13 @@ func (w *writeAPI) Close() {
}
}

func (w *writeAPI) WriteRecord(line string) {
func (w *WriteAPIImpl) WriteRecord(line string) {
b := []byte(line)
b = append(b, 0xa)
w.bufferCh <- string(b)
}

func (w *writeAPI) WritePoint(point *write.Point) {
//w.bufferCh <- point.ToLineProtocol(w.service.clientImpl.Options().Precision)
func (w *WriteAPIImpl) WritePoint(point *write.Point) {
line, err := w.service.EncodePoints(point)
if err != nil {
log.Log.Errorf("point encoding error: %s\n", err.Error())
Expand Down
70 changes: 46 additions & 24 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ type Client interface {

// clientImpl implements Client interface
type clientImpl struct {
serverURL string
options *Options
writeAPIs []api.WriteAPI
lock sync.Mutex
httpService ihttp.Service
apiClient *domain.ClientWithResponses
authAPI api.AuthorizationsAPI
orgAPI api.OrganizationsAPI
usersAPI api.UsersAPI
deleteAPI api.DeleteAPI
bucketsAPI api.BucketsAPI
labelsAPI api.LabelsAPI
serverURL string
options *Options
writeAPIs map[string]api.WriteAPI
syncWriteAPIs map[string]api.WriteAPIBlocking
lock sync.Mutex
httpService ihttp.Service
apiClient *domain.ClientWithResponses
authAPI api.AuthorizationsAPI
orgAPI api.OrganizationsAPI
usersAPI api.UsersAPI
deleteAPI api.DeleteAPI
bucketsAPI api.BucketsAPI
labelsAPI api.LabelsAPI
}

// NewClient creates Client for connecting to given serverURL with provided authentication token, with the default options.
Expand All @@ -93,11 +94,12 @@ func NewClientWithOptions(serverURL string, authToken string, options *Options)
}
service := ihttp.NewService(normServerURL, "Token "+authToken, options.httpOptions)
client := &clientImpl{
serverURL: serverURL,
options: options,
writeAPIs: make([]api.WriteAPI, 0, 5),
httpService: service,
apiClient: domain.NewClientWithResponses(service),
serverURL: serverURL,
options: options,
writeAPIs: make(map[string]api.WriteAPI, 5),
syncWriteAPIs: make(map[string]api.WriteAPIBlocking, 5),
httpService: service,
apiClient: domain.NewClientWithResponses(service),
}
log.Log.SetDebugLevel(client.Options().LogLevel())
log.Log.Infof("Using URL '%s', token '%s'", serverURL, authToken)
Expand Down Expand Up @@ -165,20 +167,40 @@ func (c *clientImpl) Health(ctx context.Context) (*domain.HealthCheck, error) {
return response.JSON200, nil
}

func createKey(org, bucket string) string {
return org + "\t" + bucket
}

func (c *clientImpl) WriteAPI(org, bucket string) api.WriteAPI {
w := api.NewWriteAPI(org, bucket, c.httpService, c.options.writeOptions)
c.writeAPIs = append(c.writeAPIs, w)
return w
c.lock.Lock()
defer c.lock.Unlock()
key := createKey(org, bucket)
if _, ok := c.writeAPIs[key]; !ok {
w := api.NewWriteAPI(org, bucket, c.httpService, c.options.writeOptions)
c.writeAPIs[key] = w
}
return c.writeAPIs[key]
}

func (c *clientImpl) WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking {
w := api.NewWriteAPIBlocking(org, bucket, c.httpService, c.options.writeOptions)
return w
c.lock.Lock()
defer c.lock.Unlock()
key := createKey(org, bucket)
if _, ok := c.syncWriteAPIs[key]; !ok {
w := api.NewWriteAPIBlocking(org, bucket, c.httpService, c.options.writeOptions)
c.syncWriteAPIs[key] = w
}
return c.syncWriteAPIs[key]
}

func (c *clientImpl) Close() {
for _, w := range c.writeAPIs {
w.Close()
for key, w := range c.writeAPIs {
wa := w.(*api.WriteAPIImpl)
wa.Close()
delete(c.writeAPIs, key)
}
for key := range c.syncWriteAPIs {
delete(c.syncWriteAPIs, key)
}
}

Expand Down
31 changes: 31 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,37 @@ func TestUrls(t *testing.T) {
}
}

func TestWriteAPIManagement(t *testing.T) {
data := []struct {
org string
bucket string
expectedCout int
}{
{"o1", "b1", 1},
{"o1", "b2", 2},
{"o1", "b1", 2},
{"o2", "b1", 3},
{"o2", "b2", 4},
{"o1", "b2", 4},
{"o1", "b3", 5},
{"o2", "b2", 5},
}
c := NewClient("http://localhost", "x").(*clientImpl)
for i, d := range data {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
w := c.WriteAPI(d.org, d.bucket)
assert.NotNil(t, w)
assert.Len(t, c.writeAPIs, d.expectedCout)
wb := c.WriteAPIBlocking(d.org, d.bucket)
assert.NotNil(t, wb)
assert.Len(t, c.syncWriteAPIs, d.expectedCout)
})
}
c.Close()
assert.Len(t, c.writeAPIs, 0)
assert.Len(t, c.syncWriteAPIs, 0)
}

func TestUserAgent(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(100 * time.Millisecond)
Expand Down