Skip to content

Commit

Permalink
[RRIO] [Test] Create test Caller and SetupTeardown (#29262)
Browse files Browse the repository at this point in the history
* [RRIO] [Test] Create test Caller and SetupTeardown

* Fix argument checks; Implement HTTP

* Revert go changes

* Patch code comments

* Add missing flag to documentation

* Patch per PR comments
  • Loading branch information
damondouglas authored Nov 7, 2023
1 parent c68125d commit aa92afc
Show file tree
Hide file tree
Showing 8 changed files with 509 additions and 10 deletions.
4 changes: 0 additions & 4 deletions .test-infra/mock-apis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ flowchart LR
end
```

# Writing Integration Tests

TODO: See https://github.com/apache/beam/issues/28859

# Development Dependencies

| Dependency | Reason |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*LT\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*it.*ResourceManagerTest\.java" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*testinfra.*mockapis.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*requestresponse.*" />

<!-- Flink -->
<!-- Checkstyle does not correctly detect package files across multiple source directories. -->
Expand Down
19 changes: 13 additions & 6 deletions sdks/java/io/rrio/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.requestresponse'
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKS :: Java :: IO :: RequestResponseIO (RRIO)"
ext.summary = "Support to read from and write to Web APIs"

var jedisVersion = "5.0.0"
var grpcVersion = "1.59.0"
var protobufVersion = "3.21.5"

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
Expand All @@ -36,15 +40,18 @@ dependencies {

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
testImplementation project(path: ":beam-test-infra-mock-apis")
// Vendored grpc library not fully compatible with proto autogenerated code
testImplementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
testImplementation "io.grpc:grpc-protobuf:${grpcVersion}"
testImplementation "io.grpc:grpc-stub:${grpcVersion}"
testImplementation "com.google.protobuf:protobuf-java-util:${protobufVersion}"

testImplementation platform(library.java.google_cloud_platform_libraries_bom)
testImplementation library.java.google_http_client
testImplementation library.java.junit
testImplementation library.java.testcontainers_base

testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly library.java.slf4j_jdk14
}

task integrationTest(type: Test) {
group = "verification"

include '**/*IT.class'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponse;

import io.grpc.ChannelCredentials;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc.EchoServiceBlockingStub;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

/**
* Implements {@link Caller} and {@link SetupTeardown} to call the {@link EchoServiceGrpc}. The
* purpose of {@link EchoGRPCCallerWithSetupTeardown} is support integration tests.
*/
class EchoGRPCCallerWithSetupTeardown implements Caller<EchoRequest, EchoResponse>, SetupTeardown {

static EchoGRPCCallerWithSetupTeardown of(URI uri) {
return new EchoGRPCCallerWithSetupTeardown(uri);
}

private final URI uri;
private transient @MonotonicNonNull ManagedChannel cachedManagedChannel;
private transient @MonotonicNonNull EchoServiceBlockingStub cachedBlockingStub;
private static final ChannelCredentials DEFAULT_CREDENTIALS = InsecureChannelCredentials.create();

private EchoGRPCCallerWithSetupTeardown(URI uri) {
this.uri = uri;
}

/**
* Overrides {@link Caller#call} invoking the {@link EchoServiceGrpc} with a {@link EchoRequest},
* returning either a successful {@link EchoResponse} or throwing either a {@link
* UserCodeExecutionException}, a {@link UserCodeTimeoutException}, or a {@link
* UserCodeQuotaException}.
*/
@Override
public EchoResponse call(EchoRequest request) throws UserCodeExecutionException {
try {
return cachedBlockingStub.echo(request);
} catch (StatusRuntimeException e) {
switch (e.getStatus().getCode()) {
case RESOURCE_EXHAUSTED:
throw new UserCodeQuotaException(e);
case DEADLINE_EXCEEDED:
throw new UserCodeTimeoutException(e);
default:
throw new UserCodeExecutionException(e);
}
}
}

/**
* Overrides {@link SetupTeardown#setup} to initialize the {@link ManagedChannel} and {@link
* EchoServiceBlockingStub}.
*/
@Override
public void setup() throws UserCodeExecutionException {
cachedManagedChannel =
NettyChannelBuilder.forTarget(uri.toString(), DEFAULT_CREDENTIALS).build();
cachedBlockingStub = EchoServiceGrpc.newBlockingStub(cachedManagedChannel);
}

/** Overrides {@link SetupTeardown#teardown} to shut down the {@link ManagedChannel}. */
@Override
public void teardown() throws UserCodeExecutionException {
if (cachedManagedChannel != null && cachedManagedChannel.isShutdown()) {
cachedManagedChannel.shutdown();
try {
boolean ignored = cachedManagedChannel.awaitTermination(1L, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponse;

import static org.apache.beam.sdk.io.common.IOITHelper.readIOTestPipelineOptions;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.google.protobuf.ByteString;
import java.net.URI;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Tests for {@link EchoGRPCCallerWithSetupTeardown} on a deployed {@link EchoServiceGrpc} instance.
* See {@link EchoITOptions} for details on the required parameters and how to provide these for
* running integration tests.
*/
@RunWith(JUnit4.class)
public class EchoGRPCCallerWithSetupTeardownTestIT {

private static @MonotonicNonNull EchoITOptions options;
private static @MonotonicNonNull EchoGRPCCallerWithSetupTeardown client;
private static final ByteString PAYLOAD = ByteString.copyFromUtf8("payload");

@BeforeClass
public static void setUp() throws UserCodeExecutionException {
options = readIOTestPipelineOptions(EchoITOptions.class);
if (options.getgRPCEndpointAddress().isEmpty()) {
throw new RuntimeException(
"--gRPCEndpointAddress is missing. See " + EchoITOptions.class + "for details.");
}
client = EchoGRPCCallerWithSetupTeardown.of(URI.create(options.getgRPCEndpointAddress()));
checkStateNotNull(client).setup();

EchoRequest request = createShouldExceedQuotaRequest();

// The challenge with building and deploying a real quota aware endpoint, the integration with
// which these tests validate, is that we need a value of at least 1. The allocated quota where
// we expect to exceed will be shared among many tests and across languages. Code below in this
// setup ensures that the API is in the state where we can expect a quota exceeded error. There
// are tests in this file that detect errors in expected responses. We only throw exceptions
// that are not UserCodeQuotaException.
try {
EchoResponse ignored = client.call(request);
client.call(request);
client.call(request);
} catch (UserCodeExecutionException e) {
if (!(e instanceof UserCodeQuotaException)) {
throw e;
}
}
}

@AfterClass
public static void tearDown() throws UserCodeExecutionException {
checkStateNotNull(client).teardown();
}

@Test
public void givenValidRequest_receivesResponse() throws UserCodeExecutionException {
EchoRequest request = createShouldNeverExceedQuotaRequest();
EchoResponse response = client.call(request);
assertEquals(response.getId(), request.getId());
assertEquals(response.getPayload(), request.getPayload());
}

@Test
public void givenExceededQuota_shouldThrow() {
assertThrows(UserCodeQuotaException.class, () -> client.call(createShouldExceedQuotaRequest()));
}

@Test
public void givenNotFound_shouldThrow() {
UserCodeExecutionException error =
assertThrows(
UserCodeExecutionException.class,
() ->
client.call(
EchoRequest.newBuilder()
.setId("i-dont-exist-quota-id")
.setPayload(PAYLOAD)
.build()));
assertEquals(
"io.grpc.StatusRuntimeException: NOT_FOUND: error: source not found: i-dont-exist-quota-id, err resource does not exist",
error.getMessage());
}

private static @NonNull EchoRequest createShouldNeverExceedQuotaRequest() {
return EchoRequest.newBuilder()
.setPayload(PAYLOAD)
.setId(checkStateNotNull(options).getNeverExceedQuotaId())
.build();
}

private static @NonNull EchoRequest createShouldExceedQuotaRequest() {
return EchoRequest.newBuilder()
.setPayload(PAYLOAD)
.setId(checkStateNotNull(options).getShouldExceedQuotaId())
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.requestresponse;

import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpMediaType;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.protobuf.util.JsonFormat;
import java.io.IOException;
import java.net.URI;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoRequest;
import org.apache.beam.testinfra.mockapis.echo.v1.Echo.EchoResponse;
import org.apache.beam.testinfra.mockapis.echo.v1.EchoServiceGrpc;

/**
* Implements {@link Caller} to call the {@link EchoServiceGrpc}'s HTTP handler. The purpose of
* {@link EchoHTTPCaller} is to suppport integration tests.
*/
class EchoHTTPCaller implements Caller<EchoRequest, EchoResponse> {

static EchoHTTPCaller of(URI uri) {
return new EchoHTTPCaller(uri);
}

private static final String PATH = "/v1/echo";
private static final HttpRequestFactory REQUEST_FACTORY =
new NetHttpTransport().createRequestFactory();
private static final HttpMediaType CONTENT_TYPE = new HttpMediaType("application/json");
private static final int STATUS_CODE_TOO_MANY_REQUESTS = 429;

private final URI uri;

private EchoHTTPCaller(URI uri) {
this.uri = uri;
}

/**
* Overrides {@link Caller#call} invoking the {@link EchoServiceGrpc}'s HTTP handler with a {@link
* EchoRequest}, returning either a successful {@link EchoResponse} or throwing either a {@link
* UserCodeExecutionException}, a {@link UserCodeTimeoutException}, or a {@link
* UserCodeQuotaException}.
*/
@Override
public EchoResponse call(EchoRequest request) throws UserCodeExecutionException {
try {
String json = JsonFormat.printer().omittingInsignificantWhitespace().print(request);
ByteArrayContent body = ByteArrayContent.fromString(CONTENT_TYPE.getType(), json);
HttpRequest httpRequest = REQUEST_FACTORY.buildPostRequest(getUrl(), body);
HttpResponse httpResponse = httpRequest.execute();
String responseJson = httpResponse.parseAsString();
EchoResponse.Builder builder = EchoResponse.newBuilder();
JsonFormat.parser().merge(responseJson, builder);
return builder.build();
} catch (IOException e) {
if (e instanceof HttpResponseException) {
HttpResponseException ex = (HttpResponseException) e;
if (ex.getStatusCode() == STATUS_CODE_TOO_MANY_REQUESTS) {
throw new UserCodeQuotaException(e);
}
}
throw new UserCodeExecutionException(e);
}
}

private GenericUrl getUrl() {
String rawUrl = uri.toString();
if (uri.getPath().isEmpty()) {
rawUrl += PATH;
}
return new GenericUrl(rawUrl);
}
}
Loading

0 comments on commit aa92afc

Please sign in to comment.