Skip to content

Commit

Permalink
[java] Ensure retry mechanism does not swallow an exception (#12838)
Browse files Browse the repository at this point in the history
  • Loading branch information
pujagani authored Oct 6, 2023
1 parent 915b5b9 commit a67b81d
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 40 deletions.
85 changes: 45 additions & 40 deletions java/src/org/openqa/selenium/remote/http/RetryRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,24 @@
import dev.failsafe.Failsafe;
import dev.failsafe.Fallback;
import dev.failsafe.RetryPolicy;
import dev.failsafe.event.ExecutionAttemptedEvent;
import dev.failsafe.function.CheckedFunction;
import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import org.openqa.selenium.TimeoutException;

public class RetryRequest implements Filter {

private static final Logger LOG = Logger.getLogger(RetryRequest.class.getName());
private static final AtomicReference<HttpResponse> fallBackResponse = new AtomicReference<>();

private static final Fallback<Object> fallback = Fallback.of(fallBackResponse::get);
private static final Fallback<HttpResponse> fallback =
Fallback.of(
(CheckedFunction<ExecutionAttemptedEvent<? extends HttpResponse>, ? extends HttpResponse>)
RetryRequest::getFallback);

// Retry on connection error.
private static final RetryPolicy<Object> connectionFailurePolicy =
RetryPolicy.builder()
private static final RetryPolicy<HttpResponse> connectionFailurePolicy =
RetryPolicy.<HttpResponse>builder()
.handleIf(failure -> failure.getCause() instanceof ConnectException)
.withMaxRetries(3)
.onRetry(
Expand All @@ -52,60 +55,32 @@ public class RetryRequest implements Filter {
getDebugLogLevel(),
"Connection failure #{0}. Retrying.",
e.getAttemptCount()))
.onRetriesExceeded(
e ->
fallBackResponse.set(
new HttpResponse()
.setStatus(HTTP_CLIENT_TIMEOUT)
.setContent(
asJson(
ImmutableMap.of(
"value", ImmutableMap.of("message", "Connection failure"))))))
.build();

// Retry on read timeout.
private static final RetryPolicy<Object> readTimeoutPolicy =
RetryPolicy.builder()
private static final RetryPolicy<HttpResponse> readTimeoutPolicy =
RetryPolicy.<HttpResponse>builder()
.handle(TimeoutException.class)
.withMaxRetries(3)
.onRetry(
e -> LOG.log(getDebugLogLevel(), "Read timeout #{0}. Retrying.", e.getAttemptCount()))
.onRetriesExceeded(
e ->
fallBackResponse.set(
new HttpResponse()
.setStatus(HTTP_GATEWAY_TIMEOUT)
.setContent(
asJson(
ImmutableMap.of(
"value", ImmutableMap.of("message", "Read timeout"))))))
.build();

// Retry if server is unavailable or an internal server error occurs without response body.
private static final RetryPolicy<Object> serverErrorPolicy =
RetryPolicy.builder()
private static final RetryPolicy<HttpResponse> serverErrorPolicy =
RetryPolicy.<HttpResponse>builder()
.handleResultIf(
response ->
((HttpResponse) response).getStatus() == HTTP_INTERNAL_ERROR
&& Integer.parseInt(((HttpResponse) response).getHeader(CONTENT_LENGTH)) == 0)
.handleResultIf(response -> ((HttpResponse) response).getStatus() == HTTP_UNAVAILABLE)
response.getStatus() == HTTP_INTERNAL_ERROR
&& Integer.parseInt((response).getHeader(CONTENT_LENGTH)) == 0)
.handleResultIf(response -> (response).getStatus() == HTTP_UNAVAILABLE)
.withMaxRetries(2)
.onRetry(
e ->
LOG.log(
getDebugLogLevel(),
"Failure due to server error #{0}. Retrying.",
e.getAttemptCount()))
.onRetriesExceeded(
e ->
fallBackResponse.set(
new HttpResponse()
.setStatus(((HttpResponse) e.getResult()).getStatus())
.setContent(
asJson(
ImmutableMap.of(
"value",
ImmutableMap.of("message", "Internal server error"))))))
.build();

@Override
Expand All @@ -117,4 +92,34 @@ public HttpHandler apply(HttpHandler next) {
.compose(connectionFailurePolicy)
.get(() -> next.execute(req));
}

private static HttpResponse getFallback(
ExecutionAttemptedEvent<? extends HttpResponse> executionAttemptedEvent) throws Exception {
if (executionAttemptedEvent.getLastException() != null) {
Exception exception = (Exception) executionAttemptedEvent.getLastException();
if (exception.getCause() instanceof ConnectException) {
return new HttpResponse()
.setStatus(HTTP_CLIENT_TIMEOUT)
.setContent(
asJson(ImmutableMap.of("value", ImmutableMap.of("message", "Connection failure"))));
} else if (exception instanceof TimeoutException) {
return new HttpResponse()
.setStatus(HTTP_GATEWAY_TIMEOUT)
.setContent(
asJson(ImmutableMap.of("value", ImmutableMap.of("message", "Read timeout"))));
} else throw exception;
} else if (executionAttemptedEvent.getLastResult() != null) {
HttpResponse response = executionAttemptedEvent.getLastResult();
if ((response.getStatus() == HTTP_INTERNAL_ERROR
&& Integer.parseInt(response.getHeader(CONTENT_LENGTH)) == 0)
|| response.getStatus() == HTTP_UNAVAILABLE) {
return new HttpResponse()
.setStatus(response.getStatus())
.setContent(
asJson(
ImmutableMap.of("value", ImmutableMap.of("message", "Internal server error"))));
}
}
return executionAttemptedEvent.getLastResult();
}
}
152 changes: 152 additions & 0 deletions java/test/org/openqa/selenium/remote/http/RetryRequestTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package org.openqa.selenium.remote.http;

import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_GATEWAY_TIMEOUT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.openqa.selenium.remote.http.Contents.asJson;
import static org.openqa.selenium.remote.http.HttpMethod.GET;
Expand All @@ -28,9 +30,18 @@
import java.net.MalformedURLException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openqa.selenium.TimeoutException;
import org.openqa.selenium.environment.webserver.AppServer;
import org.openqa.selenium.environment.webserver.NettyAppServer;
import org.openqa.selenium.remote.http.netty.NettyClient;
Expand All @@ -50,6 +61,66 @@ public void setUp() throws MalformedURLException {
client = new NettyClient.Factory().createClient(config);
}

@Test
void canThrowUnexpectedException() {
HttpHandler handler =
new RetryRequest()
.andFinally(
(HttpRequest request) -> {
throw new UnsupportedOperationException("Testing");
});

Assertions.assertThrows(
UnsupportedOperationException.class, () -> handler.execute(new HttpRequest(GET, "/")));
}

@Test
void canReturnAppropriateFallbackResponse() {
HttpHandler handler1 =
new RetryRequest()
.andFinally(
(HttpRequest request) -> {
throw new TimeoutException();
});

Assertions.assertEquals(
HTTP_GATEWAY_TIMEOUT, handler1.execute(new HttpRequest(GET, "/")).getStatus());

HttpHandler handler2 =
new RetryRequest()
.andFinally((HttpRequest request) -> new HttpResponse().setStatus(HTTP_UNAVAILABLE));

Assertions.assertEquals(
HTTP_UNAVAILABLE, handler2.execute(new HttpRequest(GET, "/")).getStatus());
}

@Test
void canReturnAppropriateFallbackResponseWithMultipleThreads()
throws InterruptedException, ExecutionException {
HttpHandler handler1 =
new RetryRequest()
.andFinally(
(HttpRequest request) -> {
throw new TimeoutException();
});

HttpHandler handler2 =
new RetryRequest()
.andFinally((HttpRequest request) -> new HttpResponse().setStatus(HTTP_UNAVAILABLE));

ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<HttpResponse>> tasks = new ArrayList<>();

tasks.add(() -> handler1.execute(new HttpRequest(GET, "/")));
tasks.add(() -> handler2.execute(new HttpRequest(GET, "/")));

List<Future<HttpResponse>> results = executorService.invokeAll(tasks);

Assertions.assertEquals(HTTP_GATEWAY_TIMEOUT, results.get(0).get().getStatus());

Assertions.assertEquals(HTTP_UNAVAILABLE, results.get(1).get().getStatus());
}

@Test
void shouldBeAbleToHandleARequest() {
AtomicInteger count = new AtomicInteger(0);
Expand Down Expand Up @@ -98,6 +169,28 @@ void shouldBeAbleToRetryARequestOnInternalServerError() {
server.stop();
}

@Test
void shouldBeAbleToGetTheErrorResponseOnInternalServerError() {
AtomicInteger count = new AtomicInteger(0);
AppServer server =
new NettyAppServer(
req -> {
count.incrementAndGet();
return new HttpResponse().setStatus(500);
});
server.start();

URI uri = URI.create(server.whereIs("/"));
HttpRequest request =
new HttpRequest(GET, String.format(REQUEST_PATH, uri.getHost(), uri.getPort()));
HttpResponse response = client.execute(request);

assertThat(response).extracting(HttpResponse::getStatus).isEqualTo(HTTP_INTERNAL_ERROR);
assertThat(count.get()).isGreaterThanOrEqualTo(3);

server.stop();
}

@Test
void shouldNotRetryRequestOnInternalServerErrorWithContent() {
AtomicInteger count = new AtomicInteger(0);
Expand Down Expand Up @@ -149,6 +242,30 @@ void shouldRetryRequestOnServerUnavailableError() {
server.stop();
}

@Test
void shouldGetTheErrorResponseOnServerUnavailableError() {
AtomicInteger count = new AtomicInteger(0);
AppServer server =
new NettyAppServer(
req -> {
count.incrementAndGet();
return new HttpResponse()
.setStatus(503)
.setContent(asJson(ImmutableMap.of("error", "server down")));
});
server.start();

URI uri = URI.create(server.whereIs("/"));
HttpRequest request =
new HttpRequest(GET, String.format(REQUEST_PATH, uri.getHost(), uri.getPort()));
HttpResponse response = client.execute(request);

assertThat(response).extracting(HttpResponse::getStatus).isEqualTo(HTTP_UNAVAILABLE);
assertThat(count.get()).isEqualTo(3);

server.stop();
}

@Test
void shouldBeAbleToRetryARequestOnTimeout() {
AtomicInteger count = new AtomicInteger(0);
Expand Down Expand Up @@ -178,6 +295,41 @@ void shouldBeAbleToRetryARequestOnTimeout() {
server.stop();
}

@Test
void shouldBeAbleToGetErrorResponseOnRequestTimeout() {
AtomicInteger count = new AtomicInteger(0);
AppServer server =
new NettyAppServer(
req -> {
count.incrementAndGet();
throw new TimeoutException();
});
server.start();

URI uri = URI.create(server.whereIs("/"));
HttpRequest request =
new HttpRequest(GET, String.format(REQUEST_PATH, uri.getHost(), uri.getPort()));

HttpResponse response = client.execute(request);

// The NettyAppServer passes the request through ErrorFilter.
// This maps the timeout exception to HTTP response code 500 and HTTP response body containing
// "timeout".
// RetryRequest retries if it gets a TimeoutException only.
// Parsing and inspecting the response body each time if HTTP response code 500 is not
// efficient.
// A potential solution can be updating the ErrorCodec to reflect the appropriate HTTP code
// (this is a breaking change).
// RetryRequest can then inspect just the HTTP response status code and retry.

assertThat(response).extracting(HttpResponse::getStatus).isEqualTo(HTTP_INTERNAL_ERROR);

// This should ideally be more than the number of retries configured i.e. greater than 3
assertThat(count.get()).isEqualTo(1);

server.stop();
}

@Test
void shouldBeAbleToRetryARequestOnConnectionFailure() {
AppServer server = new NettyAppServer(req -> new HttpResponse());
Expand Down

0 comments on commit a67b81d

Please sign in to comment.