Skip to content

Commit

Permalink
Fix up
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Dec 18, 2024
1 parent a0cf363 commit 93e427a
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setRegion(String region) {

public void setServiceEndpoint(@Nullable String serviceEndpoint) {
if (serviceEndpoint != null) {
this.serviceEndpoint = URI(serviceEndpoint);
this.serviceEndpoint = new URI(serviceEndpoint);
}
}
}
Expand Down Expand Up @@ -254,7 +254,13 @@ public PTransform<PBegin, PCollection<byte[]>> buildExternal(
}
// Convert back to bytes to keep consistency with previous verison:
// https://github.com/apache/beam/blob/5eed396caf9e0065d8ed82edcc236bad5b71ba22/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
return readTransform.Map(kr -> kr.getDataAsBytes());
return readTransform.apply("Convert to bytes", ParDo.of(new DoFn<KiesisRecord, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) {
KinesisRecord record = c.element();
return record.getDataAsBytes();
}
}));
}
}
}

0 comments on commit 93e427a

Please sign in to comment.