From 62622e4b9d096a95be56ac71355978c02b30bb3a Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 15 Apr 2021 13:18:56 +0200 Subject: [PATCH 01/13] feat: implement full jitter retry strategy --- README.rst | 2 +- influxdb_client/client/write/retry.py | 22 ++++++---------------- influxdb_client/client/write_api.py | 1 - tests/test_WritesRetry.py | 27 +++++++++------------------ 4 files changed, 16 insertions(+), 36 deletions(-) diff --git a/README.rst b/README.rst index d0c1ce60..746889b0 100644 --- a/README.rst +++ b/README.rst @@ -265,7 +265,7 @@ The batching is configurable by ``write_options``\ : - the maximum delay between each retry attempt in milliseconds - ``180_000`` * - **exponential_base** - - the base for the exponential retry delay, the next delay is computed as ``retry_interval * exponential_base^(attempts-1) + random(jitter_interval)`` + - the base for the exponential retry delay, the next delay is computed using Full Jitter formula ``retry_interval * exponential_base^(attempts-1) * random()`` - ``5`` diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 9c002ddc..370a8377 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -15,23 +15,19 @@ class WritesRetry(Retry): """ Writes retry configuration. - :param int jitter_interval: random milliseconds when retrying writes :param int max_retry_delay: maximum delay when retrying write :param int exponential_base: base for the exponential retry delay, the next delay is computed as - `backoff_factor * exponential_base^(attempts-1) + random(jitter_interval)` + `backoff_factor * exponential_base^(attempts-1) * random()` """ - def __init__(self, jitter_interval=0, max_retry_delay=180, exponential_base=5, **kw): + def __init__(self, max_retry_delay=180, exponential_base=5, **kw): """Initialize defaults.""" super().__init__(**kw) - self.jitter_interval = jitter_interval self.max_retry_delay = max_retry_delay self.exponential_base = exponential_base def new(self, **kw): """Initialize defaults.""" - if 'jitter_interval' not in kw: - kw['jitter_interval'] = self.jitter_interval if 'max_retry_delay' not in kw: kw['max_retry_delay'] = self.max_retry_delay if 'exponential_base' not in kw: @@ -58,16 +54,10 @@ def get_backoff_time(self): if consecutive_errors_len < 0: return 0 - backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) + self._jitter_delay() + # Full Jitter strategy + backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) * self._random() return min(self.max_retry_delay, backoff_value) - def get_retry_after(self, response): - """Get the value of Retry-After header and append random jitter delay.""" - retry_after = super().get_retry_after(response) - if retry_after: - retry_after += self._jitter_delay() - return retry_after - def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): """Return a new Retry object with incremented retry counters.""" new_retry = super().increment(method, url, response, error, _pool, _stacktrace) @@ -87,5 +77,5 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None return new_retry - def _jitter_delay(self): - return self.jitter_interval * random() + def _random(self): + return random() diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index ca058dc0..98c5cb35 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -72,7 +72,6 @@ def to_retry_strategy(self): return WritesRetry( total=self.max_retries, backoff_factor=self.retry_interval / 1_000, - jitter_interval=self.jitter_interval / 1_000, max_retry_delay=self.max_retry_delay / 1_000, exponential_base=self.exponential_base, method_whitelist=["POST"]) diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index f278c051..dcf86038 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -6,28 +6,29 @@ from influxdb_client.client.write.retry import WritesRetry +class NonRandomWritesRetry(WritesRetry): + def _random(self): + return 1 + class TestWritesRetry(unittest.TestCase): def test_copy(self): - retry = WritesRetry(jitter_interval=123, exponential_base=3, max_retry_delay=145) - self.assertEqual(retry.jitter_interval, 123) + retry = WritesRetry(exponential_base=3, max_retry_delay=145) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 10) retry = retry.increment() - self.assertEqual(retry.jitter_interval, 123) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 9) retry = retry.increment() - self.assertEqual(retry.jitter_interval, 123) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 8) def test_backoff(self): - retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=550) + retry = NonRandomWritesRetry(total=5, backoff_factor=1, max_retry_delay=550) self.assertEqual(retry.total, 5) self.assertEqual(retry.is_exhausted(), False) self.assertEqual(retry.get_backoff_time(), 0) @@ -74,17 +75,16 @@ def test_backoff_max(self): self.assertEqual(retry.get_backoff_time(), 15) def test_backoff_jitter(self): - retry = WritesRetry(total=5, backoff_factor=4, jitter_interval=2).increment() + retry = WritesRetry(total=5, backoff_factor=4).increment() self.assertEqual(retry.total, 4) self.assertEqual(retry.is_exhausted(), False) backoff_time = retry.get_backoff_time() - self.assertGreater(backoff_time, 4) - self.assertLessEqual(backoff_time, 6) + self.assertLessEqual(backoff_time, 4) def test_backoff_exponential_base(self): - retry = WritesRetry(total=5, backoff_factor=2, exponential_base=2) + retry = NonRandomWritesRetry(total=5, backoff_factor=2, exponential_base=2) retry = retry.increment() self.assertEqual(retry.get_backoff_time(), 2) @@ -105,15 +105,6 @@ def test_get_retry_after(self): retry = WritesRetry() self.assertEqual(retry.get_retry_after(response), 5) - def test_get_retry_after_jitter(self): - response = HTTPResponse() - response.headers.add('Retry-After', '5') - - retry = WritesRetry(jitter_interval=2) - retry_after = retry.get_retry_after(response) - self.assertGreater(retry_after, 5) - self.assertLessEqual(retry_after, 7) - def test_is_retry(self): retry = WritesRetry(method_whitelist=["POST"]) From d642c740cb23f1d1d507f413f8e9d507fafab65b Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 15 Apr 2021 14:30:50 +0200 Subject: [PATCH 02/13] feat: implement full jitter retry strategy --- influxdb_client/client/write_api.py | 1 - tests/test_WriteOptions.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 98c5cb35..8ad2abfa 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -364,7 +364,6 @@ def _http(self, batch_item: _BatchItem): retry = WritesRetry( total=self._write_options.max_retries, backoff_factor=self._write_options.retry_interval / 1_000, - jitter_interval=self._write_options.jitter_interval / 1_000, max_retry_delay=self._write_options.max_retry_delay / 1_000, method_whitelist=["POST"]) diff --git a/tests/test_WriteOptions.py b/tests/test_WriteOptions.py index 187813da..7745d3d5 100644 --- a/tests/test_WriteOptions.py +++ b/tests/test_WriteOptions.py @@ -9,7 +9,6 @@ def test_default(self): self.assertEqual(retry.total, 3) self.assertEqual(retry.backoff_factor, 5) - self.assertEqual(retry.jitter_interval, 0) self.assertEqual(retry.max_retry_delay, 180) self.assertEqual(retry.exponential_base, 5) self.assertEqual(retry.method_whitelist, ["POST"]) @@ -22,7 +21,6 @@ def test_custom(self): self.assertEqual(retry.total, 5) self.assertEqual(retry.backoff_factor, 0.5) - self.assertEqual(retry.jitter_interval, 2) self.assertEqual(retry.max_retry_delay, 7.5) self.assertEqual(retry.exponential_base, 2) self.assertEqual(retry.method_whitelist, ["POST"]) From 659e93430d5bbb26f414d09e7cf83f032e1f1e49 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 15 Apr 2021 15:03:12 +0200 Subject: [PATCH 03/13] feat: implement full jitter retry strategy --- influxdb_client/client/write_api.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 8ad2abfa..4f8693ab 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -361,11 +361,7 @@ def _http(self, batch_item: _BatchItem): logger.debug("Write time series data into InfluxDB: %s", batch_item) - retry = WritesRetry( - total=self._write_options.max_retries, - backoff_factor=self._write_options.retry_interval / 1_000, - max_retry_delay=self._write_options.max_retry_delay / 1_000, - method_whitelist=["POST"]) + retry = self._write_options.to_retry_strategy() self._post_write(False, batch_item.key.bucket, batch_item.key.org, batch_item.data, batch_item.key.precision, urlopen_kw={'retries': retry}) From 56bb02d8c74f9f2b8f7bc3de2288027ed54f9aba Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Tue, 20 Apr 2021 14:22:06 +0200 Subject: [PATCH 04/13] feat: implement full jitter retry strategy --- README.rst | 14 +++++--- influxdb_client/client/write/retry.py | 48 ++++++++++++++++++++++---- influxdb_client/client/write_api.py | 18 +++++++--- tests/test_WriteApiBatching.py | 32 +++++++++++++++++ tests/test_WritesRetry.py | 49 ++++++++++++++++++++------- 5 files changed, 134 insertions(+), 27 deletions(-) diff --git a/README.rst b/README.rst index 746889b0..c2e16227 100644 --- a/README.rst +++ b/README.rst @@ -256,17 +256,23 @@ The batching is configurable by ``write_options``\ : - the number of milliseconds to increase the batch flush interval by a random amount - ``0`` * - **retry_interval** - - the number of milliseconds to retry unsuccessful write. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. + - the number of milliseconds to retry first unsuccessful write. The next retry delay is computed using Full Jitter formula. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. - ``5000`` + * - **max_retry_time** + - maximum total retry timout in milliseconds. + - ``180_000`` * - **max_retries** - the number of max retries when write fails - - ``3`` + - ``10`` * - **max_retry_delay** - the maximum delay between each retry attempt in milliseconds - - ``180_000`` + - ``150_000`` + * - **min_retry_delay** + - the minimum delay between each retry attempt in milliseconds + - ``1_000`` * - **exponential_base** - the base for the exponential retry delay, the next delay is computed using Full Jitter formula ``retry_interval * exponential_base^(attempts-1) * random()`` - - ``5`` + - ``2`` .. code-block:: python diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 370a8377..8e21a48d 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -1,10 +1,12 @@ """Implementation for Retry strategy during HTTP requests.""" import logging +from datetime import datetime, timedelta from itertools import takewhile from random import random from urllib3 import Retry +from urllib3.exceptions import MaxRetryError, ResponseError from influxdb_client.client.exceptions import InfluxDBError @@ -15,24 +17,44 @@ class WritesRetry(Retry): """ Writes retry configuration. - :param int max_retry_delay: maximum delay when retrying write + :param int max_retry_time: maximum total retry timout in seconds, attempt after this timout throws MaxRetryError + :param int total: maximum number of retries + :param num backoff_factor: initial first retry delay range in seconds + :param num max_retry_delay: maximum delay when retrying write in seconds + :param num min_retry_delay: minimum delay when retrying write in seconds :param int exponential_base: base for the exponential retry delay, the next delay is computed as `backoff_factor * exponential_base^(attempts-1) * random()` """ - def __init__(self, max_retry_delay=180, exponential_base=5, **kw): + def __init__(self, max_retry_time=180, total=10, backoff_factor=5, max_retry_delay=125, min_retry_delay=1, + exponential_base=2, **kw): """Initialize defaults.""" super().__init__(**kw) + self.total = total + self.backoff_factor = backoff_factor self.max_retry_delay = max_retry_delay + self.min_retry_delay = min_retry_delay + self.max_retry_time = max_retry_time self.exponential_base = exponential_base + self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time) def new(self, **kw): """Initialize defaults.""" if 'max_retry_delay' not in kw: kw['max_retry_delay'] = self.max_retry_delay + + if 'min_retry_delay' not in kw: + kw['min_retry_delay'] = self.min_retry_delay + + if 'max_retry_time' not in kw: + kw['max_retry_time'] = self.max_retry_time + if 'exponential_base' not in kw: kw['exponential_base'] = self.exponential_base - return super().new(**kw) + + new = super().new(**kw) + new.retry_timeout = self.retry_timeout + return new def is_retry(self, method, status_code, has_retry_after=False): """is_retry doesn't require retry_after header. If there is not Retry-After we will use backoff.""" @@ -54,12 +76,26 @@ def get_backoff_time(self): if consecutive_errors_len < 0: return 0 - # Full Jitter strategy - backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) * self._random() - return min(self.max_retry_delay, backoff_value) + delay_range = self.backoff_factor + i = 1 + while i <= consecutive_errors_len: + i += 1 + delay_range = delay_range * self.exponential_base + if delay_range > self.max_retry_delay: + break + + delay = delay_range * self._random() + # at least min_retry_delay + delay = max(self.min_retry_delay, delay) + # at most max_retry_delay + delay = min(self.max_retry_delay, delay) + return delay def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): """Return a new Retry object with incremented retry counters.""" + if self.retry_timeout < datetime.now(): + raise MaxRetryError(_pool, url, error or ResponseError("max_retry_time exceeded")) + new_retry = super().increment(method, url, response, error, _pool, _stacktrace) if response is not None: diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 4f8693ab..951383d8 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -38,9 +38,11 @@ def __init__(self, write_type: WriteType = WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=0, retry_interval=5_000, - max_retries=3, - max_retry_delay=180_000, - exponential_base=5, + max_retries=10, + max_retry_delay=150_000, + min_retry_delay=1_000, + max_retry_time=180_000, + exponential_base=2, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ Create write api configuration. @@ -51,8 +53,10 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param jitter_interval: this is primarily to avoid large write spikes for users running a large number of client instances ie, a jitter of 5s and flush duration 10s means flushes will happen every 10-15s. :param retry_interval: the time to wait before retry unsuccessful write - :param max_retries: the number of max retries when write fails + :param max_retries: the number of max retries when write fails, 0 means retry is disabled :param max_retry_delay: the maximum delay between each retry attempt in milliseconds + :param min_retry_delay: the minimum delay between each retry attempt in milliseconds + :param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled :param exponential_base: base for the exponential retry delay, the next delay is computed as `retry_interval * exponential_base^(attempts-1) + random(jitter_interval)` :param write_scheduler: @@ -64,6 +68,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.retry_interval = retry_interval self.max_retries = max_retries self.max_retry_delay = max_retry_delay + self.min_retry_delay = min_retry_delay + self.max_retry_time = max_retry_time self.exponential_base = exponential_base self.write_scheduler = write_scheduler @@ -73,8 +79,10 @@ def to_retry_strategy(self): total=self.max_retries, backoff_factor=self.retry_interval / 1_000, max_retry_delay=self.max_retry_delay / 1_000, + min_retry_delay=self.min_retry_delay / 1_000, + max_retry_time=self.max_retry_time / 1000, exponential_base=self.exponential_base, - method_whitelist=["POST"]) + allowed_methods=["POST"]) def __getstate__(self): """Return a dict of attributes that you want to pickle.""" diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 2c64c4ec..876ee864 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -238,6 +238,38 @@ def test_retry_interval_max_retries(self): self.assertEqual(6, len(httpretty.httpretty.latest_requests)) + def test_retry_disabled_max_retries(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, + adding_headers={'Retry-After': '1'}) + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(max_retries=0,batch_size=2, flush_interval=1_000)) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek level\\ water_level=1 1", + "h2o_feet,location=coyote_creek level\\ water_level=2 2"]) + + time.sleep(2) + + self.assertEqual(1, len(httpretty.httpretty.latest_requests)) + + def test_retry_disabled_max_retry_time(self): + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=429, + adding_headers={'Retry-After': '1'}) + + self._write_client.close() + self._write_client = WriteApi(influxdb_client=self.influxdb_client, + write_options=WriteOptions(max_retry_time=0,batch_size=2, flush_interval=1_000)) + + self._write_client.write("my-bucket", "my-org", + ["h2o_feet,location=coyote_creek level\\ water_level=1 1", + "h2o_feet,location=coyote_creek level\\ water_level=2 2"]) + + time.sleep(2) + + self.assertEqual(1, len(httpretty.httpretty.latest_requests)) + def test_recover_from_error(self): httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=204) httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/write", status=400) diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index dcf86038..b20e69e5 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -1,4 +1,5 @@ import unittest +import time from urllib3 import HTTPResponse from urllib3.exceptions import MaxRetryError @@ -10,9 +11,10 @@ class NonRandomWritesRetry(WritesRetry): def _random(self): return 1 + class TestWritesRetry(unittest.TestCase): def test_copy(self): - retry = WritesRetry(exponential_base=3, max_retry_delay=145) + retry = WritesRetry(exponential_base=3, max_retry_delay=145, total=10) self.assertEqual(retry.max_retry_delay, 145) self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 10) @@ -27,8 +29,31 @@ def test_copy(self): self.assertEqual(retry.exponential_base, 3) self.assertEqual(retry.total, 8) + def test_backoff_max_time(self): + retry = NonRandomWritesRetry(max_retry_time=2) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 0) + + retry = retry.increment() + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 5) + + retry = retry.increment() + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 10) + + time.sleep(3) + + with self.assertRaises(MaxRetryError) as cm: + retry.increment() + exception = cm.exception + print(exception) + + self.assertEqual("max_retry_time exceeded", exception.reason.args[0]) + def test_backoff(self): - retry = NonRandomWritesRetry(total=5, backoff_factor=1, max_retry_delay=550) + retry = NonRandomWritesRetry(total=5, backoff_factor=1, min_retry_delay=1, exponential_base=2, + max_retry_delay=550) self.assertEqual(retry.total, 5) self.assertEqual(retry.is_exhausted(), False) self.assertEqual(retry.get_backoff_time(), 0) @@ -41,22 +66,22 @@ def test_backoff(self): retry = retry.increment() self.assertEqual(retry.total, 3) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 5) + self.assertEqual(retry.get_backoff_time(), 2) retry = retry.increment() self.assertEqual(retry.total, 2) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 25) + self.assertEqual(retry.get_backoff_time(), 4) retry = retry.increment() self.assertEqual(retry.total, 1) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 125) + self.assertEqual(retry.get_backoff_time(), 8) retry = retry.increment() self.assertEqual(retry.total, 0) self.assertEqual(retry.is_exhausted(), False) - self.assertEqual(retry.get_backoff_time(), 550) + self.assertEqual(retry.get_backoff_time(), 16) with self.assertRaises(MaxRetryError) as cm: retry.increment() @@ -65,14 +90,14 @@ def test_backoff(self): self.assertEqual("too many error responses", exception.reason.args[0]) def test_backoff_max(self): - retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=15)\ - .increment()\ - .increment()\ - .increment()\ - .increment()\ + retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=15) \ + .increment() \ + .increment() \ + .increment() \ + .increment() \ .increment() - self.assertEqual(retry.get_backoff_time(), 15) + self.assertLessEqual(retry.get_backoff_time(), 15) def test_backoff_jitter(self): retry = WritesRetry(total=5, backoff_factor=4).increment() From 03b5ff4c4b15630b6b1915e2cbee693ec347d9af Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Tue, 20 Apr 2021 14:48:09 +0200 Subject: [PATCH 05/13] feat: fixing default retry settings --- tests/test_WriteOptions.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/test_WriteOptions.py b/tests/test_WriteOptions.py index 7745d3d5..f876abd4 100644 --- a/tests/test_WriteOptions.py +++ b/tests/test_WriteOptions.py @@ -7,10 +7,11 @@ class TestWriteOptions(unittest.TestCase): def test_default(self): retry = WriteOptions().to_retry_strategy() - self.assertEqual(retry.total, 3) + self.assertEqual(retry.total, 10) self.assertEqual(retry.backoff_factor, 5) - self.assertEqual(retry.max_retry_delay, 180) - self.assertEqual(retry.exponential_base, 5) + self.assertEqual(retry.max_retry_time, 180) + self.assertEqual(retry.max_retry_delay, 150) + self.assertEqual(retry.exponential_base, 2) self.assertEqual(retry.method_whitelist, ["POST"]) def test_custom(self): From 5b16095336677cd00c0e8d5c2f1aec254c53dbdf Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Tue, 20 Apr 2021 14:58:21 +0200 Subject: [PATCH 06/13] feat: fixing min_retry_delay in full jitter backoff formula --- influxdb_client/client/write/retry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 8e21a48d..8cf75bcb 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -84,7 +84,7 @@ def get_backoff_time(self): if delay_range > self.max_retry_delay: break - delay = delay_range * self._random() + delay = self.min_retry_delay + (delay_range - self.min_retry_delay) * self._random() # at least min_retry_delay delay = max(self.min_retry_delay, delay) # at most max_retry_delay From 34767da8eb298eedc2f887cf3c9462d30166ab38 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 12:01:23 +0200 Subject: [PATCH 07/13] feat: random exponential backoff retry --- CHANGELOG.md | 1 + README.rst | 9 ++--- influxdb_client/client/write/retry.py | 38 +++++++++--------- influxdb_client/client/write_api.py | 9 +---- tests/test_WriteApiBatching.py | 4 +- tests/test_WriteOptions.py | 2 +- tests/test_WritesRetry.py | 58 +++++++++++++++++++++++---- 7 files changed, 80 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 022bc2ab..f62acf58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Features 1. [#203](https://github.com/influxdata/influxdb-client-python/issues/219): Bind query parameters +1. [#225](https://github.com/influxdata/influxdb-client-python/pull/225): Exponential random backoff retry strategy ### Bug Fixes 1. [#222](https://github.com/influxdata/influxdb-client-python/pull/222): Pass configured timeout to HTTP client diff --git a/README.rst b/README.rst index c2e16227..6bf4e85d 100644 --- a/README.rst +++ b/README.rst @@ -256,7 +256,7 @@ The batching is configurable by ``write_options``\ : - the number of milliseconds to increase the batch flush interval by a random amount - ``0`` * - **retry_interval** - - the number of milliseconds to retry first unsuccessful write. The next retry delay is computed using Full Jitter formula. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. + - the number of milliseconds to retry first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. - ``5000`` * - **max_retry_time** - maximum total retry timout in milliseconds. @@ -266,12 +266,9 @@ The batching is configurable by ``write_options``\ : - ``10`` * - **max_retry_delay** - the maximum delay between each retry attempt in milliseconds - - ``150_000`` - * - **min_retry_delay** - - the minimum delay between each retry attempt in milliseconds - - ``1_000`` + - ``125_000`` * - **exponential_base** - - the base for the exponential retry delay, the next delay is computed using Full Jitter formula ``retry_interval * exponential_base^(attempts-1) * random()`` + - the base for the exponential retry delay, the next delay is computed using random exponential backoff. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` - ``2`` diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 8cf75bcb..3e724f97 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -21,19 +21,23 @@ class WritesRetry(Retry): :param int total: maximum number of retries :param num backoff_factor: initial first retry delay range in seconds :param num max_retry_delay: maximum delay when retrying write in seconds - :param num min_retry_delay: minimum delay when retrying write in seconds - :param int exponential_base: base for the exponential retry delay, the next delay is computed as - `backoff_factor * exponential_base^(attempts-1) * random()` + :param int exponential_base: base for the exponential retry delay, + + The next delay is computed as random value between range + `backoff_factor * exponential_base^(attempts-1)` and `backoff_factor * exponential_base^(attempts) + + Example: for backoff_factor=5, exponential_base=2, max_retry_delay=125, total=5 + retry delays are random distributed values within the ranges of + [5-10, 10-20, 20-40, 40-80, 80-125] + """ - def __init__(self, max_retry_time=180, total=10, backoff_factor=5, max_retry_delay=125, min_retry_delay=1, - exponential_base=2, **kw): + def __init__(self, max_retry_time=180, total=10, backoff_factor=5, max_retry_delay=125, exponential_base=2, **kw): """Initialize defaults.""" super().__init__(**kw) self.total = total self.backoff_factor = backoff_factor self.max_retry_delay = max_retry_delay - self.min_retry_delay = min_retry_delay self.max_retry_time = max_retry_time self.exponential_base = exponential_base self.retry_timeout = datetime.now() + timedelta(seconds=max_retry_time) @@ -43,9 +47,6 @@ def new(self, **kw): if 'max_retry_delay' not in kw: kw['max_retry_delay'] = self.max_retry_delay - if 'min_retry_delay' not in kw: - kw['min_retry_delay'] = self.min_retry_delay - if 'max_retry_time' not in kw: kw['max_retry_time'] = self.max_retry_time @@ -76,20 +77,21 @@ def get_backoff_time(self): if consecutive_errors_len < 0: return 0 - delay_range = self.backoff_factor + range_start = self.backoff_factor + range_stop = self.backoff_factor * self.exponential_base + i = 1 while i <= consecutive_errors_len: i += 1 - delay_range = delay_range * self.exponential_base - if delay_range > self.max_retry_delay: + range_start = range_stop + range_stop = range_stop * self.exponential_base + if range_stop > self.max_retry_delay: break - delay = self.min_retry_delay + (delay_range - self.min_retry_delay) * self._random() - # at least min_retry_delay - delay = max(self.min_retry_delay, delay) - # at most max_retry_delay - delay = min(self.max_retry_delay, delay) - return delay + if range_stop > self.max_retry_delay: + range_stop = self.max_retry_delay + + return range_start + (range_stop - range_start) * self._random() def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): """Return a new Retry object with incremented retry counters.""" diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 951383d8..3384a852 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -39,8 +39,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, jitter_interval=0, retry_interval=5_000, max_retries=10, - max_retry_delay=150_000, - min_retry_delay=1_000, + max_retry_delay=125_000, max_retry_time=180_000, exponential_base=2, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: @@ -55,10 +54,8 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param retry_interval: the time to wait before retry unsuccessful write :param max_retries: the number of max retries when write fails, 0 means retry is disabled :param max_retry_delay: the maximum delay between each retry attempt in milliseconds - :param min_retry_delay: the minimum delay between each retry attempt in milliseconds :param max_retry_time: total timeout for all retry attempts in milliseconds, if 0 retry is disabled - :param exponential_base: base for the exponential retry delay, the next delay is computed as - `retry_interval * exponential_base^(attempts-1) + random(jitter_interval)` + :param exponential_base: base for the exponential retry delay :param write_scheduler: """ self.write_type = write_type @@ -68,7 +65,6 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.retry_interval = retry_interval self.max_retries = max_retries self.max_retry_delay = max_retry_delay - self.min_retry_delay = min_retry_delay self.max_retry_time = max_retry_time self.exponential_base = exponential_base self.write_scheduler = write_scheduler @@ -79,7 +75,6 @@ def to_retry_strategy(self): total=self.max_retries, backoff_factor=self.retry_interval / 1_000, max_retry_delay=self.max_retry_delay / 1_000, - min_retry_delay=self.min_retry_delay / 1_000, max_retry_time=self.max_retry_time / 1000, exponential_base=self.exponential_base, allowed_methods=["POST"]) diff --git a/tests/test_WriteApiBatching.py b/tests/test_WriteApiBatching.py index 876ee864..61cb5f22 100644 --- a/tests/test_WriteApiBatching.py +++ b/tests/test_WriteApiBatching.py @@ -198,7 +198,7 @@ def test_retry_interval(self): time.sleep(1) self.assertEqual(1, len(httpretty.httpretty.latest_requests), msg="first request immediately") - time.sleep(1.5) + time.sleep(3) self.assertEqual(2, len(httpretty.httpretty.latest_requests), msg="second request after delay_interval") time.sleep(3) @@ -266,7 +266,7 @@ def test_retry_disabled_max_retry_time(self): ["h2o_feet,location=coyote_creek level\\ water_level=1 1", "h2o_feet,location=coyote_creek level\\ water_level=2 2"]) - time.sleep(2) + time.sleep(5) self.assertEqual(1, len(httpretty.httpretty.latest_requests)) diff --git a/tests/test_WriteOptions.py b/tests/test_WriteOptions.py index f876abd4..856e65ee 100644 --- a/tests/test_WriteOptions.py +++ b/tests/test_WriteOptions.py @@ -10,7 +10,7 @@ def test_default(self): self.assertEqual(retry.total, 10) self.assertEqual(retry.backoff_factor, 5) self.assertEqual(retry.max_retry_time, 180) - self.assertEqual(retry.max_retry_delay, 150) + self.assertEqual(retry.max_retry_delay, 125) self.assertEqual(retry.exponential_base, 2) self.assertEqual(retry.method_whitelist, ["POST"]) diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index b20e69e5..d4129f1c 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -7,7 +7,12 @@ from influxdb_client.client.write.retry import WritesRetry -class NonRandomWritesRetry(WritesRetry): +class NonRandomMinWritesRetry(WritesRetry): + def _random(self): + return 0 + + +class NonRandomMaxWritesRetry(WritesRetry): def _random(self): return 1 @@ -30,7 +35,7 @@ def test_copy(self): self.assertEqual(retry.total, 8) def test_backoff_max_time(self): - retry = NonRandomWritesRetry(max_retry_time=2) + retry = NonRandomMinWritesRetry(max_retry_time=2) self.assertEqual(retry.is_exhausted(), False) self.assertEqual(retry.get_backoff_time(), 0) @@ -51,9 +56,9 @@ def test_backoff_max_time(self): self.assertEqual("max_retry_time exceeded", exception.reason.args[0]) - def test_backoff(self): - retry = NonRandomWritesRetry(total=5, backoff_factor=1, min_retry_delay=1, exponential_base=2, - max_retry_delay=550) + def test_backoff_start_range(self): + retry = NonRandomMinWritesRetry(total=5, backoff_factor=1, exponential_base=2, + max_retry_delay=550) self.assertEqual(retry.total, 5) self.assertEqual(retry.is_exhausted(), False) self.assertEqual(retry.get_backoff_time(), 0) @@ -89,6 +94,45 @@ def test_backoff(self): self.assertEqual("too many error responses", exception.reason.args[0]) + def test_backoff_stop_range(self): + retry = NonRandomMaxWritesRetry(total=5, backoff_factor=5, exponential_base=2, + max_retry_delay=550) + + self.assertEqual(retry.total, 5) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 0) + + retry = retry.increment() + self.assertEqual(retry.total, 4) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 10) + + retry = retry.increment() + self.assertEqual(retry.total, 3) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 20) + + retry = retry.increment() + self.assertEqual(retry.total, 2) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 40) + + retry = retry.increment() + self.assertEqual(retry.total, 1) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 80) + + retry = retry.increment() + self.assertEqual(retry.total, 0) + self.assertEqual(retry.is_exhausted(), False) + self.assertEqual(retry.get_backoff_time(), 160) + + with self.assertRaises(MaxRetryError) as cm: + retry.increment() + exception = cm.exception + + self.assertEqual("too many error responses", exception.reason.args[0]) + def test_backoff_max(self): retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=15) \ .increment() \ @@ -106,10 +150,10 @@ def test_backoff_jitter(self): self.assertEqual(retry.is_exhausted(), False) backoff_time = retry.get_backoff_time() - self.assertLessEqual(backoff_time, 4) + self.assertLessEqual(backoff_time, 8) def test_backoff_exponential_base(self): - retry = NonRandomWritesRetry(total=5, backoff_factor=2, exponential_base=2) + retry = NonRandomMinWritesRetry(total=5, backoff_factor=2, exponential_base=2) retry = retry.increment() self.assertEqual(retry.get_backoff_time(), 2) From 9c8977ade77cf7bf31b04bfec47d5da2e5c9330f Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 14:52:40 +0200 Subject: [PATCH 08/13] feat: random exponential backoff retry --- README.rst | 4 ++-- influxdb_client/client/write/retry.py | 17 ++++++++++------- influxdb_client/client/write_api.py | 6 +++--- tests/test_WriteOptions.py | 6 +++--- tests/test_WritesRetry.py | 12 ++++++------ 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/README.rst b/README.rst index 6bf4e85d..8529a19b 100644 --- a/README.rst +++ b/README.rst @@ -263,12 +263,12 @@ The batching is configurable by ``write_options``\ : - ``180_000`` * - **max_retries** - the number of max retries when write fails - - ``10`` + - ``5`` * - **max_retry_delay** - the maximum delay between each retry attempt in milliseconds - ``125_000`` * - **exponential_base** - - the base for the exponential retry delay, the next delay is computed using random exponential backoff. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` + - the base for the exponential retry delay, the next delay is computed using random exponential backoff as a random value within the interval ``retry_interval * exponential_base^(attempts-1)`` and ``retry_interval * exponential_base^(attempts)``. Example for ``retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`` Retry delays are random distributed values within the ranges of ``[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`` - ``2`` diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 3e724f97..331bf1e1 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -19,24 +19,24 @@ class WritesRetry(Retry): :param int max_retry_time: maximum total retry timout in seconds, attempt after this timout throws MaxRetryError :param int total: maximum number of retries - :param num backoff_factor: initial first retry delay range in seconds + :param num retry_interval: initial first retry delay range in seconds :param num max_retry_delay: maximum delay when retrying write in seconds :param int exponential_base: base for the exponential retry delay, The next delay is computed as random value between range - `backoff_factor * exponential_base^(attempts-1)` and `backoff_factor * exponential_base^(attempts) + `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts) - Example: for backoff_factor=5, exponential_base=2, max_retry_delay=125, total=5 + Example: for retry_interval=5, exponential_base=2, max_retry_delay=125, total=5 retry delays are random distributed values within the ranges of [5-10, 10-20, 20-40, 40-80, 80-125] """ - def __init__(self, max_retry_time=180, total=10, backoff_factor=5, max_retry_delay=125, exponential_base=2, **kw): + def __init__(self, max_retry_time=180, total=5, retry_interval=5, max_retry_delay=125, exponential_base=2, **kw): """Initialize defaults.""" super().__init__(**kw) self.total = total - self.backoff_factor = backoff_factor + self.retry_interval = retry_interval self.max_retry_delay = max_retry_delay self.max_retry_time = max_retry_time self.exponential_base = exponential_base @@ -44,6 +44,9 @@ def __init__(self, max_retry_time=180, total=10, backoff_factor=5, max_retry_del def new(self, **kw): """Initialize defaults.""" + if 'retry_interval' not in kw: + kw['retry_interval'] = self.retry_interval + if 'max_retry_delay' not in kw: kw['max_retry_delay'] = self.max_retry_delay @@ -77,8 +80,8 @@ def get_backoff_time(self): if consecutive_errors_len < 0: return 0 - range_start = self.backoff_factor - range_stop = self.backoff_factor * self.exponential_base + range_start = self.retry_interval + range_stop = self.retry_interval * self.exponential_base i = 1 while i <= consecutive_errors_len: diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 3384a852..a64ed1ec 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -38,7 +38,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, batch_size=1_000, flush_interval=1_000, jitter_interval=0, retry_interval=5_000, - max_retries=10, + max_retries=5, max_retry_delay=125_000, max_retry_time=180_000, exponential_base=2, @@ -73,9 +73,9 @@ def to_retry_strategy(self): """Create a Retry strategy from write options.""" return WritesRetry( total=self.max_retries, - backoff_factor=self.retry_interval / 1_000, + retry_interval=self.retry_interval / 1_000, max_retry_delay=self.max_retry_delay / 1_000, - max_retry_time=self.max_retry_time / 1000, + max_retry_time=self.max_retry_time / 1_000, exponential_base=self.exponential_base, allowed_methods=["POST"]) diff --git a/tests/test_WriteOptions.py b/tests/test_WriteOptions.py index 856e65ee..911e51f7 100644 --- a/tests/test_WriteOptions.py +++ b/tests/test_WriteOptions.py @@ -7,8 +7,8 @@ class TestWriteOptions(unittest.TestCase): def test_default(self): retry = WriteOptions().to_retry_strategy() - self.assertEqual(retry.total, 10) - self.assertEqual(retry.backoff_factor, 5) + self.assertEqual(retry.total, 5) + self.assertEqual(retry.retry_interval, 5) self.assertEqual(retry.max_retry_time, 180) self.assertEqual(retry.max_retry_delay, 125) self.assertEqual(retry.exponential_base, 2) @@ -21,7 +21,7 @@ def test_custom(self): .to_retry_strategy() self.assertEqual(retry.total, 5) - self.assertEqual(retry.backoff_factor, 0.5) + self.assertEqual(retry.retry_interval, 0.5) self.assertEqual(retry.max_retry_delay, 7.5) self.assertEqual(retry.exponential_base, 2) self.assertEqual(retry.method_whitelist, ["POST"]) diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index d4129f1c..1e3e3a91 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -57,7 +57,7 @@ def test_backoff_max_time(self): self.assertEqual("max_retry_time exceeded", exception.reason.args[0]) def test_backoff_start_range(self): - retry = NonRandomMinWritesRetry(total=5, backoff_factor=1, exponential_base=2, + retry = NonRandomMinWritesRetry(total=5, retry_interval=1, exponential_base=2, max_retry_delay=550) self.assertEqual(retry.total, 5) self.assertEqual(retry.is_exhausted(), False) @@ -95,7 +95,7 @@ def test_backoff_start_range(self): self.assertEqual("too many error responses", exception.reason.args[0]) def test_backoff_stop_range(self): - retry = NonRandomMaxWritesRetry(total=5, backoff_factor=5, exponential_base=2, + retry = NonRandomMaxWritesRetry(total=5, retry_interval=5, exponential_base=2, max_retry_delay=550) self.assertEqual(retry.total, 5) @@ -134,7 +134,7 @@ def test_backoff_stop_range(self): self.assertEqual("too many error responses", exception.reason.args[0]) def test_backoff_max(self): - retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=15) \ + retry = WritesRetry(total=5, retry_interval=1, max_retry_delay=15) \ .increment() \ .increment() \ .increment() \ @@ -144,7 +144,7 @@ def test_backoff_max(self): self.assertLessEqual(retry.get_backoff_time(), 15) def test_backoff_jitter(self): - retry = WritesRetry(total=5, backoff_factor=4).increment() + retry = WritesRetry(total=5, retry_interval=4).increment() self.assertEqual(retry.total, 4) self.assertEqual(retry.is_exhausted(), False) @@ -153,7 +153,7 @@ def test_backoff_jitter(self): self.assertLessEqual(backoff_time, 8) def test_backoff_exponential_base(self): - retry = NonRandomMinWritesRetry(total=5, backoff_factor=2, exponential_base=2) + retry = NonRandomMinWritesRetry(total=5, retry_interval=2, exponential_base=2) retry = retry.increment() self.assertEqual(retry.get_backoff_time(), 2) @@ -205,7 +205,7 @@ def test_logging(self): response.headers.add('Retry-After', '63') with self.assertLogs('influxdb_client.client.write.retry', level='WARNING') as cm: - WritesRetry(total=5, backoff_factor=1, max_retry_delay=15) \ + WritesRetry(total=5, retry_interval=1, max_retry_delay=15) \ .increment(response=response) \ .increment(error=Exception("too many requests")) \ .increment(url='http://localhost:9999') From 0b3c5b0019528d4ebcdd8586fbc4867bd62ff835 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 14:55:06 +0200 Subject: [PATCH 09/13] feat: random exponential backoff retry --- influxdb_client/client/write/retry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 331bf1e1..66b8b395 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -17,7 +17,7 @@ class WritesRetry(Retry): """ Writes retry configuration. - :param int max_retry_time: maximum total retry timout in seconds, attempt after this timout throws MaxRetryError + :param int max_retry_time: maximum total retry timeout in seconds, attempt after this timout throws MaxRetryError :param int total: maximum number of retries :param num retry_interval: initial first retry delay range in seconds :param num max_retry_delay: maximum delay when retrying write in seconds From cd0889890803d893ea0b716d4b563df97b9df28d Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 15:06:23 +0200 Subject: [PATCH 10/13] feat: random exponential backoff retry --- README.rst | 2 +- influxdb_client/client/write_api.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 8529a19b..2de2f180 100644 --- a/README.rst +++ b/README.rst @@ -259,7 +259,7 @@ The batching is configurable by ``write_options``\ : - the number of milliseconds to retry first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify "Retry-After" header. - ``5000`` * - **max_retry_time** - - maximum total retry timout in milliseconds. + - maximum total retry timeout in milliseconds. - ``180_000`` * - **max_retries** - the number of max retries when write fails diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index a64ed1ec..8aa3fdaf 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -77,7 +77,7 @@ def to_retry_strategy(self): max_retry_delay=self.max_retry_delay / 1_000, max_retry_time=self.max_retry_time / 1_000, exponential_base=self.exponential_base, - allowed_methods=["POST"]) + method_whitelist=["POST"]) def __getstate__(self): """Return a dict of attributes that you want to pickle.""" From 61c05936842677811da571a7aca48eddb0c985c0 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 15:37:18 +0200 Subject: [PATCH 11/13] feat: random exponential backoff retry --- influxdb_client/client/write/retry.py | 22 +++++++++++++++++----- influxdb_client/client/write_api.py | 1 + tests/test_WritesRetry.py | 14 ++++++++++++-- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/influxdb_client/client/write/retry.py b/influxdb_client/client/write/retry.py index 66b8b395..136b8c1d 100644 --- a/influxdb_client/client/write/retry.py +++ b/influxdb_client/client/write/retry.py @@ -17,10 +17,11 @@ class WritesRetry(Retry): """ Writes retry configuration. + :param int jitter_interval: random milliseconds when retrying writes + :param num max_retry_delay: maximum delay when retrying write in seconds :param int max_retry_time: maximum total retry timeout in seconds, attempt after this timout throws MaxRetryError :param int total: maximum number of retries :param num retry_interval: initial first retry delay range in seconds - :param num max_retry_delay: maximum delay when retrying write in seconds :param int exponential_base: base for the exponential retry delay, The next delay is computed as random value between range @@ -32,9 +33,11 @@ class WritesRetry(Retry): """ - def __init__(self, max_retry_time=180, total=5, retry_interval=5, max_retry_delay=125, exponential_base=2, **kw): + def __init__(self, jitter_interval=0, max_retry_delay=125, exponential_base=2, max_retry_time=180, total=5, + retry_interval=5, **kw): """Initialize defaults.""" super().__init__(**kw) + self.jitter_interval = jitter_interval self.total = total self.retry_interval = retry_interval self.max_retry_delay = max_retry_delay @@ -44,15 +47,14 @@ def __init__(self, max_retry_time=180, total=5, retry_interval=5, max_retry_dela def new(self, **kw): """Initialize defaults.""" + if 'jitter_interval' not in kw: + kw['jitter_interval'] = self.jitter_interval if 'retry_interval' not in kw: kw['retry_interval'] = self.retry_interval - if 'max_retry_delay' not in kw: kw['max_retry_delay'] = self.max_retry_delay - if 'max_retry_time' not in kw: kw['max_retry_time'] = self.max_retry_time - if 'exponential_base' not in kw: kw['exponential_base'] = self.exponential_base @@ -96,6 +98,13 @@ def get_backoff_time(self): return range_start + (range_stop - range_start) * self._random() + def get_retry_after(self, response): + """Get the value of Retry-After header and append random jitter delay.""" + retry_after = super().get_retry_after(response) + if retry_after: + retry_after += self._jitter_delay() + return retry_after + def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None): """Return a new Retry object with incremented retry counters.""" if self.retry_timeout < datetime.now(): @@ -118,5 +127,8 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None return new_retry + def _jitter_delay(self): + return self.jitter_interval * random() + def _random(self): return random() diff --git a/influxdb_client/client/write_api.py b/influxdb_client/client/write_api.py index 8aa3fdaf..403a7ea3 100644 --- a/influxdb_client/client/write_api.py +++ b/influxdb_client/client/write_api.py @@ -74,6 +74,7 @@ def to_retry_strategy(self): return WritesRetry( total=self.max_retries, retry_interval=self.retry_interval / 1_000, + jitter_interval=self.jitter_interval / 1_000, max_retry_delay=self.max_retry_delay / 1_000, max_retry_time=self.max_retry_time / 1_000, exponential_base=self.exponential_base, diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index 1e3e3a91..65a58880 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -144,13 +144,14 @@ def test_backoff_max(self): self.assertLessEqual(retry.get_backoff_time(), 15) def test_backoff_jitter(self): - retry = WritesRetry(total=5, retry_interval=4).increment() + retry = WritesRetry(total=5, retry_interval=4, jitter_interval=2).increment() self.assertEqual(retry.total, 4) self.assertEqual(retry.is_exhausted(), False) backoff_time = retry.get_backoff_time() - self.assertLessEqual(backoff_time, 8) + self.assertGreater(backoff_time, 4) + self.assertLessEqual(backoff_time, 6) def test_backoff_exponential_base(self): retry = NonRandomMinWritesRetry(total=5, retry_interval=2, exponential_base=2) @@ -174,6 +175,15 @@ def test_get_retry_after(self): retry = WritesRetry() self.assertEqual(retry.get_retry_after(response), 5) + def test_get_retry_after_jitter(self): + response = HTTPResponse() + response.headers.add('Retry-After', '5') + + retry = WritesRetry(jitter_interval=2) + retry_after = retry.get_retry_after(response) + self.assertGreater(retry_after, 5) + self.assertLessEqual(retry_after, 7) + def test_is_retry(self): retry = WritesRetry(method_whitelist=["POST"]) From 97da7ae16139d91ead1d517820119bc212140c26 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 15:46:25 +0200 Subject: [PATCH 12/13] feat: random exponential backoff retry --- examples/import_data_set_sync_batching.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/import_data_set_sync_batching.py b/examples/import_data_set_sync_batching.py index f5e548a5..4ab37e10 100644 --- a/examples/import_data_set_sync_batching.py +++ b/examples/import_data_set_sync_batching.py @@ -30,7 +30,7 @@ def csv_to_generator(csv_file_path): """ Define Retry strategy - 3 attempts => 2, 4, 8 """ -retries = WritesRetry(total=3, backoff_factor=1, exponential_base=2) +retries = WritesRetry(total=3, retry_interval=1, exponential_base=2) with InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org', retries=retries) as client: """ From 3e641f35d56b48a0d93bb0acbeea94b0865d49b5 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 26 Apr 2021 16:04:05 +0200 Subject: [PATCH 13/13] feat: random exponential backoff retry --- tests/test_WritesRetry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_WritesRetry.py b/tests/test_WritesRetry.py index 65a58880..88ceb91e 100644 --- a/tests/test_WritesRetry.py +++ b/tests/test_WritesRetry.py @@ -143,15 +143,15 @@ def test_backoff_max(self): self.assertLessEqual(retry.get_backoff_time(), 15) - def test_backoff_jitter(self): - retry = WritesRetry(total=5, retry_interval=4, jitter_interval=2).increment() + def test_backoff_increment(self): + retry = WritesRetry(total=5, retry_interval=4).increment() self.assertEqual(retry.total, 4) self.assertEqual(retry.is_exhausted(), False) backoff_time = retry.get_backoff_time() self.assertGreater(backoff_time, 4) - self.assertLessEqual(backoff_time, 6) + self.assertLessEqual(backoff_time, 8) def test_backoff_exponential_base(self): retry = NonRandomMinWritesRetry(total=5, retry_interval=2, exponential_base=2)