diff --git a/CHANGES.md b/CHANGES.md
index 83e57d583ef1..4c0cb3ce3c24 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -78,6 +78,7 @@
* Passing a tag into MultiProcessShared is now required in the Python SDK ([#26168](https://github.com/apache/beam/issues/26168)).
* CloudDebuggerOptions is removed (deprecated in Beam v2.47.0) for Dataflow runner as the Google Cloud Debugger service is [shutting down](https://cloud.google.com/debugger/docs/deprecations). (Java) ([#25959](https://github.com/apache/beam/issues/25959)).
* AWS 2 client providers (deprecated in Beam [v2.38.0](#2380---2022-04-20)) are finally removed ([#26681](https://github.com/apache/beam/issues/26681)).
+* AWS 2 SnsIO.writeAsync (deprecated in Beam v2.37.0 due to risk of data loss) was finally removed ([#26710](https://github.com/apache/beam/issues/26710)).
## Deprecations
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java
deleted file mode 100644
index 7b9ea39989f4..000000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProvider.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory.defaultFactory;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-
-import java.net.URI;
-import org.apache.beam.sdk.io.aws2.common.ClientConfiguration;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sns.SnsAsyncClient;
-
-/** Basic implementation of {@link SnsAsyncClientProvider} used by default in {@link SnsIO}. */
-class BasicSnsAsyncClientProvider implements SnsAsyncClientProvider {
- private final ClientConfiguration config;
-
- BasicSnsAsyncClientProvider(
- AwsCredentialsProvider credentialsProvider, String region, @Nullable URI endpoint) {
- checkArgument(credentialsProvider != null, "awsCredentialsProvider can not be null");
- checkArgument(region != null, "region can not be null");
- config = ClientConfiguration.create(credentialsProvider, Region.of(region), endpoint);
- }
-
- @Override
- public SnsAsyncClient getSnsAsyncClient() {
- return defaultFactory().create(SnsAsyncClient.builder(), config, null).build();
- }
-
- @Override
- public boolean equals(@Nullable Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- BasicSnsAsyncClientProvider that = (BasicSnsAsyncClientProvider) o;
- return config.equals(that.config);
- }
-
- @Override
- public int hashCode() {
- return config.hashCode();
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java
deleted file mode 100644
index 372ea73ef81f..000000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsAsyncClientProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import java.io.Serializable;
-import software.amazon.awssdk.services.sns.SnsAsyncClient;
-
-/**
- * Provides instances of Asynchronous SNS client.
- *
- *
Please note, that any instance of {@link SnsAsyncClientProvider} must be {@link Serializable}
- * to ensure it can be sent to worker machines.
- */
-public interface SnsAsyncClientProvider extends Serializable {
- SnsAsyncClient getSnsAsyncClient();
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
index f8e40d00fadd..7b9982517b80 100644
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
+++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsIO.java
@@ -18,11 +18,8 @@
package org.apache.beam.sdk.io.aws2.sns;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
-import java.net.URI;
-import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory;
@@ -45,7 +42,6 @@
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.InvalidParameterException;
import software.amazon.awssdk.services.sns.model.NotFoundException;
@@ -114,16 +110,6 @@ public static Write write() {
.build();
}
- /**
- * @deprecated Please use {@link SnsIO#write()} to avoid the risk of data loss.
- * @see Issue #21366, BEAM-13203
- */
- @Deprecated
- public static WriteAsync writeAsync() {
- return new AutoValue_SnsIO_WriteAsync.Builder().build();
- }
-
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write
@@ -305,152 +291,4 @@ public void tearDown() {
}
}
}
-
- /**
- * Implementation of {@link #writeAsync}.
- *
- * @deprecated Please use {@link SnsIO#write()} to avoid the risk of data loss.
- * @see Issue #21366, BEAM-13203
- */
- @Deprecated
- @AutoValue
- public abstract static class WriteAsync
- extends PTransform, PCollection>> {
-
- abstract @Nullable SnsAsyncClientProvider getSnsClientProvider();
-
- /** SerializableFunction to create PublishRequest. */
- abstract @Nullable SerializableFunction getPublishRequestFn();
-
- /** Coder for element T. */
- abstract @Nullable Coder getCoder();
-
- abstract Builder builder();
-
- @AutoValue.Builder
- abstract static class Builder {
- abstract Builder setSnsClientProvider(SnsAsyncClientProvider asyncClientProvider);
-
- abstract Builder setCoder(Coder elementCoder);
-
- abstract Builder setPublishRequestFn(
- SerializableFunction publishRequestFn);
-
- abstract WriteAsync build();
- }
-
- /**
- * Specify a Coder for SNS PublishRequest object.
- *
- * @param elementCoder Coder
- */
- public WriteAsync withCoder(Coder elementCoder) {
- checkNotNull(elementCoder, "elementCoder cannot be null");
- return builder().setCoder(elementCoder).build();
- }
-
- /**
- * Specify a function for converting a message into PublishRequest object.
- *
- * @param publishRequestFn publishRequestFn
- */
- public WriteAsync withPublishRequestFn(
- SerializableFunction publishRequestFn) {
- checkNotNull(publishRequestFn, "publishRequestFn cannot be null");
- return builder().setPublishRequestFn(publishRequestFn).build();
- }
-
- /**
- * Allows to specify custom {@link SnsAsyncClientProvider}. {@link SnsAsyncClientProvider}
- * creates new {@link SnsAsyncClientProvider} which is later used for writing to a SNS topic.
- */
- public WriteAsync withSnsClientProvider(SnsAsyncClientProvider asyncClientProvider) {
- checkNotNull(asyncClientProvider, "asyncClientProvider cannot be null");
- return builder().setSnsClientProvider(asyncClientProvider).build();
- }
-
- /**
- * Specify credential details and region to be used to write to SNS. If you need more
- * sophisticated credential protocol, then you should look at {@link
- * WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}.
- */
- public WriteAsync withSnsClientProvider(
- AwsCredentialsProvider credentialsProvider, String region) {
- checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
- checkNotNull(region, "region cannot be null");
- return withSnsClientProvider(credentialsProvider, region, null);
- }
-
- /**
- * Specify credential details and region to be used to write to SNS. If you need more
- * sophisticated credential protocol, then you should look at {@link
- * WriteAsync#withSnsClientProvider(SnsAsyncClientProvider)}.
- *
- * The {@code serviceEndpoint} sets an alternative service host.
- */
- public WriteAsync withSnsClientProvider(
- AwsCredentialsProvider credentialsProvider, String region, URI serviceEndpoint) {
- checkNotNull(credentialsProvider, "credentialsProvider cannot be null");
- checkNotNull(region, "region cannot be null");
- return withSnsClientProvider(
- new BasicSnsAsyncClientProvider(credentialsProvider, region, serviceEndpoint));
- }
-
- @Override
- public PCollection> expand(PCollection input) {
- checkArgument(getSnsClientProvider() != null, "withSnsClientProvider() needs to called");
- checkArgument(getPublishRequestFn() != null, "withPublishRequestFn() needs to called");
- checkArgument(getCoder() != null, "withElementCoder() needs to called");
-
- return input
- .apply(ParDo.of(new SnsWriteAsyncFn<>(this)))
- .setCoder(SnsResponseCoder.of(getCoder()));
- }
-
- private static class SnsWriteAsyncFn extends DoFn> {
-
- private static final Logger LOG = LoggerFactory.getLogger(SnsWriteAsyncFn.class);
-
- private final WriteAsync spec;
- private transient SnsAsyncClient client;
-
- SnsWriteAsyncFn(WriteAsync spec) {
- this.spec = spec;
- }
-
- @Setup
- public void setup() {
- this.client = spec.getSnsClientProvider().getSnsAsyncClient();
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- @ProcessElement
- public void processElement(ProcessContext context) {
- PublishRequest publishRequest = spec.getPublishRequestFn().apply(context.element());
- client.publish(publishRequest).whenComplete(getPublishResponse(context));
- }
-
- private BiConsumer super PublishResponse, ? super Throwable> getPublishResponse(
- DoFn>.ProcessContext context) {
- return (response, ex) -> {
- if (ex == null) {
- SnsResponse snsResponse = SnsResponse.of(context.element(), response);
- context.output(snsResponse);
- } else {
- LOG.error("Error while publishing request to SNS", ex);
- throw new SnsWriteException("Error while publishing request to SNS", ex);
- }
- };
- }
- }
- }
-
- /** Exception class for SNS write exceptions. */
- protected static class SnsWriteException extends RuntimeException {
-
- SnsWriteException(String message, Throwable error) {
- super(message, error);
- }
- }
}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java
deleted file mode 100644
index 05348cd736ea..000000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponse.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import com.google.auto.value.AutoValue;
-import java.io.Serializable;
-import java.util.Optional;
-import java.util.OptionalInt;
-import org.checkerframework.checker.nullness.qual.NonNull;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-
-@AutoValue
-abstract class SnsResponse implements Serializable {
-
- public abstract T element();
-
- public abstract OptionalInt statusCode();
-
- public abstract Optional statusText();
-
- static SnsResponse create(
- @NonNull T element, OptionalInt statusCode, Optional statusText) {
-
- return new AutoValue_SnsResponse<>(element, statusCode, statusText);
- }
-
- public static SnsResponse of(@NonNull T element, @Nullable PublishResponse response) {
-
- final Optional publishResponse = Optional.ofNullable(response);
- OptionalInt statusCode =
- publishResponse
- .map(r -> OptionalInt.of(r.sdkHttpResponse().statusCode()))
- .orElse(OptionalInt.empty());
-
- Optional statusText = publishResponse.flatMap(r -> r.sdkHttpResponse().statusText());
-
- return create(element, statusCode, statusText);
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java
deleted file mode 100644
index 88d40f74edca..000000000000
--- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoder.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.Optional;
-import java.util.OptionalInt;
-import org.apache.beam.sdk.coders.BooleanCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-
-/**
- * Custom Coder for WrappedSnsResponse.
- *
- * @deprecated Coder of deprecated {@link SnsResponse}.
- */
-@Deprecated
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-class SnsResponseCoder extends StructuredCoder> {
-
- private final Coder elementCoder;
- private static final VarIntCoder STATUS_CODE_CODER = VarIntCoder.of();
- private static final StringUtf8Coder STATUS_TEXT_CODER = StringUtf8Coder.of();
-
- public SnsResponseCoder(Coder elementCoder) {
- this.elementCoder = elementCoder;
- }
-
- static SnsResponseCoder of(Coder elementCoder) {
- return new SnsResponseCoder<>(elementCoder);
- }
-
- @Override
- public void encode(SnsResponse value, OutputStream outStream) throws IOException {
- T element = value.element();
- elementCoder.encode(element, outStream);
-
- OptionalInt statusCode = value.statusCode();
- if (statusCode.isPresent()) {
- BooleanCoder.of().encode(Boolean.TRUE, outStream);
- STATUS_CODE_CODER.encode(statusCode.getAsInt(), outStream);
- } else {
- BooleanCoder.of().encode(Boolean.FALSE, outStream);
- }
-
- Optional statusText = value.statusText();
- if (statusText.isPresent()) {
- BooleanCoder.of().encode(Boolean.TRUE, outStream);
- STATUS_TEXT_CODER.encode(statusText.get(), outStream);
- } else {
- BooleanCoder.of().encode(Boolean.FALSE, outStream);
- }
- }
-
- @Override
- public SnsResponse decode(InputStream inStream) throws IOException {
- T element = elementCoder.decode(inStream);
-
- OptionalInt statusCode = OptionalInt.empty();
- if (BooleanCoder.of().decode(inStream)) {
- statusCode = OptionalInt.of(STATUS_CODE_CODER.decode(inStream));
- }
-
- Optional statusText = Optional.empty();
- if (BooleanCoder.of().decode(inStream)) {
- statusText = Optional.of(STATUS_TEXT_CODER.decode(inStream));
- }
- return SnsResponse.create(element, statusCode, statusText);
- }
-
- @Override
- public List extends Coder>> getCoderArguments() {
- return ImmutableList.of(elementCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- elementCoder.verifyDeterministic();
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProviderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProviderTest.java
deleted file mode 100644
index 4b94750a8bb2..000000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/BasicSnsAsyncClientProviderTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-
-/** Tests on {@link BasicSnsAsyncClientProvider}. */
-@RunWith(JUnit4.class)
-public class BasicSnsAsyncClientProviderTest {
-
- @Test
- public void testSerialization() {
- AwsCredentialsProvider awsCredentialsProvider =
- StaticCredentialsProvider.create(
- AwsBasicCredentials.create("ACCESS_KEY_ID", "SECRET_ACCESS_KEY"));
-
- BasicSnsAsyncClientProvider snsAsyncClientProvider =
- new BasicSnsAsyncClientProvider(awsCredentialsProvider, "us-east-1", null);
-
- byte[] serializedBytes = SerializableUtils.serializeToByteArray(snsAsyncClientProvider);
-
- BasicSnsAsyncClientProvider snsAsyncClientProviderDeserialized =
- (BasicSnsAsyncClientProvider)
- SerializableUtils.deserializeFromByteArray(serializedBytes, "Aws Credentials Provider");
-
- assertEquals(snsAsyncClientProvider, snsAsyncClientProviderDeserialized);
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java
deleted file mode 100644
index 697ae8ba7b76..000000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncBaseClient.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import java.io.Serializable;
-import software.amazon.awssdk.services.sns.SnsAsyncClient;
-
-class MockSnsAsyncBaseClient implements SnsAsyncClient, Serializable {
- @Override
- public String serviceName() {
- return null;
- }
-
- @Override
- public void close() {}
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java
deleted file mode 100644
index 65e476874162..000000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncClient.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import software.amazon.awssdk.http.SdkHttpResponse;
-import software.amazon.awssdk.services.sns.model.PublishRequest;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-
-final class MockSnsAsyncClient extends MockSnsAsyncBaseClient {
- private final int statusCode;
-
- private MockSnsAsyncClient(int statusCode) {
- this.statusCode = statusCode;
- }
-
- static MockSnsAsyncClient withStatusCode(int statusCode) {
- return new MockSnsAsyncClient(statusCode);
- }
-
- @Override
- public CompletableFuture publish(PublishRequest publishRequest) {
- return CompletableFuture.supplyAsync(
- () -> {
- SdkHttpResponse sdkHttpResponse =
- SdkHttpResponse.builder().statusCode(statusCode).build();
- PublishResponse.Builder builder = PublishResponse.builder();
- builder.messageId(UUID.randomUUID().toString());
- builder.sdkHttpResponse(sdkHttpResponse).build();
- return builder.build();
- });
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java
deleted file mode 100644
index 198b0f78dd68..000000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/MockSnsAsyncExceptionClient.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import java.util.concurrent.CompletableFuture;
-import software.amazon.awssdk.services.sns.model.PublishRequest;
-import software.amazon.awssdk.services.sns.model.PublishResponse;
-
-final class MockSnsAsyncExceptionClient extends MockSnsAsyncBaseClient {
- private MockSnsAsyncExceptionClient() {}
-
- static MockSnsAsyncExceptionClient create() {
- return new MockSnsAsyncExceptionClient();
- }
-
- @Override
- public CompletableFuture publish(PublishRequest publishRequest) {
- return CompletableFuture.supplyAsync(
- () -> {
- throw new RuntimeException("Error occurred during publish call");
- });
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java
deleted file mode 100644
index 9fb544aca36a..000000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsIOWriteTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Set;
-import java.util.stream.StreamSupport;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import software.amazon.awssdk.services.sns.model.PublishRequest;
-
-@RunWith(JUnit4.class)
-public class SnsIOWriteTest implements Serializable {
- private static final String TOPIC = "test";
- private static final int FAILURE_STATUS_CODE = 400;
- private static final int SUCCESS_STATUS_CODE = 200;
-
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- @Test
- @Ignore("Deprecated SnsIO.writeAsync doesn't wait for future responses.")
- public void shouldReturnResponseOnPublishSuccess() {
- String testMessage1 = "test1";
- String testMessage2 = "test2";
- String testMessage3 = "test3";
-
- PCollection> result =
- pipeline
- .apply(
- Create.of(testMessage1, testMessage2, testMessage3).withCoder(StringUtf8Coder.of()))
- .apply(
- SnsIO.writeAsync()
- .withCoder(StringUtf8Coder.of())
- .withPublishRequestFn(createPublishRequestFn())
- .withSnsClientProvider(
- () -> MockSnsAsyncClient.withStatusCode(SUCCESS_STATUS_CODE)));
-
- PAssert.that(result)
- .satisfies(
- (responses) -> {
- ImmutableSet messagesInResponse =
- StreamSupport.stream(responses.spliterator(), false)
- .filter(response -> response.statusCode().getAsInt() == SUCCESS_STATUS_CODE)
- .map(SnsResponse::element)
- .collect(ImmutableSet.toImmutableSet());
-
- Set originalMessages =
- Sets.newHashSet(testMessage1, testMessage2, testMessage3);
- Sets.SetView difference =
- Sets.difference(messagesInResponse, originalMessages);
-
- assertEquals(3, messagesInResponse.size());
- assertEquals(0, difference.size());
- return null;
- });
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- @Ignore("Deprecated SnsIO.writeAsync doesn't wait for future responses.")
- public void shouldReturnResponseOnPublishFailure() {
- String testMessage1 = "test1";
- String testMessage2 = "test2";
-
- PCollection> result =
- pipeline
- .apply(Create.of(testMessage1, testMessage2).withCoder(StringUtf8Coder.of()))
- .apply(
- SnsIO.writeAsync()
- .withCoder(StringUtf8Coder.of())
- .withPublishRequestFn(createPublishRequestFn())
- .withSnsClientProvider(
- () -> MockSnsAsyncClient.withStatusCode(FAILURE_STATUS_CODE)));
-
- PAssert.that(result)
- .satisfies(
- (responses) -> {
- ImmutableSet messagesInResponse =
- StreamSupport.stream(responses.spliterator(), false)
- .filter(response -> response.statusCode().getAsInt() != SUCCESS_STATUS_CODE)
- .map(SnsResponse::element)
- .collect(ImmutableSet.toImmutableSet());
-
- Set originalMessages = Sets.newHashSet(testMessage1, testMessage2);
- Sets.SetView difference =
- Sets.difference(messagesInResponse, originalMessages);
-
- assertEquals(2, messagesInResponse.size());
- assertEquals(0, difference.size());
- return null;
- });
- pipeline.run().waitUntilFinish();
- }
-
- @Test
- @Ignore("Deprecated SnsIO.writeAsync doesn't fail on failure status code.")
- public void shouldThrowIfThrowErrorOptionSet() {
- pipeline
- .apply(Create.of("test1"))
- .apply(
- SnsIO.writeAsync()
- .withCoder(StringUtf8Coder.of())
- .withPublishRequestFn(createPublishRequestFn())
- .withSnsClientProvider(
- () -> MockSnsAsyncClient.withStatusCode(FAILURE_STATUS_CODE)));
-
- assertThatThrownBy(() -> pipeline.run().waitUntilFinish())
- .isInstanceOf(Pipeline.PipelineExecutionException.class);
- }
-
- @Test
- @Ignore("Deprecated SnsIO.writeAsync doesn't propagate async failures.")
- public void shouldThrowIfThrowErrorOptionSetOnInternalException() {
- pipeline
- .apply(Create.of("test1"))
- .apply(
- SnsIO.writeAsync()
- .withCoder(StringUtf8Coder.of())
- .withPublishRequestFn(createPublishRequestFn())
- .withSnsClientProvider(MockSnsAsyncExceptionClient::create));
-
- assertThatThrownBy(() -> pipeline.run().waitUntilFinish())
- .isInstanceOf(Pipeline.PipelineExecutionException.class);
- }
-
- private SerializableFunction createPublishRequestFn() {
- return (input) -> PublishRequest.builder().topicArn(TOPIC).message(input).build();
- }
-}
diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java
deleted file mode 100644
index f0d4563a90c1..000000000000
--- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/sns/SnsResponseCoderTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.aws2.sns;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Optional;
-import java.util.OptionalInt;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class SnsResponseCoderTest {
-
- @Test
- public void verifyResponseWithStatusCodeAndText() throws IOException {
-
- SnsResponse expected =
- SnsResponse.create("test-1", OptionalInt.of(200), Optional.of("OK"));
-
- SnsResponseCoder coder = SnsResponseCoder.of(StringUtf8Coder.of());
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- coder.encode(expected, output);
-
- ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
- SnsResponse actual = coder.decode(in);
-
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void verifyResponseWithStatusAndNoText() throws IOException {
- SnsResponse expected =
- SnsResponse.create("test-2", OptionalInt.of(200), Optional.empty());
-
- SnsResponseCoder coder = SnsResponseCoder.of(StringUtf8Coder.of());
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- coder.encode(expected, output);
-
- ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
- SnsResponse actual = coder.decode(in);
-
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void verifyResponseWithNoStatusCodeAndText() throws IOException {
-
- SnsResponse expected =
- SnsResponse.create("test-3", OptionalInt.empty(), Optional.empty());
-
- SnsResponseCoder coder = SnsResponseCoder.of(StringUtf8Coder.of());
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- coder.encode(expected, output);
-
- ByteArrayInputStream in = new ByteArrayInputStream(output.toByteArray());
- SnsResponse actual = coder.decode(in);
-
- Assert.assertEquals(expected, actual);
- }
-}