diff --git a/CHANGELOG.md b/CHANGELOG.md
index 022bc2ab..f62acf58 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
 ### Features
 1. [#203](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters
+1. [#225](https://github.com/influxdata/influxdb-client-python/pull/225): Exponential random backoff retry strategy
 
 ### Bug Fixes
 1. [#222](https://github.com/influxdata/influxdb-client-python/pull/222): Pass configured timeout to HTTP client
 
diff --git a/README.rst b/README.rst
index d0c1ce60..2de2f180 100644
--- a/README.rst
+++ b/README.rst
@@ -256,17 +256,20 @@ The batching is configurable by ``write_options``\ :
      - the number of milliseconds to increase the batch flush interval by a random amount
      - ``0``
    * - **retry_interval**
-     - the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
+     - the number of milliseconds to retry the first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify a "Retry-After" header.
      - ``5000``
+   * - **max_retry_time**
+     - maximum total retry timeout in milliseconds.
+     - ``180_000``
    * - **max_retries**
      - the number of max retries when write fails
-     - ``3``
+     - ``5``
    * - **max_retry_delay**
      - the maximum delay between each retry attempt in milliseconds
-     - ``180_000``
+     - ``125_000``
    * - **exponential_base**
-     - the base for the exponential retry delay, the next delay is computed as ``retry_interval * exponential_base^(attempts-1) + random(jitter_interval)``
-     - ``5``
+     - the base for the exponential retry delay. The next delay is computed using random exponential backoff as a random value between ``retry_interval * exponential_base^(attempts-1)`` and ``retry_interval * exponential_base^(attempts)``. For example, with ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` the retry delays are randomly distributed values within the ranges ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]``
+     - ``2``
 
 .. code-block:: python
 
diff --git a/examples/import_data_set_sync_batching.py b/examples/import_data_set_sync_batching.py
index f5e548a5..4ab37e10 100644
--- a/examples/import_data_set_sync_batching.py
+++ b/examples/import_data_set_sync_batching.py
@@ -30,7 +30,7 @@ def csv_to_generator(csv_file_path):
 """
 Define Retry strategy - 3 attempts => 2, 4, 8
 """
-retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2)
+retries = WritesRetry(total=3, retry_interval=1, exponential_base=2)
 
 with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client:
     """
diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py
index 9c002ddc..136b8c1d 100644
--- a/influxdb_client/client/write/retry.py
+++ b/influxdb_client/client/write/retry.py
@@ -1,10 +1,12 @@
 """Implementation for Retry strategy during HTTP requests."""
 
 import logging
+from datetime import datetime, timedelta
 from itertools import takewhile
 from random import random
 
 from urllib3 import Retry
+from urllib3.exceptions import MaxRetryError, ResponseError
 
 from influxdb_client.client.exceptions import InfluxDBError
 
@@ -16,27 +18,49 @@ class WritesRetry(Retry):
 
     Writes retry configuration.
 
     :param int jitter_interval: random milliseconds when retrying writes
-    :param int max_retry_delay: maximum delay when retrying write
-    :param int exponential_base: base for the exponential retry delay, the next delay is computed as
-        `backoff_factor * exponential_base^(attempts-1) + random(jitter_interval)`
+    :param num max_retry_delay: maximum delay when retrying a write, in seconds
+    :param int max_retry_time: maximum total retry timeout in seconds; an attempt after this timeout raises MaxRetryError
+    :param int total: maximum number of retries
+    :param num retry_interval: initial retry delay in seconds
+    :param int exponential_base: base for the exponential retry delay
+
+    The next delay is computed as a random value between
+        `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)`
+
+    Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5
+    the retry delays are randomly distributed values within the ranges of
+    [5-10, 10-20, 20-40, 40-80, 80-125]
+
     """
 
-    def __init__(self, jitter_interval=0, max_retry_delay=180, exponential_base=5, **kw):
+    def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5,
+                 retry_interval=5, **kw):
         """Initialize defaults."""
         super().__init__(**kw)
         self.jitter_interval = jitter_interval
+        self.total = total
+        self.retry_interval = retry_interval
         self.max_retry_delay = max_retry_delay
+        self.max_retry_time = max_retry_time
         self.exponential_base = exponential_base
+        self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time)
 
     def new(self, **kw):
         """Initialize defaults."""
         if 'jitter_interval' not in kw:
             kw['jitter_interval'] = self.jitter_interval
+        if 'retry_interval' not in kw:
+            kw['retry_interval'] = self.retry_interval
         if 'max_retry_delay' not in kw:
             kw['max_retry_delay'] = self.max_retry_delay
+        if 'max_retry_time' not in kw:
+            kw['max_retry_time'] = self.max_retry_time
         if 'exponential_base' not in kw:
             kw['exponential_base'] = self.exponential_base
-        return super().new(**kw)
+
+        new = super().new(**kw)
+        new.retry_timeout = self.retry_timeout
+        return new
 
     def is_retry(self, method, status_code, has_retry_after=False):
         """is_retry doesn't require retry_after header.
If there is not Retry-After we will use backoff.""" @@ -58,8 +82,21 @@ def get_backoff_time(self): if consecutive_errors_len < 0: return 0 - backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) + self._jitter_delay() - return min(self.max_retry_delay, backoff_value) + range_start = self.retry_interval + range_stop = self.retry_interval * self.exponential_base + + i = 1 + while i <= consecutive_errors_len: + i += 1 + range_start = range_stop + range_stop = range_stop * self.exponential_base + if range_stop > self.max_retry_delay: + break + + if range_stop > self.max_retry_delay: + range_stop = self.max_retry_delay + + return range_start + (range_stop - range_start) * self._random() def get_retry_after(self, response): """Get the value of Retry-After header and append random jitter delay.""" @@ -70,6 +107,9 @@ def get_retry_after(self, response): def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): """Return a new Retry object with incremented retry counters.""" + if self.retry_timeout < datetime.now(): + raise MaxRetryError(_pool, url, error or ResponseError("max_retry_time exceeded")) + new_retry = super().increment(method, url, response, error, _pool, _stacktrace) if response is not None: @@ -89,3 +129,6 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None def _jitter_delay(self): return self.jitter_interval * random() + + def _random(self): + return random() diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index ca058dc0..403a7ea3 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -38,9 +38,10 @@ def __init__(self, write_type: WriteType = WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=0, retry_interval=5_000, - max_retries=3, - max_retry_delay=180_000, - exponential_base=5, + max_retries=5, + max_retry_delay=125_000, + max_retry_time=180_000, + exponential_base=2, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -51,10 +52,10 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param jitter_interval: this is primarily to avoid large write spikes for users running a large number of client instances ie, a jitter of 5s and flush duration 10s means flushes will happen every 10-15s. 
         :param retry_interval: the time to wait before retry unsuccessful write
-        :param max_retries: the number of max retries when write fails
+        :param max_retries: the maximum number of retries when a write fails; 0 disables retries
         :param max_retry_delay: the maximum delay between each retry attempt in milliseconds
-        :param exponential_base: base for the exponential retry delay, the next delay is computed as
-            `retry_interval * exponential_base^(attempts-1) + random(jitter_interval)`
+        :param max_retry_time: total timeout for all retry attempts in milliseconds; if 0, retry is disabled
+        :param exponential_base: base for the exponential retry delay
         :param write_scheduler:
         """
         self.write_type = write_type
@@ -64,6 +65,7 @@ def __init__(self, write_type: WriteType = WriteType.batching,
         self.retry_interval = retry_interval
         self.max_retries = max_retries
         self.max_retry_delay = max_retry_delay
+        self.max_retry_time = max_retry_time
         self.exponential_base = exponential_base
         self.write_scheduler = write_scheduler
 
@@ -71,9 +73,10 @@ def to_retry_strategy(self):
         """Create a Retry strategy from write options."""
         return WritesRetry(
             total=self.max_retries,
-            backoff_factor=self.retry_interval / 1_000,
+            retry_interval=self.retry_interval / 1_000,
             jitter_interval=self.jitter_interval / 1_000,
             max_retry_delay=self.max_retry_delay / 1_000,
+            max_retry_time=self.max_retry_time / 1_000,
             exponential_base=self.exponential_base,
             method_whitelist=["POST"])
 
@@ -362,12 +365,7 @@ def _http(self, batch_item: _BatchItem):
 
         logger.debug("Write time series data into InfluxDB: %s", batch_item)
 
-        retry = WritesRetry(
-            total=self._write_options.max_retries,
-            backoff_factor=self._write_options.retry_interval / 1_000,
-            jitter_interval=self._write_options.jitter_interval / 1_000,
-            max_retry_delay=self._write_options.max_retry_delay / 1_000,
-            method_whitelist=["POST"])
+        retry = self._write_options.to_retry_strategy()
 
         self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data,
                          batch_item.key.precision, urlopen_kw={'retries': retry})
diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py
index 2c64c4ec..61cb5f22 100644
--- a/tests/test_WriteApiBatching.py
+++ b/tests/test_WriteApiBatching.py
@@ -198,7 +198,7 @@ def test_retry_interval(self):
         time.sleep(1)
         self.assertEqual(1, len(httpretty.httpretty.latest_requests), msg="first request immediately")
 
-        time.sleep(1.5)
+        time.sleep(3)
         self.assertEqual(2, len(httpretty.httpretty.latest_requests), msg="second request after delay_interval")
 
         time.sleep(3)
@@ -238,6 +238,38 @@ def test_retry_interval_max_retries(self):
 
         self.assertEqual(6, len(httpretty.httpretty.latest_requests))
 
+    def test_retry_disabled_max_retries(self):
+        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429,
+                               adding_headers={'Retry-After': '1'})
+
+        self._write_client.close()
+        self._write_client = WriteApi(influxdb_client=self.influxdb_client,
+                                      write_options=WriteOptions(max_retries=0, batch_size=2, flush_interval=1_000))
+
+        self._write_client.write("my-bucket", "my-org",
+                                 ["h2o_feet,location=coyote_creek level\\ water_level=1 1",
+                                  "h2o_feet,location=coyote_creek level\\ water_level=2 2"])
+
+        time.sleep(2)
+
+        self.assertEqual(1, len(httpretty.httpretty.latest_requests))
+
+    def test_retry_disabled_max_retry_time(self):
+        httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429,
+                               adding_headers={'Retry-After': '1'})
+
+        self._write_client.close()
+        self._write_client =
WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(max_retry_time=0,batch_size=2, flush_interval=1_000)) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek level\\ water_level=1 1", + "h2o_feet,location=coyote_creek level\\ water_level=2 2"]) + + time.sleep(5) + + self.assertEqual(1, len(httpretty.httpretty.latest_requests)) + def test_recover_from_error(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400) diff --git a/tests/test_WriteOptions.py b/tests/test_WriteOptions.py index 187813da..911e51f7 100644 --- a/tests/test_WriteOptions.py +++ b/tests/test_WriteOptions.py @@ -7,11 +7,11 @@ class TestWriteOptions(unittest.TestCase): def test_default(self): retry = WriteOptions().to_retry_strategy() - self.assertEqual(retry.total, 3) - self.assertEqual(retry.backoff_factor, 5) - self.assertEqual(retry.jitter_interval, 0) - self.assertEqual(retry.max_retry_delay, 180) - self.assertEqual(retry.exponential_base, 5) + self.assertEqual(retry.total, 5) + self.assertEqual(retry.retry_interval, 5) + self.assertEqual(retry.max_retry_time, 180) + self.assertEqual(retry.max_retry_delay, 125) + self.assertEqual(retry.exponential_base, 2) self.assertEqual(retry.method_whitelist, ["POST"]) def test_custom(self): @@ -21,8 +21,7 @@ def test_custom(self): .to_retry_strategy() self.assertEqual(retry.total, 5) - self.assertEqual(retry.backoff_factor, 0.5) - self.assertEqual(retry.jitter_interval, 2) + self.assertEqual(retry.retry_interval, 0.5) self.assertEqual(retry.max_retry_delay, 7.5) self.assertEqual(retry.exponential_base, 2) self.assertEqual(retry.method_whitelist, ["POST"]) diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index f278c051..88ceb91e 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -1,4 +1,5 @@ import unittest +import time from urllib3 import HTTPResponse from urllib3.exceptions import MaxRetryError @@ -6,28 +7,58 @@ from influxdb_client.client.write.retry import WritesRetry +class NonRandomMinWritesRetry(WritesRetry): + def _random(self): + return 0 + + +class NonRandomMaxWritesRetry(WritesRetry): + def _random(self): + return 1 + + class TestWritesRetry(unittest.TestCase): def test_copy(self): - retry = WritesRetry(jitter_interval=123, exponential_base=3, max_retry_delay=145) - self.assertEqual(retry.jitter_interval, 123) + retry = WritesRetry(exponential_base=3, max_retry_delay=145, total=10) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 10) retry = retry.increment() - self.assertEqual(retry.jitter_interval, 123) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 9) retry = retry.increment() - self.assertEqual(retry.jitter_interval, 123) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 8) - def test_backoff(self): - retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=550) + def test_backoff_max_time(self): + retry = NonRandomMinWritesRetry(max_retry_time=2) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 0) + + retry = retry.increment() + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 5) + + retry = retry.increment() + 
self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 10) + + time.sleep(3) + + with self.assertRaises(MaxRetryError) as cm: + retry.increment() + exception = cm.exception + print(exception) + + self.assertEqual("max_retry_time exceeded", exception.reason.args[0]) + + def test_backoff_start_range(self): + retry = NonRandomMinWritesRetry(total=5, retry_interval=1, exponential_base=2, + max_retry_delay=550) self.assertEqual(retry.total, 5) self.assertEqual(retry.is_exhausted(), False) self.assertEqual(retry.get_backoff_time(), 0) @@ -40,22 +71,61 @@ def test_backoff(self): retry = retry.increment() self.assertEqual(retry.total, 3) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 5) + self.assertEqual(retry.get_backoff_time(), 2) + + retry = retry.increment() + self.assertEqual(retry.total, 2) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 4) + + retry = retry.increment() + self.assertEqual(retry.total, 1) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 8) + + retry = retry.increment() + self.assertEqual(retry.total, 0) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 16) + + with self.assertRaises(MaxRetryError) as cm: + retry.increment() + exception = cm.exception + + self.assertEqual("too many error responses", exception.reason.args[0]) + + def test_backoff_stop_range(self): + retry = NonRandomMaxWritesRetry(total=5, retry_interval=5, exponential_base=2, + max_retry_delay=550) + + self.assertEqual(retry.total, 5) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 0) + + retry = retry.increment() + self.assertEqual(retry.total, 4) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 10) + + retry = retry.increment() + self.assertEqual(retry.total, 3) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 20) retry = retry.increment() self.assertEqual(retry.total, 2) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 25) + self.assertEqual(retry.get_backoff_time(), 40) retry = retry.increment() self.assertEqual(retry.total, 1) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 125) + self.assertEqual(retry.get_backoff_time(), 80) retry = retry.increment() self.assertEqual(retry.total, 0) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 550) + self.assertEqual(retry.get_backoff_time(), 160) with self.assertRaises(MaxRetryError) as cm: retry.increment() @@ -64,27 +134,27 @@ def test_backoff(self): self.assertEqual("too many error responses", exception.reason.args[0]) def test_backoff_max(self): - retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=15)\ - .increment()\ - .increment()\ - .increment()\ - .increment()\ + retry = WritesRetry(total=5, retry_interval=1, max_retry_delay=15) \ + .increment() \ + .increment() \ + .increment() \ + .increment() \ .increment() - self.assertEqual(retry.get_backoff_time(), 15) + self.assertLessEqual(retry.get_backoff_time(), 15) - def test_backoff_jitter(self): - retry = WritesRetry(total=5, backoff_factor=4, jitter_interval=2).increment() + def test_backoff_increment(self): + retry = WritesRetry(total=5, retry_interval=4).increment() self.assertEqual(retry.total, 4) 
self.assertEqual(retry.is_exhausted(), False) backoff_time = retry.get_backoff_time() self.assertGreater(backoff_time, 4) - self.assertLessEqual(backoff_time, 6) + self.assertLessEqual(backoff_time, 8) def test_backoff_exponential_base(self): - retry = WritesRetry(total=5, backoff_factor=2, exponential_base=2) + retry = NonRandomMinWritesRetry(total=5, retry_interval=2, exponential_base=2) retry = retry.increment() self.assertEqual(retry.get_backoff_time(), 2) @@ -145,7 +215,7 @@ def test_logging(self): response.headers.add('Retry-After', '63') with self.assertLogs('influxdb_client.client.write.retry', level='WARNING') as cm: - WritesRetry(total=5, backoff_factor=1, max_retry_delay=15) \ + WritesRetry(total=5, retry_interval=1, max_retry_delay=15) \ .increment(response=response) \ .increment(error=Exception("too many requests")) \ .increment(url='http://localhost:9999')
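The delay ranges documented in README.rst above can be reproduced with a short standalone sketch that mirrors the range-doubling loop added to ``WritesRetry.get_backoff_time()``. The helper names ``backoff_range`` and ``backoff_delay`` below are illustrative only and are not part of the client.

.. code-block:: python

    import random


    def backoff_range(attempts, retry_interval=5, exponential_base=2, max_retry_delay=125):
        """Return the (start, stop) delay range in seconds after `attempts` consecutive failures."""
        range_start = retry_interval
        range_stop = retry_interval * exponential_base
        for _ in range(attempts - 1):
            # every further failure shifts the window up by one power of exponential_base
            range_start = range_stop
            range_stop = range_stop * exponential_base
            if range_stop > max_retry_delay:
                break
        return range_start, min(range_stop, max_retry_delay)


    def backoff_delay(attempts, **kwargs):
        """Pick a random delay inside the window, as the retry strategy does."""
        start, stop = backoff_range(attempts, **kwargs)
        return start + (stop - start) * random.random()


    # Reproduces the README example: [(5, 10), (10, 20), (20, 40), (40, 80), (80, 125)]
    print([backoff_range(n) for n in range(1, 6)])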
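A usage sketch of the new batching options, assuming the standard ``InfluxDBClient.write_api()`` factory from this client; the URL, token, org and bucket are placeholders, and the option values simply restate the new defaults from ``write_api.py``.

.. code-block:: python

    from influxdb_client import InfluxDBClient, WriteOptions

    # Illustrative values only; they restate the new retry defaults explicitly.
    write_options = WriteOptions(batch_size=500,
                                 flush_interval=1_000,
                                 retry_interval=5_000,     # delay before the first retry (ms)
                                 max_retries=5,            # 0 disables retries
                                 max_retry_delay=125_000,  # cap on a single retry delay (ms)
                                 max_retry_time=180_000,   # total retry budget (ms); 0 disables retries
                                 exponential_base=2)

    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
    write_api = client.write_api(write_options=write_options)

    write_api.write("my-bucket", "my-org", "mem,host=host1 used_percent=23.4")

    write_api.close()
    client.close()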