Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement exponential random retry strategy #225

Merged
merged 13 commits into from
Apr 29, 2021
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
@@ -265,7 +265,7 @@ The batching is configurable by ``write_options``\ :
- the maximum delay between each retry attempt in milliseconds
- ``180_000``
* - **exponential_base**
- the base for the exponential retry delay, the next delay is computed as ``retry_interval * exponential_base^(attempts-1) + random(jitter_interval)``
- the base for the exponential retry delay, the next delay is computed using Full Jitter formula ``retry_interval * exponential_base^(attempts-1) * random()``
- ``5``


22 changes: 6 additions & 16 deletions influxdb_client/client/write/retry.py
Original file line number Diff line number Diff line change
@@ -15,23 +15,19 @@ class WritesRetry(Retry):
"""
Writes retry configuration.

:param int jitter_interval: random milliseconds when retrying writes
:param int max_retry_delay: maximum delay when retrying write
:param int exponential_base: base for the exponential retry delay, the next delay is computed as
`backoff_factor * exponential_base^(attempts-1) + random(jitter_interval)`
`backoff_factor * exponential_base^(attempts-1) * random()`
"""

def __init__(self, jitter_interval=0, max_retry_delay=180, exponential_base=5, **kw):
def __init__(self, max_retry_delay=180, exponential_base=5, **kw):
"""Initialize defaults."""
super().__init__(**kw)
self.jitter_interval = jitter_interval
self.max_retry_delay = max_retry_delay
self.exponential_base = exponential_base

def new(self, **kw):
"""Initialize defaults."""
if 'jitter_interval' not in kw:
kw['jitter_interval'] = self.jitter_interval
if 'max_retry_delay' not in kw:
kw['max_retry_delay'] = self.max_retry_delay
if 'exponential_base' not in kw:
@@ -58,16 +54,10 @@ def get_backoff_time(self):
if consecutive_errors_len < 0:
return 0

backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) + self._jitter_delay()
# Full Jitter strategy
backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) * self._random()
Copy link

@sranka sranka Apr 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose to change the implementation to compute the next retry delay this way:

def nextDelay(attempt /* 1 means called for the first time */, options):
  range = options.first_retry_range
  i = 1
  while i<attempt:
    i++ 
    range = range * options.exponential_base
    if range > options.max_retry_delay :
      break
  delay = options.min_retry_delay + (range - options.min_retry_delay) * random() /* at least min_retry_delay */
  delay = min(options.max_retry_delay, delay) /* at most max_retry_delay */
  return delay

Additionally, the implementation must ensure that the request is not scheduled for retries after
options.max_retry_time elapsed (max_request_time if possible).

options.max_retry_time can be the only meaningful configurable value from the library user POV. Setting to 0 disables retry.

These could be the defaults:

options.first_retry_range = 5 seconds
options.exponential_base = 2
options.max_retry_delay = 125 seconds
options.min_retry_delay = 1 second
options.max_retry_time = 180 seconds

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This delay function does no guarantee that delay is increasing. If generated random is a lot smaller than in previous attempt, then resulting delay is also smaller. I have no smart proposal how to fix at this moment though :(

Copy link
Contributor

@alespour alespour Apr 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose this simple modification to the above algorithm to ensure that delay values are increasing and increasing enough.

+ def randomArbitrary(min, max) {
+  return random() * (max - min) + min;
+ }
...
- delay = options.min_retry_delay + (range - options.min_retry_delay) * random() /* at least min_retry_delay */
+ delay = options.min_retry_delay + (range - options.min_retry_delay) * options.random /* at least min_retry_delay */
...
+ options.random = randomArbitrary(0.5, 1.0)

Or similarly in the PR like

+ self.random = randomArbitrary(0.5, 1.0)
...
- backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) * self._random()
+ backoff_value = self.backoff_factor * (self.exponential_base ** consecutive_errors_len) * self.random

Copy link
Contributor Author

@rhajek rhajek Apr 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return min(self.max_retry_delay, backoff_value)

def get_retry_after(self, response):
"""Get the value of Retry-After header and append random jitter delay."""
retry_after = super().get_retry_after(response)
if retry_after:
retry_after += self._jitter_delay()
return retry_after

def increment(self, method=None, url=None, response=None, error=None, _pool=None, _stacktrace=None):
"""Return a new Retry object with incremented retry counters."""
new_retry = super().increment(method, url, response, error, _pool, _stacktrace)
@@ -87,5 +77,5 @@ def increment(self, method=None, url=None, response=None, error=None, _pool=None

return new_retry

def _jitter_delay(self):
return self.jitter_interval * random()
def _random(self):
return random()
1 change: 0 additions & 1 deletion influxdb_client/client/write_api.py
Original file line number Diff line number Diff line change
@@ -72,7 +72,6 @@ def to_retry_strategy(self):
return WritesRetry(
total=self.max_retries,
backoff_factor=self.retry_interval / 1_000,
jitter_interval=self.jitter_interval / 1_000,
max_retry_delay=self.max_retry_delay / 1_000,
exponential_base=self.exponential_base,
method_whitelist=["POST"])
27 changes: 9 additions & 18 deletions tests/test_WritesRetry.py
Original file line number Diff line number Diff line change
@@ -6,28 +6,29 @@
from influxdb_client.client.write.retry import WritesRetry


class NonRandomWritesRetry(WritesRetry):
def _random(self):
return 1

class TestWritesRetry(unittest.TestCase):
def test_copy(self):
retry = WritesRetry(jitter_interval=123, exponential_base=3, max_retry_delay=145)
self.assertEqual(retry.jitter_interval, 123)
retry = WritesRetry(exponential_base=3, max_retry_delay=145)
self.assertEqual(retry.max_retry_delay, 145)
self.assertEqual(retry.exponential_base, 3)
self.assertEqual(retry.total, 10)

retry = retry.increment()
self.assertEqual(retry.jitter_interval, 123)
self.assertEqual(retry.max_retry_delay, 145)
self.assertEqual(retry.exponential_base, 3)
self.assertEqual(retry.total, 9)

retry = retry.increment()
self.assertEqual(retry.jitter_interval, 123)
self.assertEqual(retry.max_retry_delay, 145)
self.assertEqual(retry.exponential_base, 3)
self.assertEqual(retry.total, 8)

def test_backoff(self):
retry = WritesRetry(total=5, backoff_factor=1, max_retry_delay=550)
retry = NonRandomWritesRetry(total=5, backoff_factor=1, max_retry_delay=550)
self.assertEqual(retry.total, 5)
self.assertEqual(retry.is_exhausted(), False)
self.assertEqual(retry.get_backoff_time(), 0)
@@ -74,17 +75,16 @@ def test_backoff_max(self):
self.assertEqual(retry.get_backoff_time(), 15)

def test_backoff_jitter(self):
retry = WritesRetry(total=5, backoff_factor=4, jitter_interval=2).increment()
retry = WritesRetry(total=5, backoff_factor=4).increment()

self.assertEqual(retry.total, 4)
self.assertEqual(retry.is_exhausted(), False)

backoff_time = retry.get_backoff_time()
self.assertGreater(backoff_time, 4)
self.assertLessEqual(backoff_time, 6)
self.assertLessEqual(backoff_time, 4)

def test_backoff_exponential_base(self):
retry = WritesRetry(total=5, backoff_factor=2, exponential_base=2)
retry = NonRandomWritesRetry(total=5, backoff_factor=2, exponential_base=2)

retry = retry.increment()
self.assertEqual(retry.get_backoff_time(), 2)
@@ -105,15 +105,6 @@ def test_get_retry_after(self):
retry = WritesRetry()
self.assertEqual(retry.get_retry_after(response), 5)

def test_get_retry_after_jitter(self):
response = HTTPResponse()
response.headers.add('Retry-After', '5')

retry = WritesRetry(jitter_interval=2)
retry_after = retry.get_retry_after(response)
self.assertGreater(retry_after, 5)
self.assertLessEqual(retry_after, 7)

def test_is_retry(self):
retry = WritesRetry(method_whitelist=["POST"])