Skip to content

Commit

Permalink
#2: Implemented flush_interval
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar committed Aug 8, 2019
1 parent d732462 commit 832a661
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 7 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The [WriteApiClient](https://github.com/bonitoo-io/influxdb-client-python/blob/m
| --- | --- | --- |
| [**write_type**](#write_type) | how the client writes data; allowed values: `batching`, `asynchronous`, `synchronous`| `batching` |
| **batch_size** | the number of data point to collect in batch | `1000` |
| **flush_interval** | the number of milliseconds before the batch is written | `1000` |

##### write_type
* `batching` - data are writes in batches defined by `batch_size`, `flush_interval`, ...
Expand Down
14 changes: 8 additions & 6 deletions influxdb2/client/write_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# coding: utf-8
import logging
import time
from datetime import timedelta
from enum import Enum
from time import sleep

import rx
from rx import operators as ops, Observable
Expand All @@ -24,8 +25,8 @@ class WriteType(Enum):

class WriteOptions(object):

def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=None, jitter_interval=None,
retry_interval=None, buffer_limit=None, write_scheduler=None) -> None:
def __init__(self, write_type=WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=None,
retry_interval=None, buffer_limit=None, write_scheduler=NewThreadScheduler()) -> None:
self.write_type = write_type
self.batch_size = batch_size
self.flush_interval = flush_interval
Expand Down Expand Up @@ -118,9 +119,10 @@ def __init__(self, service, write_options=WriteOptions()) -> None:
if self._write_options.write_type is WriteType.batching:
self._subject = Subject()

observable = self._subject.pipe(ops.observe_on(NewThreadScheduler()))
observable = self._subject.pipe(ops.observe_on(self._write_options.write_scheduler))
self._disposable = observable\
.pipe(ops.window_with_count(write_options.batch_size),
.pipe(ops.window_with_time_or_count(count=write_options.batch_size,
timespan=timedelta(milliseconds=write_options.flush_interval)),
ops.flat_map(lambda v: _window_to_group(v)),
ops.map(mapper=lambda x: self._retryable(x)),
ops.merge_all()) \
Expand Down Expand Up @@ -172,7 +174,7 @@ def __del__(self):
self._subject.dispose()
self._subject = None
# TODO remove sleep
time.sleep(2)
sleep(2)
if self._disposable:
self._disposable.dispose()
self._disposable = None
Expand Down
25 changes: 24 additions & 1 deletion influxdb2_test/test_WriteApiBatching.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def setUp(self) -> None:
header_value="Token my-token")

self._write_client = WriteApiClient(service=WriteService(api_client=self._api_client),
write_options=WriteOptions(batch_size=2))
write_options=WriteOptions(batch_size=2, flush_interval=5_000))

def tearDown(self) -> None:
pass
Expand Down Expand Up @@ -133,6 +133,29 @@ def test_batch_size_group_by(self):

pass

def test_flush_interval(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)

self._write_client.write("my-bucket", "my-org",
["h2o_feet,location=coyote_creek level\\ water_level=1.0 1",
"h2o_feet,location=coyote_creek level\\ water_level=2.0 2"])

time.sleep(1)
self.assertEqual(1, len(httpretty.httpretty.latest_requests))

self._write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek level\\ water_level=3.0 3")

time.sleep(2)

self.assertEqual(1, len(httpretty.httpretty.latest_requests))

time.sleep(3)

self.assertEqual(2, len(httpretty.httpretty.latest_requests))

self.assertEqual("h2o_feet,location=coyote_creek level\\ water_level=3.0 3",
httpretty.httpretty.latest_requests[1].parsed_body)

def test_recover_from_error(self):
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=204)
httpretty.register_uri(httpretty.POST, uri="http://localhost/write", status=400)
Expand Down

0 comments on commit 832a661

Please sign in to comment.