From 832a6612763ac1ab1c99915e7158e2e46d6576a1 Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Thu, 8 Aug 2019 07:37:07 +0200 Subject: [PATCH] #2: Implemented flush_interval --- README.md | 1 + influxdb2/client/write_api.py | 14 ++++++++------ influxdb2_test/test_WriteApiBatching.py | 25 ++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index fe924843..34ed9450 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,7 @@ The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/m | --- | --- | --- | | [**write_type**](#write_type) | how the client writes data; allowed values: `batching`, `asynchronous`, `synchronous`| `batching` | | **batch_size** | the number of data point to collect in batch | `1000` | +| **flush_interval** | the number of milliseconds before the batch is written | `1000` | ##### write_type * `batching` - data are writes in batches defined by `batch_size`, `flush_interval`, ... diff --git a/influxdb2/client/write_api.py b/influxdb2/client/write_api.py index 56f6a959..e8b6e4f3 100644 --- a/influxdb2/client/write_api.py +++ b/influxdb2/client/write_api.py @@ -1,7 +1,8 @@ # coding: utf-8 import logging -import time +from datetime import timedelta from enum import Enum +from time import sleep import rx from rx import operators as ops, Observable @@ -24,8 +25,8 @@ class WriteType(Enum): class WriteOptions(object): - def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=None, jitter_interval=None, - retry_interval=None, buffer_limit=None, write_scheduler=None) -> None: + def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=None, + retry_interval=None, buffer_limit=None, write_scheduler=NewThreadScheduler()) -> None: self.write_type = write_type self.batch_size = batch_size self.flush_interval = flush_interval @@ -118,9 +119,10 @@ def __init__(self, service, write_options=WriteOptions()) -> None: if self._write_options.write_type is WriteType.batching: self._subject = Subject() - observable = self._subject.pipe(ops.observe_on(NewThreadScheduler())) + observable = self._subject.pipe(ops.observe_on(self._write_options.write_scheduler)) self._disposable = observable\ - .pipe(ops.window_with_count(write_options.batch_size), + .pipe(ops.window_with_time_or_count(count=write_options.batch_size, + timespan=timedelta(milliseconds=write_options.flush_interval)), ops.flat_map(lambda v: _window_to_group(v)), ops.map(mapper=lambda x: self._retryable(x)), ops.merge_all()) \ @@ -172,7 +174,7 @@ def __del__(self): self._subject.dispose() self._subject = None # TODO remove sleep - time.sleep(2) + sleep(2) if self._disposable: self._disposable.dispose() self._disposable = None diff --git a/influxdb2_test/test_WriteApiBatching.py b/influxdb2_test/test_WriteApiBatching.py index 7ae9d072..39bc44d0 100644 --- a/influxdb2_test/test_WriteApiBatching.py +++ b/influxdb2_test/test_WriteApiBatching.py @@ -34,7 +34,7 @@ def setUp(self) -> None: header_value="Token my-token") self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client), - write_options=WriteOptions(batch_size=2)) + write_options=WriteOptions(batch_size=2, flush_interval=5_000)) def tearDown(self) -> None: pass @@ -133,6 +133,29 @@ def test_batch_size_group_by(self): pass + def test_flush_interval(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek level\\ water_level=1.0 1", + "h2o_feet,location=coyote_creek level\\ water_level=2.0 2"]) + + time.sleep(1) + self.assertEqual(1, len(httpretty.httpretty.latest_requests)) + + self._write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek level\\ water_level=3.0 3") + + time.sleep(2) + + self.assertEqual(1, len(httpretty.httpretty.latest_requests)) + + time.sleep(3) + + self.assertEqual(2, len(httpretty.httpretty.latest_requests)) + + self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=3.0 3", + httpretty.httpretty.latest_requests[1].parsed_body) + def test_recover_from_error(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204) httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=400)