diff --git a/CHANGES.md b/CHANGES.md index 4c0cb3ce3c24..5774be7d6464 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -79,6 +79,7 @@ * CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)). * AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)). * AWS 2 SnsIO.writeAsync (deprecated in Beam v2.37.0 due to risk of data loss) was finally removed ([#26710](https://github.com/apache/beam/issues/26710)). +* AWS 2 coders (deprecated in Beam v2.43.0 when adding Schema support for AWS Sdk Pojos) are finally removed ([#23315](https://github.com/apache/beam/issues/23315)). ## Deprecations diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java deleted file mode 100644 index a15818875c2f..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/AwsCoders.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.coders; - -import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.ListCoder; -import org.apache.beam.sdk.coders.MapCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import software.amazon.awssdk.awscore.AwsResponseMetadata; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.utils.ImmutableMap; - -/** - * {@link Coder}s for common AWS SDK objects. - * - * @deprecated {@link org.apache.beam.sdk.schemas.SchemaCoder SchemaCoders} for {@link - * software.amazon.awssdk.core.SdkPojo AWS model classes} will be automatically inferred by - * means of {@link org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider AwsSchemaProvider}. - */ -@Deprecated -public final class AwsCoders { - - private AwsCoders() {} - - /** - * Returns a new coder for {@link AwsResponseMetadata} (AWS request ID only). - * - * @return the {@link AwsResponseMetadata} coder - */ - public static Coder awsResponseMetadata() { - return new AwsResponseMetadataCoder(); - } - - /** - * Returns a new coder for {@link SdkHttpResponse} (HTTP status code and headers). - * - * @return the SdkHttpResponse coder - */ - public static Coder sdkHttpResponse() { - return new SdkHttpResponseCoder(true); - } - - /** - * Returns a new coder for {@link SdkHttpResponse} (HTTP status code only). - * - * @return the SdkHttpResponse coder - */ - public static Coder sdkHttpResponseWithoutHeaders() { - return new SdkHttpResponseCoder(false); - } - - private static class AwsResponseMetadataCoder extends AtomicCoder { - private static final Coder REQUEST_ID_CODER = StringUtf8Coder.of(); - - private static class DecodedAwsResponseMetadata extends AwsResponseMetadata { - protected DecodedAwsResponseMetadata(String requestId) { - super(ImmutableMap.of(AWS_REQUEST_ID, requestId)); - } - } - - @Override - public void encode(AwsResponseMetadata value, OutputStream outStream) - throws CoderException, IOException { - REQUEST_ID_CODER.encode(value.requestId(), outStream); - } - - @Override - public AwsResponseMetadata decode(InputStream inStream) throws CoderException, IOException { - return new DecodedAwsResponseMetadata(REQUEST_ID_CODER.decode(inStream)); - } - } - - private static class SdkHttpResponseCoder extends CustomCoder { - private static final Coder STATUS_CODE_CODER = VarIntCoder.of(); - private static final Coder>> HEADERS_ENCODER = - NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), ListCoder.of(StringUtf8Coder.of()))); - - private final boolean includeHeaders; - - protected SdkHttpResponseCoder(boolean includeHeaders) { - this.includeHeaders = includeHeaders; - } - - @Override - public void encode(SdkHttpResponse value, OutputStream outStream) - throws CoderException, IOException { - STATUS_CODE_CODER.encode(value.statusCode(), outStream); - if (includeHeaders) { - HEADERS_ENCODER.encode(value.headers(), outStream); - } - } - - @Override - public SdkHttpResponse decode(InputStream inStream) throws CoderException, IOException { - SdkHttpResponse.Builder httpResponseBuilder = - SdkHttpResponse.builder().statusCode(STATUS_CODE_CODER.decode(inStream)); - - if (includeHeaders) { - Map> headers = HEADERS_ENCODER.decode(inStream); - if (headers != null) { - httpResponseBuilder.headers(headers); - } - } - return httpResponseBuilder.build(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - STATUS_CODE_CODER.verifyDeterministic(); - if (includeHeaders) { - HEADERS_ENCODER.verifyDeterministic(); - } - } - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java deleted file mode 100644 index e23a1a5e763e..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/coders/package-info.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** Defines common coders for Amazon Web Services. */ -package org.apache.beam.sdk.io.aws2.coders; diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java deleted file mode 100644 index 57acd46f6151..000000000000 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCoders.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.apache.beam.sdk.io.aws2.coders.AwsCoders.sdkHttpResponse; -import static org.apache.beam.sdk.io.aws2.coders.AwsCoders.sdkHttpResponseWithoutHeaders; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.aws2.coders.AwsCoders; -import org.checkerframework.checker.nullness.qual.Nullable; -import software.amazon.awssdk.awscore.AwsResponseMetadata; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.services.sns.model.PublishResponse; - -/** - * Coders for SNS {@link PublishResponse}. - * - * @deprecated Schema based coder is inferred automatically. - */ -@Deprecated -public class PublishResponseCoders { - private static final Coder MESSAGE_ID_CODER = StringUtf8Coder.of(); - private static final NullableCoder METADATA_CODER = - NullableCoder.of(AwsCoders.awsResponseMetadata()); - - private PublishResponseCoders() {} - - /** - * Returns a new SNS {@link PublishResponse} coder which by default serializes only the SNS - * messageId. - * - * @return the {@link PublishResponse} coder - */ - public static Coder defaultPublishResponse() { - return new PublishResponseCoder(null, null); - } - - /** - * Returns a new SNS {@link PublishResponse} coder which serializes {@link AwsResponseMetadata} - * and {@link SdkHttpResponse}, including the HTTP response headers. - * - * @return the {@link PublishResponse} coder - */ - public static Coder fullPublishResponse() { - return new PublishResponseCoder(METADATA_CODER, NullableCoder.of(sdkHttpResponse())); - } - - /** - * Returns a new SNS {@link PublishResponse} coder which serializes {@link AwsResponseMetadata} - * and {@link SdkHttpResponse}, but not including the HTTP response headers. - * - * @return the {@link PublishResponse} coder - */ - public static Coder fullPublishResponseWithoutHeaders() { - return new PublishResponseCoder( - METADATA_CODER, NullableCoder.of(sdkHttpResponseWithoutHeaders())); - } - - private static class PublishResponseCoder extends CustomCoder { - private final @Nullable NullableCoder metadataCoder; - private final @Nullable NullableCoder httpResponseCoder; - - private PublishResponseCoder( - @Nullable NullableCoder responseMetadataEncoder, - @Nullable NullableCoder sdkHttpMetadataCoder) { - this.metadataCoder = responseMetadataEncoder; - this.httpResponseCoder = sdkHttpMetadataCoder; - } - - @Override - public void encode(PublishResponse value, OutputStream outStream) - throws CoderException, IOException { - MESSAGE_ID_CODER.encode(value.messageId(), outStream); - if (metadataCoder != null) { - metadataCoder.encode(value.responseMetadata(), outStream); - } - if (httpResponseCoder != null) { - httpResponseCoder.encode(value.sdkHttpResponse(), outStream); - } - } - - @Override - public PublishResponse decode(InputStream inStream) throws CoderException, IOException { - PublishResponse.Builder responseBuilder = - PublishResponse.builder().messageId(MESSAGE_ID_CODER.decode(inStream)); - if (metadataCoder != null) { - AwsResponseMetadata metadata = metadataCoder.decode(inStream); - if (metadata != null) { - responseBuilder.responseMetadata(metadata); - } - } - if (httpResponseCoder != null) { - SdkHttpResponse httpResponse = httpResponseCoder.decode(inStream); - if (httpResponse != null) { - responseBuilder.sdkHttpResponse(httpResponse); - } - } - return responseBuilder.build(); - } - - @Override - public void verifyDeterministic() throws NonDeterministicException { - MESSAGE_ID_CODER.verifyDeterministic(); - if (metadataCoder != null) { - metadataCoder.verifyDeterministic(); - } - if (httpResponseCoder != null) { - httpResponseCoder.verifyDeterministic(); - } - } - } -} diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java index 7b9982517b80..182b7dbb3881 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java @@ -21,11 +21,9 @@ import com.google.auto.value.AutoValue; import java.util.function.Consumer; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; import org.apache.beam.sdk.io.aws2.common.ClientConfiguration; import org.apache.beam.sdk.io.aws2.options.AwsOptions; -import org.apache.beam.sdk.io.aws2.schemas.AwsSchemaProvider; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptions; @@ -38,9 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.awscore.AwsResponseMetadata; import software.amazon.awssdk.core.exception.SdkException; -import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sns.SnsClient; import software.amazon.awssdk.services.sns.model.InvalidParameterException; @@ -69,12 +65,6 @@ *
  • Request builder function to create SNS publish requests from your input * * - *

    By default, the output {@link PublishResponse} contains only the SNS messageId, all other - * fields are null. If you need to include the full {@link SdkHttpResponse} and {@link - * AwsResponseMetadata}, you can call {@link Write#withFullPublishResponse()}. If you need the HTTP - * status code only but no headers, you can use {@link - * Write#withFullPublishResponseWithoutHeaders()}. - * *

    Configuration of AWS clients

    * *

    AWS clients for all AWS IOs can be configured using {@link AwsOptions}, e.g. {@code @@ -121,8 +111,6 @@ public abstract static class Write abstract @Nullable SerializableFunction getPublishRequestBuilder(); - abstract @Nullable Coder getCoder(); - abstract Builder builder(); @AutoValue.Builder @@ -135,8 +123,6 @@ abstract static class Builder { abstract Builder setPublishRequestBuilder( SerializableFunction requestBuilder); - abstract Builder setCoder(Coder coder); - abstract Write build(); } @@ -177,39 +163,6 @@ public Write withClientConfiguration(ClientConfiguration config) { return builder().setClientConfiguration(config).build(); } - /** - * Encode the full {@link PublishResponse} object, including sdkResponseMetadata and - * sdkHttpMetadata with the HTTP response headers. - * - * @deprecated Writes fail exceptionally in case of errors, there is no need to check headers. - */ - @Deprecated - public Write withFullPublishResponse() { - return withCoder(PublishResponseCoders.fullPublishResponse()); - } - - /** - * Encode the full {@link PublishResponse} object, including sdkResponseMetadata and - * sdkHttpMetadata but excluding the HTTP response headers. - * - * @deprecated Writes fail exceptionally in case of errors, there is no need to check headers. - */ - @Deprecated - public Write withFullPublishResponseWithoutHeaders() { - return withCoder(PublishResponseCoders.fullPublishResponseWithoutHeaders()); - } - - /** - * Encode the {@link PublishResponse} with the given coder. - * - * @deprecated Explicit usage of coders is deprecated. Inferred schemas provided by {@link - * AwsSchemaProvider} will be used instead. - */ - @Deprecated - public Write withCoder(Coder coder) { - return builder().setCoder(coder).build(); - } - @Override public PCollection expand(PCollection input) { checkArgument(getPublishRequestBuilder() != null, "withPublishRequestBuilder() is required"); @@ -221,11 +174,7 @@ public PCollection expand(PCollection input) { checkArgument(checkTopicExists(awsOptions), "Topic arn %s does not exist", getTopicArn()); } - PCollection result = input.apply(ParDo.of(new SnsWriterFn<>(this))); - if (getCoder() != null) { - result.setCoder(getCoder()); - } - return result; + return input.apply(ParDo.of(new SnsWriterFn<>(this))); } private boolean checkTopicExists(AwsOptions options) { diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java deleted file mode 100644 index dd5ea454c340..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/coders/AwsCodersTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.coders; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID; - -import java.util.Map; -import java.util.UUID; -import org.apache.beam.sdk.util.CoderUtils; -import org.junit.Test; -import software.amazon.awssdk.awscore.AwsResponseMetadata; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.utils.ImmutableMap; - -/** Tests for {@link AwsCoders}. */ -public class AwsCodersTest { - - @Test - public void testAwsResponseMetadataDecodeEncodeEquals() throws Exception { - AwsResponseMetadata value = buildAwsResponseMetadata(); - AwsResponseMetadata clone = CoderUtils.clone(AwsCoders.awsResponseMetadata(), value); - assertThat(clone.requestId(), equalTo(value.requestId())); - } - - @Test - public void testSdkHttpMetadataDecodeEncodeEquals() throws Exception { - SdkHttpResponse value = buildSdkHttpMetadata(); - SdkHttpResponse clone = CoderUtils.clone(AwsCoders.sdkHttpResponse(), value); - assertThat(clone.statusCode(), equalTo(value.statusCode())); - assertThat(clone.headers(), equalTo(value.headers())); - } - - @Test - public void testSdkHttpMetadataWithoutHeadersDecodeEncodeEquals() throws Exception { - SdkHttpResponse value = buildSdkHttpMetadata(); - SdkHttpResponse clone = CoderUtils.clone(AwsCoders.sdkHttpResponseWithoutHeaders(), value); - assertThat(clone.statusCode(), equalTo(value.statusCode())); - assertThat(clone.headers().isEmpty(), equalTo(true)); - } - - private AwsResponseMetadata buildAwsResponseMetadata() { - Map metadata = ImmutableMap.of(AWS_REQUEST_ID, UUID.randomUUID().toString()); - return new AwsResponseMetadata(metadata) {}; - } - - private SdkHttpResponse buildSdkHttpMetadata() { - return SdkHttpResponse.builder() - .statusCode(200) - .appendHeader("Content-Type", "application/json") - .build(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java deleted file mode 100644 index 4e21f885b21d..000000000000 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/PublishResponseCodersTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.aws2.sns; - -import static org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse; -import static org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.fullPublishResponseWithoutHeaders; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static software.amazon.awssdk.awscore.util.AwsHeader.AWS_REQUEST_ID; - -import java.util.Map; -import java.util.UUID; -import org.apache.beam.sdk.testing.CoderProperties; -import org.apache.beam.sdk.util.CoderUtils; -import org.junit.Test; -import software.amazon.awssdk.awscore.AwsResponseMetadata; -import software.amazon.awssdk.http.SdkHttpResponse; -import software.amazon.awssdk.services.sns.model.PublishResponse; -import software.amazon.awssdk.utils.ImmutableMap; - -/** Tests for {@link PublishResponseCoders}. */ -public class PublishResponseCodersTest { - - @Test - public void testDefaultPublishResponseDecodeEncodeEquals() throws Exception { - CoderProperties.coderDecodeEncodeEqual( - defaultPublishResponse(), - PublishResponse.builder().messageId(UUID.randomUUID().toString()).build()); - } - - @Test - public void testFullPublishResponseWithoutHeadersDecodeEncodeEquals() throws Exception { - CoderProperties.coderDecodeEncodeEqual( - fullPublishResponseWithoutHeaders(), - PublishResponse.builder().messageId(UUID.randomUUID().toString()).build()); - - PublishResponse value = buildFullPublishResponse(); - PublishResponse clone = CoderUtils.clone(fullPublishResponseWithoutHeaders(), value); - assertThat(clone.responseMetadata().requestId(), equalTo(value.responseMetadata().requestId())); - assertThat(clone.sdkHttpResponse().statusCode(), equalTo(value.sdkHttpResponse().statusCode())); - assertThat(clone.sdkHttpResponse().headers().isEmpty(), equalTo(true)); - } - - @Test - public void testFullPublishResponseIncludingHeadersDecodeEncodeEquals() throws Exception { - CoderProperties.coderDecodeEncodeEqual( - PublishResponseCoders.fullPublishResponse(), - PublishResponse.builder().messageId(UUID.randomUUID().toString()).build()); - - PublishResponse value = buildFullPublishResponse(); - PublishResponse clone = CoderUtils.clone(PublishResponseCoders.fullPublishResponse(), value); - assertThat(clone.responseMetadata().requestId(), equalTo(value.responseMetadata().requestId())); - assertThat(clone.sdkHttpResponse().statusCode(), equalTo(value.sdkHttpResponse().statusCode())); - assertThat(clone.sdkHttpResponse().headers(), equalTo(value.sdkHttpResponse().headers())); - } - - private PublishResponse buildFullPublishResponse() { - return (PublishResponse) - PublishResponse.builder() - .messageId(UUID.randomUUID().toString()) - .responseMetadata(buildAwsResponseMetadata()) - .sdkHttpResponse(buildSdkHttpMetadata()) - .build(); - } - - private AwsResponseMetadata buildAwsResponseMetadata() { - Map metadata = ImmutableMap.of(AWS_REQUEST_ID, UUID.randomUUID().toString()); - return new AwsResponseMetadata(metadata) {}; - } - - private SdkHttpResponse buildSdkHttpMetadata() { - return SdkHttpResponse.builder() - .statusCode(200) - .appendHeader("Content-Type", "application/json") - .build(); - } -} diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java index 1eb40569577b..59fde7f532bc 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOTest.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.io.aws2.sns; -import static org.apache.beam.sdk.io.aws2.sns.PublishResponseCoders.defaultPublishResponse; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -29,8 +27,6 @@ import java.io.Serializable; import java.util.List; import java.util.function.Consumer; -import org.apache.beam.sdk.coders.DelegateCoder; -import org.apache.beam.sdk.coders.DelegateCoder.CodingFunction; import org.apache.beam.sdk.io.aws2.MockClientBuilderFactory; import org.apache.beam.sdk.io.aws2.sns.SnsIO.Write; import org.apache.beam.sdk.testing.PAssert; @@ -137,41 +133,6 @@ public void testWriteWithoutTopicArn() { } } - @Test - public void testWriteWithCustomCoder() { - List input = ImmutableList.of("message1"); - - when(sns.publish(any(PublishRequest.class))) - .thenReturn(PublishResponse.builder().messageId("id").build()); - - // Mockito mocks cause NotSerializableException even with withSettings().serializable() - final CountingFn countingFn = new CountingFn<>(); - - Write snsWrite = - SnsIO.write() - .withPublishRequestBuilder(msg -> requestBuilder(msg, topicArn)) - .withCoder(DelegateCoder.of(defaultPublishResponse(), countingFn, x -> x)); - - PCollection results = p.apply(Create.of(input)).apply(snsWrite); - PAssert.that(results.apply(Count.globally())).containsInAnyOrder(1L); - p.run(); - - assertThat(countingFn.count).isGreaterThan(0); - for (String msg : input) { - verify(sns).publish(requestBuilder(msg, topicArn).build()); - } - } - - private static class CountingFn implements CodingFunction { - int count; - - @Override - public T apply(T input) throws Exception { - count++; - return input; - } - } - private static PublishRequest.Builder requestBuilder(String msg, String topic) { return PublishRequest.builder().message(msg).topicArn(topic); }