diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index cb8820f62a27..a7218aae635d 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -88,20 +88,17 @@ import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import org.slf4j.LoggerFactory; @@ -792,12 +789,13 @@ public void processElement(ProcessContext c) { * Send {@code events} to Kafka. */ private void sinkEventsToKafka(PCollection events) { - PTransform, PDone> io = KafkaIO.write() - .withBootstrapServers(options.getBootstrapServers()) - .withTopic(options.getKafkaSinkTopic()) - .withKeySerializer(LongSerializer.class) - .withValueSerializer(ByteArraySerializer.class).values(); - events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)).apply(io); + PCollection event_to_bytes = events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)); + event_to_bytes.apply(KafkaIO.write() + .withBootstrapServers(options.getBootstrapServers()) + .withTopic(options.getKafkaSinkTopic()) + .withValueSerializer(ByteArraySerializer.class) + .values()); + } @@ -892,12 +890,13 @@ private void sinkResultsToKafka(PCollection formattedResults) { checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()), "Missing --bootstrapServers"); - PTransform, PDone> io = KafkaIO.write() + formattedResults.apply( + queryName + ".WriteKafkaResults", + KafkaIO.write() .withBootstrapServers(options.getBootstrapServers()) .withTopic(options.getKafkaSinkTopic()) - .withKeySerializer(LongSerializer.class) - .withValueSerializer(StringSerializer.class).values(); - formattedResults.apply(queryName + ".WriteKafkaResults", io); + .withValueSerializer(StringSerializer.class) + .values()); } /**