From a4aa6f1944ea03548b237379f86c0278be3c539b Mon Sep 17 00:00:00 2001 From: Danny McCormick Date: Fri, 20 Dec 2024 14:44:23 +0000 Subject: [PATCH] Allow configuration to be serialized --- .../kinesis/KinesisTransformRegistrar.java | 35 +++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8dfe1f07729..ddb80131f6b 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -62,7 +62,7 @@ private abstract static class CrossLanguageConfiguration { String awsAccessKey; String awsSecretKey; Region region; - @Nullable URI serviceEndpoint; + @Nullable String serviceEndpoint; public void setStreamName(String streamName) { this.streamName = streamName; @@ -81,14 +81,7 @@ public void setRegion(String region) { } public void setServiceEndpoint(@Nullable String serviceEndpoint) { - if (serviceEndpoint != null) { - try { - this.serviceEndpoint = new URI(serviceEndpoint); - } catch (URISyntaxException ex) { - throw new RuntimeException( - String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); - } - } + this.serviceEndpoint = serviceEndpoint; } } @@ -111,6 +104,16 @@ public PTransform, KinesisIO.Write.Result> buildExternal( AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); SerializableFunction serializer = v -> v; + @Nullable URI endpoint; + if (configuration.serviceEndpoint != null) { + try { + endpoint = configuration.serviceEndpoint; + } + catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } + } KinesisIO.Write writeTransform = KinesisIO.write() .withStreamName(configuration.streamName) @@ -118,7 +121,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( ClientConfiguration.builder() .credentialsProvider(provider) .region(configuration.region) - .endpoint(configuration.serviceEndpoint) + .endpoint(endpoint) .build()) .withPartitioner(p -> configuration.partitionKey) .withSerializer(serializer); @@ -211,6 +214,16 @@ public PTransform> buildExternal( AwsBasicCredentials creds = AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey); StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds); + @Nullable URI endpoint; + if (configuration.serviceEndpoint != null) { + try { + endpoint = configuration.serviceEndpoint; + } + catch (URISyntaxException ex) { + throw new RuntimeException( + String.format("Service endpoint must be URI format, got: %s", serviceEndpoint)); + } + } KinesisIO.Read readTransform = KinesisIO.read() .withStreamName(configuration.streamName) @@ -218,7 +231,7 @@ public PTransform> buildExternal( ClientConfiguration.builder() .credentialsProvider(provider) .region(configuration.region) - .endpoint(configuration.serviceEndpoint) + .endpoint(endpoint) .build()); if (configuration.maxNumRecords != null) {