Skip to content

Commit

Permalink
Merge pull request apache#2 from echauchot/fix_pr_5019
Browse files Browse the repository at this point in the history
Fix the code that uses a Long key whereas there is no key in the PCollection
  • Loading branch information
aromanenko-dev authored Apr 4, 2018
2 parents 0d2db11 + 4044186 commit 9d73f3a
Showing 1 changed file with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,17 @@
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -792,12 +789,13 @@ public void processElement(ProcessContext c) {
* Send {@code events} to Kafka.
*/
/**
 * Serializes {@code events} and writes them to the configured Kafka sink topic.
 *
 * <p>The input {@code PCollection} is keyless, so the write uses a {@code Void}
 * key type and {@code KafkaIO.Write#values()} to emit value-only records
 * (previously a {@code Long} key serializer was configured even though no key
 * exists in the collection).
 *
 * @param events the events to publish to Kafka
 */
private void sinkEventsToKafka(PCollection<Event> events) {
  // First convert each Event to its byte[] payload, then write the raw bytes.
  PCollection<byte[]> eventBytes =
      events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY));
  eventBytes.apply(
      KafkaIO.<Void, byte[]>write()
          .withBootstrapServers(options.getBootstrapServers())
          .withTopic(options.getKafkaSinkTopic())
          .withValueSerializer(ByteArraySerializer.class)
          // values(): publish only the payload; no key serializer needed.
          .values());
}


Expand Down Expand Up @@ -892,12 +890,13 @@ private void sinkResultsToKafka(PCollection<String> formattedResults) {
checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
"Missing --bootstrapServers");

PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
formattedResults.apply(
queryName + ".WriteKafkaResults",
KafkaIO.<Void, String>write()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaSinkTopic())
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class).values();
formattedResults.apply(queryName + ".WriteKafkaResults", io);
.withValueSerializer(StringSerializer.class)
.values());
}

/**
Expand Down

0 comments on commit 9d73f3a

Please sign in to comment.