Skip to content

Commit

Permalink
Allow writebuilder to be serialized
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Dec 23, 2024
1 parent 4260926 commit d737e5b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -59,13 +56,7 @@ public static byte[] serializeToByteArray(Serializable value) {
}
return buffer.toByteArray();
} catch (IOException exn) {
ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
try {
throw new IllegalArgumentException(
"unable to serialize " + ow.writeValueAsString(value), exn);
} catch (JsonProcessingException ex) {
throw new IllegalArgumentException("unable to jsonify " + value, exn);
}
throw new IllegalArgumentException("unable to serialize " + value, exn);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public PTransform<PCollection<byte[]>, KinesisIO.Write.Result> buildExternal(
Configuration configuration) {
AwsBasicCredentials creds =
AwsBasicCredentials.create(configuration.awsAccessKey, configuration.awsSecretKey);
String pk = configuration.partitionKey;
StaticCredentialsProvider provider = StaticCredentialsProvider.create(creds);
SerializableFunction<byte[], byte[]> serializer = v -> v;
@Nullable URI endpoint = null;
Expand All @@ -123,7 +124,7 @@ public PTransform<PCollection<byte[]>, KinesisIO.Write.Result> buildExternal(
.region(Region.of(configuration.region))
.endpoint(endpoint)
.build())
.withPartitioner(p -> configuration.partitionKey)
.withPartitioner(p -> pk)
.withSerializer(serializer);

return writeTransform;
Expand Down

0 comments on commit d737e5b

Please sign in to comment.