Skip to content

Commit

Permalink
Changed FailsafeExecutor.getAsyncExecution to accept an AsyncRunnable
Browse files Browse the repository at this point in the history
  • Loading branch information
jhalterman committed Aug 20, 2021
1 parent 9ae6438 commit c446fae
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 59 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

### API Changes

- Added a generic result type `R` to `ExecutionContext`, `Execution` and `AsyncExecution`. This ensures that result types are unified across the API. It does mean that there are a few minor breaking changes to the API:
- `ContextualSupplier` and `AsyncSupplier` now have an additional result type parameter `R`. Normally these types are used as lambda parameters where the type is inferred, so most users should not be impacted. But any explicit generic declaration of these types will not compile until the new parameter is added.
- Added a generic result type `R` to `ExecutionContext`, `Execution`, `AsyncExecution`, and `AsyncRunnable`. This ensures that result types are unified across the API. It does mean that there are a few minor breaking changes to the API:
- `ContextualSupplier` now has an additional result type parameter `R`. Normally this type is used as lambda parameters where the type is inferred, so most users should not be impacted. But any explicit generic declaration of this type will not compile until the new parameter is added.
- `PolicyExecutor`, which is part of the SPI, now accepts an additional result type parameter `R`. This is only relevant for SPI users who are implementing their own Policies.
- Changed `FailsafeExecutor.getAsyncExecution` to accept `AsyncRunnable` instead of `AsyncSupplier`. This is a breaking change for any `getAsyncExecution` calls, but the fix is to simply remove any `return` statement. The reason for this change is that the provided object does not need to return a result since the result will already be passed asynchronously to one of the `AsyncExecution` `complete` or `retry` methods.

# 2.4.3

Expand Down
8 changes: 4 additions & 4 deletions src/main/java/net/jodah/failsafe/FailsafeExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public <T extends R> CompletableFuture<T> getAsync(ContextualSupplier<T, T> supp
* @throws NullPointerException if the {@code supplier} is null
* @throws RejectedExecutionException if the {@code supplier} cannot be scheduled for execution
*/
public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncSupplier<T, T> supplier) {
return callAsync(execution -> getPromiseExecution(supplier, execution), true);
public <T extends R> CompletableFuture<T> getAsyncExecution(AsyncRunnable<T> runnable) {
return callAsync(execution -> getPromiseExecution(runnable, execution), true);
}

/**
Expand Down Expand Up @@ -277,8 +277,8 @@ public CompletableFuture<Void> runAsync(ContextualRunnable runnable) {
* @throws NullPointerException if the {@code runnable} is null
* @throws RejectedExecutionException if the {@code runnable} cannot be scheduled for execution
*/
public CompletableFuture<Void> runAsyncExecution(AsyncRunnable runnable) {
return callAsync(execution -> getPromiseExecution(toAsyncSupplier(runnable), execution), true);
public CompletableFuture<Void> runAsyncExecution(AsyncRunnable<Void> runnable) {
return callAsync(execution -> getPromiseExecution(runnable, execution), true);
}

/**
Expand Down
18 changes: 5 additions & 13 deletions src/main/java/net/jodah/failsafe/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,21 @@ static <R> Supplier<CompletableFuture<ExecutionResult>> getPromiseAsync(
}

/**
* Returns a Supplier that pre-executes the {@code execution}, applies the {@code supplier}, and attempts to complete
* the {@code execution} if a failure occurs. Locks to ensure the resulting supplier cannot be applied multiple times
* Returns a Supplier that pre-executes the {@code execution}, runs the {@code runnable}, and attempts to complete the
* {@code execution} if a failure occurs. Locks to ensure the resulting supplier cannot be applied multiple times
* concurrently.
*
* @param <R> result type
*/
static <R> Supplier<CompletableFuture<ExecutionResult>> getPromiseExecution(AsyncSupplier<R, R> supplier,
static <R> Supplier<CompletableFuture<ExecutionResult>> getPromiseExecution(AsyncRunnable<R> runnable,
AsyncExecution<R> execution) {
Assert.notNull(supplier, "supplier");
Assert.notNull(runnable, "runnable");
return new Supplier<CompletableFuture<ExecutionResult>>() {
@Override
public synchronized CompletableFuture<ExecutionResult> get() {
try {
execution.preExecute();
supplier.get(execution);
runnable.run(execution);
} catch (Throwable e) {
execution.completeOrHandle(null, e);
}
Expand Down Expand Up @@ -229,14 +229,6 @@ static <R> Supplier<CompletableFuture<ExecutionResult>> getPromiseOfStageExecuti
};
}

static AsyncSupplier<Void, Void> toAsyncSupplier(AsyncRunnable runnable) {
Assert.notNull(runnable, "runnable");
return execution -> {
runnable.run(execution);
return null;
};
}

/**
* Returns a SettableSupplier that supplies the set value once then uses the {@code supplier} for subsequent calls.
*
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/net/jodah/failsafe/function/AsyncRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@

/**
* A Runnable that manually triggers asynchronous retries or completion via an asynchronous execution.
*
*
* @param <R> result type
* @author Jonathan Halterman
*/
@FunctionalInterface
public interface AsyncRunnable {
void run(AsyncExecution<Void> execution) throws Exception;
public interface AsyncRunnable<R> {
void run(AsyncExecution<R> execution) throws Exception;
}
14 changes: 5 additions & 9 deletions src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,19 +178,17 @@ public void shouldGetAsyncContextual() throws Throwable {
}

public void shouldGetAsyncExecution() throws Throwable {
assertGetAsync((AsyncSupplier<?, ?>) exec -> {
assertGetAsync((AsyncRunnable<?>) exec -> {
try {
boolean result = service.connect();
if (!exec.complete(result))
exec.retry();
return result;
} catch (Exception failure) {
// Alternate between automatic and manual retries
if (exec.getAttemptCount() % 2 == 0)
throw failure;
if (!exec.retryOn(failure))
throw failure;
return null;
}
});
}
Expand Down Expand Up @@ -389,10 +387,9 @@ public void shouldCancelOnGetAsyncWithTimeout() throws Throwable {
}

public void shouldCancelOnGetAsyncExecution() throws Throwable {
assertCancel(executor -> getAsync(executor, (AsyncSupplier<?, ?>) (e) -> {
assertCancel(executor -> getAsync(executor, (AsyncRunnable<?>) (e) -> {
Thread.sleep(1000);
e.complete();
return null;
}), retryAlways);
}

Expand Down Expand Up @@ -435,7 +432,6 @@ public void shouldManuallyRetryAndComplete() throws Throwable {
exec.retryOn(new ConnectException());
else
exec.complete(true);
return true;
});
waiter.await(3000);
}
Expand Down Expand Up @@ -504,7 +500,6 @@ public void shouldTimeoutAndRetry() throws Throwable {
Thread.sleep(100);
if (!exec.complete(false))
exec.retry();
return null;
});

waiter.await(1000);
Expand Down Expand Up @@ -593,13 +588,14 @@ public void shouldSupportCovariance() {
.getAsync(() -> fastService);
}

@SuppressWarnings("unchecked")
private Future<?> runAsync(FailsafeExecutor<?> failsafe, Object runnable) {
if (runnable instanceof CheckedRunnable)
return failsafe.runAsync((CheckedRunnable) runnable);
else if (runnable instanceof ContextualRunnable)
return failsafe.runAsync((ContextualRunnable) runnable);
else
return failsafe.runAsyncExecution((AsyncRunnable) runnable);
return failsafe.runAsyncExecution((AsyncRunnable<Void>) runnable);
}

@SuppressWarnings("unchecked")
Expand All @@ -609,7 +605,7 @@ private <T> Future<T> getAsync(FailsafeExecutor<T> failsafe, Object supplier) {
else if (supplier instanceof ContextualSupplier)
return failsafe.getAsync((ContextualSupplier<T, T>) supplier);
else
return failsafe.getAsyncExecution((AsyncSupplier<T, T>) supplier);
return failsafe.getAsyncExecution((AsyncRunnable<T>) supplier);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
37 changes: 9 additions & 28 deletions src/test/java/net/jodah/failsafe/Testing.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,10 @@ private static <T> void testSyncAndAsyncInternal(boolean testSync, FailsafeExecu
if (given != null)
given.run();
if (expectedExceptions.length == 0) {
T result = Testing.unwrapExceptions(() -> failsafe.onComplete(setCompletedEventFn::accept).get(when));
T result = Testing.unwrapExceptions(() -> failsafe.onComplete(setCompletedEventFn).get(when));
assertEquals(result, expectedResult);
} else
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn::accept).get(when), expectedExceptions);
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn).get(when), expectedExceptions);
postTestFn.run();
}

Expand All @@ -401,47 +401,28 @@ private static <T> void testSyncAndAsyncInternal(boolean testSync, FailsafeExecu
if (given != null)
given.run();
if (expectedExceptions.length == 0) {
T result = Testing.unwrapExceptions(() -> failsafe.onComplete(setCompletedEventFn::accept).getAsync(when).get());
T result = Testing.unwrapExceptions(() -> failsafe.onComplete(setCompletedEventFn).getAsync(when).get());
assertEquals(result, expectedResult);
} else {
expected.add(0, ExecutionException.class);
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn::accept).getAsync(when).get(), expected);
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn).getAsync(when).get(), expected);
}
postTestFn.run();
}

public static <T> void testAsyncExecutionSuccess(FailsafeExecutor<T> failsafe, AsyncRunnable when,
Consumer<ExecutionCompletedEvent<T>> then, T expectedResult) {
AsyncSupplier supplier = ex -> {
when.run(ex);
return null;
};
testAsyncExecutionInternal(failsafe, supplier, then, expectedResult);
}

public static <T> void testAsyncExecutionSuccess(FailsafeExecutor<T> failsafe, AsyncSupplier<T, T> when,
public static <T> void testAsyncExecutionSuccess(FailsafeExecutor<T> failsafe, AsyncRunnable<T> when,
Consumer<ExecutionCompletedEvent<T>> then, T expectedResult) {
testAsyncExecutionInternal(failsafe, when, then, expectedResult);
}

@SafeVarargs
public static <T> void testAsyncExecutionFailure(FailsafeExecutor<T> failsafe, AsyncRunnable when,
Consumer<ExecutionCompletedEvent<T>> then, Class<? extends Throwable>... expectedExceptions) {
AsyncSupplier supplier = ex -> {
when.run(ex);
return null;
};
testAsyncExecutionInternal(failsafe, supplier, then, null, expectedExceptions);
}

@SafeVarargs
public static <T> void testAsyncExecutionFailure(FailsafeExecutor<T> failsafe, AsyncSupplier<T, T> when,
public static <T> void testAsyncExecutionFailure(FailsafeExecutor<T> failsafe, AsyncRunnable<T> when,
Consumer<ExecutionCompletedEvent<T>> then, Class<? extends Throwable>... expectedExceptions) {
testAsyncExecutionInternal(failsafe, when, then, null, expectedExceptions);
}

@SafeVarargs
private static <T> void testAsyncExecutionInternal(FailsafeExecutor<T> failsafe, AsyncSupplier<T, T> when,
private static <T> void testAsyncExecutionInternal(FailsafeExecutor<T> failsafe, AsyncRunnable<T> when,
Consumer<ExecutionCompletedEvent<T>> then, T expectedResult, Class<? extends Throwable>... expectedExceptions) {

AtomicReference<ExecutionCompletedEvent<T>> completedEventRef = new AtomicReference<>();
Expand All @@ -459,13 +440,13 @@ private static <T> void testAsyncExecutionInternal(FailsafeExecutor<T> failsafe,
System.out.println("\nRunning async execution test");
if (expectedExceptions.length == 0) {
T result = Testing.unwrapExceptions(
() -> failsafe.onComplete(setCompletedEventFn::accept).getAsyncExecution(when).get());
() -> failsafe.onComplete(setCompletedEventFn).getAsyncExecution(when).get());
assertEquals(result, expectedResult);
} else {
List<Class<? extends Throwable>> expected = new LinkedList<>();
Collections.addAll(expected, expectedExceptions);
expected.add(0, ExecutionException.class);
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn::accept).getAsyncExecution(when).get(),
Asserts.assertThrows(() -> failsafe.onComplete(setCompletedEventFn).getAsyncExecution(when).get(),
expected);
}
postTestFn.run();
Expand Down

0 comments on commit c446fae

Please sign in to comment.