[BEAM-2852] Add support for Kafka as source/sink on Nexmark #5019
Conversation
R: @echauchot
Thanks @aromanenko-dev !
Thanks!
some small changes + one new small feature + include my PR
/**
 * Send {@code events} to Kafka.
 */
private void sinkEventsToKafka(PCollection<Event> events) {
Can you please wire this method up and add a COMBINED mode similar to what is done in Pub/Sub?
IMHO we should refactor the whole COMBINED mode:
- See NexmarkLauncher#createSource: it does a switch on the source type to configure the sink.
- It sends synthetic events to the sink when in COMBINED mode, but NexmarkUtils#COMBINED states that combined mode is for "Both publish and consume, but as separate jobs".
Once refactored into something more coherent, implement it for Kafka.
What is important in the connection with a MOM (message-oriented middleware):
- have the ability to keep track of the generated events that lead to a benchmark result
- be able to read events from a topic
- write benchmark results to a topic
 * Send {@code events} to Kafka.
 */
private void sinkEventsToKafka(PCollection<Event> events) {
  PTransform<PCollection<byte[]>, PDone> io = KafkaIO.<Long, byte[]>write()
This is a comment that I had on the previous PR. IMHO it is very weird code: it explicitly uses PTransform in place of the Write transform, and it also specifies a key and the associated coder whereas there is no key in the input PCollection. To be quicker, I submitted a PR to your repo so that you could include the fix in this PR branch. See aromanenko-dev#2
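For reference, the suggested fix amounts to something like the sketch below. It assumes Beam's KafkaIO, whose Write.values() variant writes values with null keys, so no key type, key serializer, or key coder is needed; the Event-to-byte[] step and the getKafkaSinkTopic() option name are illustrative, not taken from this PR.

```java
// Sketch only; requires beam-sdks-java-io-kafka and kafka-clients on the classpath.
private void sinkEventsToKafka(PCollection<Event> events) {
  events
      // Illustrative step: serialize each Event to byte[] (hypothetical DoFn).
      .apply(ParDo.of(new EventToBytesFn()))
      .apply(KafkaIO.<Void, byte[]>write()
          .withBootstrapServers(options.getBootstrapServers())
          .withTopic(options.getKafkaSinkTopic())
          .withValueSerializer(ByteArraySerializer.class)
          .values()); // values-only write: no key, no key serializer/coder needed
}
```

The point of `.values()` is that the resulting transform accepts a `PCollection<byte[]>` directly, so the odd keyed `PTransform` declaration above becomes unnecessary.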
  throw new RuntimeException("Missing --bootstrapServers");
}

KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
rename io to read
private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
  NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());

  if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
use checkArgument
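For context, Guava's Preconditions.checkArgument collapses the if/throw pair into a single call that throws IllegalArgumentException when the check fails. A minimal stand-in (my own sketch, not Guava's source) behaves like this:

```java
public class CheckArgumentDemo {
  /**
   * Minimal stand-in for Guava's Preconditions.checkArgument:
   * throws IllegalArgumentException with the given message when the condition is false.
   */
  static void checkArgument(boolean condition, Object errorMessage) {
    if (!condition) {
      throw new IllegalArgumentException(String.valueOf(errorMessage));
    }
  }

  public static void main(String[] args) {
    String bootstrapServers = ""; // simulate a missing pipeline option
    try {
      checkArgument(bootstrapServers != null && !bootstrapServers.isEmpty(),
          "Missing --bootstrapServers");
    } catch (IllegalArgumentException e) {
      // The one-line checkArgument call replaces the manual if/throw block.
      System.out.println(e.getMessage());
    }
  }
}
```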
checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
    "Missing --bootstrapServers");

PTransform<PCollection<String>, PDone> io = KafkaIO.<Long, String>write()
Same comment as above. See my PR #2 on your repo.
Force-pushed from 2e744e5 to 8f7d724
@aromanenko-dev please update the gradle build to reflect the maven one, squash this commit with the code-style one, and run "Run Java PreCommit" to launch the gradle build. LGTM once the gradle build is green.
…n the PCollection
[BEAM-2852] Adjust gradle build with maven
Force-pushed from ca3436c to 6df0add
Run Java PreCommit
Merging this PR; we will tackle COMBINED mode (refactoring this mode in Pub/Sub and applying it to Kafka) in another PR. I opened a ticket for that: https://issues.apache.org/jira/browse/BEAM-4048
Thanks! @aromanenko-dev
@vectorijk Thank you for your initial work!
Allows using Kafka as a source/sink for the Nexmark benchmark.
Based on the original implementation in #3937 by @vectorijk
Follow this checklist to help us incorporate your contribution quickly and easily:
- Make sure the PR title is formatted like: [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
- Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.