diff --git a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java index b98a5319fc..4ec7572216 100644 --- a/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java +++ b/gax-java/gax-httpjson/src/main/java/com/google/api/gax/httpjson/HttpJsonClientCallImpl.java @@ -46,6 +46,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -121,6 +122,13 @@ final class HttpJsonClientCallImpl @GuardedBy("lock") private volatile boolean closed; + // Store the timeout future created by the deadline schedule executor. The future + // can be cancelled if a response (either an error or valid payload) has been + // received before the timeout. This value may be null if the RPC does not have a + // timeout. + @GuardedBy("lock") + private volatile ScheduledFuture timeoutFuture; + HttpJsonClientCallImpl( ApiMethodDescriptor methodDescriptor, String endpoint, @@ -167,16 +175,20 @@ public void start(Listener responseListener, HttpJsonMetadata request Preconditions.checkState(this.listener == null, "The call is already started"); this.listener = responseListener; this.requestHeaders = requestHeaders; - } - // Use the timeout duration value instead of calculating the future Instant - // Only schedule the deadline if the RPC timeout has been set in the RetrySettings - Duration timeout = callOptions.getTimeout(); - if (timeout != null) { - // The future timeout value is guaranteed to not be a negative value as the - // RetryAlgorithm will not retry - long timeoutMs = timeout.toMillis(); - this.deadlineCancellationExecutor.schedule(this::timeout, timeoutMs, TimeUnit.MILLISECONDS); + // Use the timeout duration value instead of calculating the future Instant + // Only schedule the deadline if the RPC timeout has been set in the RetrySettings + Duration timeout = callOptions.getTimeout(); + if (timeout != null) { + // The future timeout value is guaranteed to not be a negative value as the + // RetryAlgorithm will not retry + long timeoutMs = timeout.toMillis(); + // Assign the scheduled future so that it can be cancelled if the timeout task + // is not needed (response received prior to timeout) + timeoutFuture = + this.deadlineCancellationExecutor.schedule( + this::timeout, timeoutMs, TimeUnit.MILLISECONDS); + } } } @@ -430,6 +442,16 @@ private void close( return; } closed = true; + + // Cancel the timeout future if there is a timeout associated with the RPC + if (timeoutFuture != null) { + // The timeout method also invokes close() and the second invocation of close() + // will be guarded by the closed check above. No need to interrupt the timeout + // task as running the timeout task is quick. + timeoutFuture.cancel(false); + timeoutFuture = null; + } + // Best effort task cancellation (to not be confused with task's thread interruption). // If the task is in blocking I/O waiting for the server response, it will keep waiting for // the response from the server, but once response is received the task will exit silently. diff --git a/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonClientCallImplTest.java b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonClientCallImplTest.java new file mode 100644 index 0000000000..0355dd0d4b --- /dev/null +++ b/gax-java/gax-httpjson/src/test/java/com/google/api/gax/httpjson/HttpJsonClientCallImplTest.java @@ -0,0 +1,139 @@ +/* + * Copyright 2024 Google LLC + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google LLC nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package com.google.api.gax.httpjson; + +import com.google.api.client.http.HttpTransport; +import com.google.common.truth.Truth; +import com.google.protobuf.TypeRegistry; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.Reader; +import java.time.Duration; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class HttpJsonClientCallImplTest { + @Mock private ApiMethodDescriptor apiMethodDescriptor; + @Mock private HttpResponseParser httpResponseParser; + @Mock private HttpJsonCallOptions httpJsonCallOptions; + @Mock private TypeRegistry typeRegistry; + @Mock private HttpTransport httpTransport; + @Mock private Executor executor; + @Mock private HttpJsonClientCall.Listener listener; + + @Test + public void responseReceived_noCancellationTask() { + ScheduledThreadPoolExecutor deadlineSchedulerExecutor = new ScheduledThreadPoolExecutor(1); + // Null timeout means no timeout task created + Mockito.when(httpJsonCallOptions.getTimeout()).thenReturn(null); + + HttpJsonClientCallImpl httpJsonClientCall = + new HttpJsonClientCallImpl<>( + apiMethodDescriptor, + "", + httpJsonCallOptions, + httpTransport, + executor, + deadlineSchedulerExecutor); + httpJsonClientCall.start(listener, HttpJsonMetadata.newBuilder().build()); + // No timeout task in the work queue + Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(0); + // Follows the numMessages requested from HttpJsonClientCalls.futureUnaryCall() + httpJsonClientCall.request(2); + httpJsonClientCall.setResult( + HttpRequestRunnable.RunnableResult.builder() + .setStatusCode(200) + .setTrailers(HttpJsonMetadata.newBuilder().build()) + .build()); + Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(0); + deadlineSchedulerExecutor.shutdown(); + // Scheduler is not waiting for any task and should terminate immediately + Truth.assertThat(deadlineSchedulerExecutor.isTerminated()).isTrue(); + } + + @Test + public void responseReceived_cancellationTaskExists_isCancelledProperly() + throws InterruptedException { + ScheduledThreadPoolExecutor deadlineSchedulerExecutor = new ScheduledThreadPoolExecutor(1); + // SetRemoveOnCancelPolicy will immediately remove the task from the work queue + // when the task is cancelled + deadlineSchedulerExecutor.setRemoveOnCancelPolicy(true); + + // Setting a timeout for this call will enqueue a timeout task + Mockito.when(httpJsonCallOptions.getTimeout()).thenReturn(Duration.ofMinutes(10)); + + String response = "Content"; + InputStream inputStream = new ByteArrayInputStream(response.getBytes()); + Mockito.when(httpJsonCallOptions.getTypeRegistry()).thenReturn(typeRegistry); + Mockito.when(apiMethodDescriptor.getResponseParser()).thenReturn(httpResponseParser); + Mockito.when( + httpResponseParser.parse(Mockito.any(Reader.class), Mockito.any(TypeRegistry.class))) + .thenReturn(response); + HttpJsonClientCallImpl httpJsonClientCall = + new HttpJsonClientCallImpl<>( + apiMethodDescriptor, + "", + httpJsonCallOptions, + httpTransport, + executor, + deadlineSchedulerExecutor); + httpJsonClientCall.start(listener, HttpJsonMetadata.newBuilder().build()); + // The timeout task is scheduled for 10 minutes from invocation. The task should be + // populated in the work queue, scheduled to run, but not active yet. + Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(1); + // Follows the numMessages requested from HttpJsonClientCalls.futureUnaryCall() + httpJsonClientCall.request(2); + httpJsonClientCall.setResult( + HttpRequestRunnable.RunnableResult.builder() + .setStatusCode(200) + .setTrailers(HttpJsonMetadata.newBuilder().build()) + .setResponseContent(inputStream) + .build()); + // After the result is received, `close()` should have run and removed the timeout task + // Expect that there are no tasks in the queue and no active tasks + Truth.assertThat(deadlineSchedulerExecutor.getQueue().size()).isEqualTo(0); + deadlineSchedulerExecutor.shutdown(); + + // Ideally, this test wouldn't need to awaitTermination. Given the machine this test + // is running on, we can't guarantee that isTerminated is true immediately. The point + // of this test is that it doesn't wait the full timeout duration (10 min) to terminate + // and rather is able to terminate after we invoke shutdown on the deadline scheduler. + deadlineSchedulerExecutor.awaitTermination(5, TimeUnit.SECONDS); + // Scheduler is not waiting for any task and should terminate quickly + Truth.assertThat(deadlineSchedulerExecutor.isTerminated()).isTrue(); + } +} diff --git a/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITClientShutdown.java b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITClientShutdown.java new file mode 100644 index 0000000000..5915c8e065 --- /dev/null +++ b/showcase/gapic-showcase/src/test/java/com/google/showcase/v1beta1/it/ITClientShutdown.java @@ -0,0 +1,158 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.showcase.v1beta1.it; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.common.collect.ImmutableSet; +import com.google.common.truth.Truth; +import com.google.showcase.v1beta1.BlockRequest; +import com.google.showcase.v1beta1.BlockResponse; +import com.google.showcase.v1beta1.EchoClient; +import com.google.showcase.v1beta1.EchoRequest; +import com.google.showcase.v1beta1.it.util.TestClientInitializer; +import org.junit.Test; +import org.threeten.bp.Duration; + +public class ITClientShutdown { + + private static final long DEFAULT_RPC_TIMEOUT_MS = 15000L; + private static final long DEFAULT_CLIENT_TERMINATION_MS = 5000L; + + // Test to ensure the client can close + terminate properly + @Test(timeout = 15000L) + public void testGrpc_closeClient() throws Exception { + EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient(); + assertClientTerminated(grpcClient); + } + + // Test to ensure the client can close + terminate properly + @Test(timeout = 15000L) + public void testHttpJson_closeClient() throws Exception { + EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient(); + assertClientTerminated(httpjsonClient); + } + + // Test to ensure the client can close + terminate after a quick RPC invocation + @Test(timeout = 15000L) + public void testGrpc_rpcInvoked_closeClient() throws Exception { + EchoClient grpcClient = TestClientInitializer.createGrpcEchoClient(); + // Response is ignored for this test + grpcClient.echo(EchoRequest.newBuilder().setContent("Test").build()); + assertClientTerminated(grpcClient); + } + + // Test to ensure the client can close + terminate after a quick RPC invocation + @Test(timeout = 15000L) + public void testHttpJson_rpcInvoked_closeClient() throws Exception { + EchoClient httpjsonClient = TestClientInitializer.createHttpJsonEchoClient(); + // Response is ignored for this test + httpjsonClient.echo(EchoRequest.newBuilder().setContent("Test").build()); + assertClientTerminated(httpjsonClient); + } + + // This test is to ensure that the client is able to close + terminate any resources + // once a response has been received. Set a max test duration of 15s to ensure that + // the test does not continue on forever. + @Test(timeout = 15000L) + public void testGrpc_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived() + throws Exception { + // Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC + // invocation should time out in 15s, but the client will receive a response in 2s. + // Any outstanding tasks (timeout tasks) should be cancelled once a response has been + // received so the client can properly terminate. + RetrySettings defaultRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS)) + .setMaxRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS)) + .setTotalTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS)) + .setMaxAttempts(1) + .build(); + EchoClient grpcClient = + TestClientInitializer.createGrpcEchoClientCustomBlockSettings( + defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED)); + + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess(BlockResponse.newBuilder().setContent("gRPCBlockContent_2sDelay")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build()) + .build(); + + // Response is ignored for this test + grpcClient.block(blockRequest); + + assertClientTerminated(grpcClient); + } + + // This test is to ensure that the client is able to close + terminate any resources + // once a response has been received. Set a max test duration of 15s to ensure that + // the test does not continue on forever. + @Test(timeout = 15000L) + public void testHttpJson_rpcInvokedWithLargeTimeout_closeClientOnceResponseReceived() + throws Exception { + // Set the maxAttempts to 1 to ensure there are no retries scheduled. The single RPC + // invocation should time out in 15s, but the client will receive a response in 2s. + // Any outstanding tasks (timeout tasks) should be cancelled once a response has been + // received so the client can properly terminate. + RetrySettings defaultRetrySettings = + RetrySettings.newBuilder() + .setInitialRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS)) + .setMaxRpcTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS)) + .setTotalTimeout(Duration.ofMillis(DEFAULT_RPC_TIMEOUT_MS)) + .setMaxAttempts(1) + .build(); + EchoClient httpjsonClient = + TestClientInitializer.createHttpJsonEchoClientCustomBlockSettings( + defaultRetrySettings, ImmutableSet.of(StatusCode.Code.DEADLINE_EXCEEDED)); + + BlockRequest blockRequest = + BlockRequest.newBuilder() + .setSuccess(BlockResponse.newBuilder().setContent("httpjsonBlockContent_2sDelay")) + .setResponseDelay(com.google.protobuf.Duration.newBuilder().setSeconds(2).build()) + .build(); + + // Response is ignored for this test + httpjsonClient.block(blockRequest); + + assertClientTerminated(httpjsonClient); + } + + // This helper method asserts that the client is able to terminate within + // `AWAIT_TERMINATION_SECONDS` + private void assertClientTerminated(EchoClient echoClient) throws InterruptedException { + long start = System.currentTimeMillis(); + // Intentionally do not run echoClient.awaitTermination(...) as this test will + // check that everything is properly terminated after close() is called. + echoClient.close(); + + // Loop until the client has terminated successfully. For tests that use this, + // try to ensure there is a timeout associated, otherwise this may run forever. + // Future enhancement: Use awaitility instead of busy waiting + while (!echoClient.isTerminated()) { + Thread.sleep(500L); + } + // The busy-wait time won't be accurate, so account for a bit of buffer + long end = System.currentTimeMillis(); + + Truth.assertThat(echoClient.isShutdown()).isTrue(); + + // Check the termination time. If all the tasks/ resources are closed successfully, + // the termination time should only occur shortly after `close()` was invoked. The + // `DEFAULT_TERMINATION_MS` value should include a bit of buffer. + long terminationTime = end - start; + Truth.assertThat(terminationTime).isLessThan(DEFAULT_CLIENT_TERMINATION_MS); + } +}