sinkToKafka works
vectorijk authored and aromanenko-dev committed Apr 4, 2018
1 parent 658915a commit 0d2db11
Showing 4 changed files with 156 additions and 4 deletions.
35 changes: 33 additions & 2 deletions sdks/java/nexmark/pom.xml
@@ -192,9 +192,22 @@
<argLine>-da</argLine> <!-- disable assert in Calcite converter validation -->
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<properties>
<kafka.clients.version>0.10.1.0</kafka.clients.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -274,13 +287,13 @@
<artifactId>hamcrest-core</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
@@ -303,5 +316,23 @@
<artifactId>commons-lang3</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>

</dependencies>
</project>
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.nexmark;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -42,6 +43,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
@@ -86,13 +88,21 @@
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.slf4j.LoggerFactory;

@@ -764,6 +774,68 @@ public void processElement(ProcessContext c) {
}));
}

static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY =
new DoFn<Event, byte[]>() {
@ProcessElement
public void processElement(ProcessContext c) {
try {
byte[] encodedEvent = CoderUtils.encodeToByteArray(Event.CODER, c.element());
c.output(encodedEvent);
} catch (CoderException e1) {
LOG.error("Error while sending Event {} to Kafka: serialization error",
c.element().toString());
}
}
};

/**
* Send {@code events} to Kafka.
*/
private void sinkEventsToKafka(PCollection<Event> events) {
PTransform<PCollection<byte[]>, PDone> io = KafkaIO.<Long, byte[]>write()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaSinkTopic())
.withKeySerializer(LongSerializer.class)
.withValueSerializer(ByteArraySerializer.class).values();
events.apply("Event to bytes", ParDo.of(EVENT_TO_BYTEARRAY)).apply(io);
}


static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT =
new DoFn<KV<Long, byte[]>, Event>() {
@ProcessElement
public void processElement(ProcessContext c) {
byte[] encodedEvent = c.element().getValue();
try {
Event event = CoderUtils.decodeFromByteArray(Event.CODER, encodedEvent);
c.output(event);
} catch (CoderException e) {
LOG.error("Error while decoding Event from Kafka message: serialization error");
}
}
};
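These two DoFns are simply the Event coder applied in each direction. As a minimal sketch (not part of the commit; the helper name is made up for illustration), the round trip looks like this: encode with Event.CODER before the Kafka write, decode with the same coder after the Kafka read.

import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.util.CoderUtils;

/** Illustrative only: the byte[] payload carried through Kafka is the Event run through its coder. */
static Event kafkaPayloadRoundTrip(Event original) throws CoderException {
  // Same encode step EVENT_TO_BYTEARRAY performs before KafkaIO.write().
  byte[] payload = CoderUtils.encodeToByteArray(Event.CODER, original);
  // Same decode step BYTEARRAY_TO_EVENT performs after KafkaIO.read().
  return CoderUtils.decodeFromByteArray(Event.CODER, payload);
}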

/**
* Return source of events from Kafka.
*/
private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());

if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
throw new RuntimeException("Missing --bootstrapServers");
}

KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaSourceTopic())
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class);

return p
.apply(queryName + ".ReadKafkaEvents", io.withoutMetadata())
.apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
}

/**
* Return Avro source of events from {@code options.getInputFilePrefix}.
*/
@@ -813,6 +885,21 @@ public void processElement(ProcessContext c) {
.apply(queryName + ".WritePubsubEvents", io);
}

/**
* Send {@code formattedResults} to Kafka.
*/
private void sinkResultsToKafka(PCollection<String> formattedResults) {
checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
"Missing --bootstrapServers");

PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaSinkTopic())
.withKeySerializer(LongSerializer.class)
.withValueSerializer(StringSerializer.class).values();
formattedResults.apply(queryName + ".WriteKafkaResults", io);
}
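One way to sanity-check this sink is to read the sink topic back with the matching deserializers and drop the unused keys. This is a hedged sketch only, not part of the change; the helper name is hypothetical, and it assumes the same NexmarkOptions flags configured for the write side.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

/** Sketch: read back the formatted result strings written by sinkResultsToKafka. */
static PCollection<String> readResultsBack(Pipeline p, NexmarkOptions options) {
  return p
      .apply("ReadKafkaResults",
          KafkaIO.<Long, String>read()
              .withBootstrapServers(options.getBootstrapServers())
              .withTopic(options.getKafkaSinkTopic())
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withoutMetadata())
      // The write side used .values(), so the Long keys carry no information here.
      .apply(Values.<String>create());
}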

/**
* Send {@code formattedResults} to Pubsub.
*/
@@ -923,6 +1010,9 @@ private PCollection<Event> createSource(Pipeline p, final long now) {
case AVRO:
source = sourceEventsFromAvro(p);
break;
case KAFKA:
source = sourceEventsFromKafka(p);
break;
case PUBSUB:
// Setup the sink for the publisher.
switch (configuration.pubSubMode) {
@@ -1010,6 +1100,9 @@ private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
case PUBSUB:
sinkResultsToPubsub(formattedResults, now);
break;
case KAFKA:
sinkResultsToKafka(formattedResults);
break;
case TEXT:
sinkResultsToText(formattedResults, now);
break;
@@ -406,4 +406,24 @@ public interface NexmarkOptions
String getQueryLanguage();

void setQueryLanguage(String value);

@Description("Base name of Kafka source topic in streaming mode.")
@Nullable
@Default.String("nexmark-source")
String getKafkaSourceTopic();

void setKafkaSourceTopic(String value);

@Description("Base name of Kafka sink topic in streaming mode.")
@Nullable
@Default.String("nexmark-sink")
String getKafkaSinkTopic();

void setKafkaSinkTopic(String value);

@Description("Kafka Bootstrap Server domains.")
@Nullable
String getBootstrapServers();

void setBootstrapServers(String value);
}
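For reference, a hedged usage sketch of how these new flags are picked up through the standard Beam options machinery. The class name is hypothetical and not part of this commit, and it assumes NexmarkOptions lives in the org.apache.beam.sdk.nexmark package shown above.

import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaFlagsExample {
  public static void main(String[] args) {
    // e.g. --bootstrapServers=localhost:9092 --kafkaSourceTopic=nexmark-source --kafkaSinkTopic=nexmark-sink
    NexmarkOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(NexmarkOptions.class);
    System.out.println("bootstrap servers: " + options.getBootstrapServers()); // null unless --bootstrapServers is given
    System.out.println("source topic: " + options.getKafkaSourceTopic());      // "nexmark-source" by default
    System.out.println("sink topic: " + options.getKafkaSinkTopic());          // "nexmark-sink" by default
  }
}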
@@ -99,7 +99,11 @@ public enum SourceType {
/**
* Read from a PubSub topic. It will be fed the same synthetic events by this pipeline.
*/
PUBSUB
PUBSUB,
/**
* Read events from a Kafka topic. It will be fed the same synthetic events by this pipeline.
*/
KAFKA
}

/**
@@ -118,6 +122,10 @@ public enum SinkType {
* Write to a PubSub topic. It will be drained by this pipeline.
*/
PUBSUB,
/**
* Write to a Kafka topic. It will be drained by this pipeline.
*/
KAFKA,
/**
* Write to a text file. Only works in batch mode.
*/
@@ -129,7 +137,7 @@ public enum SinkType {
/**
* Write raw Events to BigQuery.
*/
BIGQUERY,
BIGQUERY
}

