-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Example to test actual implementation with TopologyTestDriver #219
Example to test actual implementation with TopologyTestDriver #219
Conversation
It looks like @jukkakarvanen hasn't signed our Contributor License Agreement, yet.
You can read and sign our full Contributor License Agreement here. Once you've signed reply with Appreciation of efforts, clabot |
[clabot:check] |
@confluentinc It looks like @jukkakarvanen just signed our Contributor License Agreement. 👍 Always at your service, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @jukkakarvanen! Over this looks good I just have a few minor comments.
README.md
Outdated
that demonstrate end-to-end data pipelines. Here, we use a testing framework to automatically spawn embedded Kafka | ||
* **Examples under [src/test/](src/test/)**: These examples should test applications under [src/main/](src/main/). | ||
Unit Tests with TopologyTestDriver test the stream logic without external system dependencies. | ||
The integration tests use a testing framework to automatically spawn embedded Kafka |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: integration tests use a testing framework to automatically spawn
-> integration tests use an embedded Kafka
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
private ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); | ||
|
||
@Before | ||
public void setup() throws IllegalAccessException, ClassNotFoundException, InstantiationException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can remove the throws
clause and all exceptions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
/** | ||
* Stream processing unit test of {@link WordCountLambdaExample}, using TopologyTestDriver. | ||
* | ||
* @author Jukka Karvanen / jukinimi.com |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we usually don't have any author names in the javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
} | ||
|
||
|
||
/** Read one Record from output topic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move the text one line below the start of the javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return testDriver.readOutput(WordCountLambdaExample.outputTopic, stringDeserializer, longDeserializer); | ||
} | ||
|
||
/** Read counts from output to map. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto here and elsewhere below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
*/ | ||
package io.confluent.examples.streams; | ||
|
||
import org.apache.kafka.clients.producer.ProducerRecord; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove wildcard imports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, Changed IDEA settings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -111,8 +111,48 @@ | |||
*/ | |||
public class WordCountLambdaExample { | |||
|
|||
static final String inputTopic = "inputTopic"; | |||
static final String outputTopic = "outputTopic"; | |||
|
|||
public static void main(final String[] args) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can rid of throws Exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed (Old problem,)
@@ -111,8 +111,48 @@ | |||
*/ | |||
public class WordCountLambdaExample { | |||
|
|||
static final String inputTopic = "inputTopic"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These topic names need to match the names of the directions in the javadoc
inputTopic
= streams-plaintext-input
outputTopic
= streams-wordcount-output
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Fixed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem was inherited from WordCountLambdaIntegrationTest.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @jukkakarvanen ,
You have an excellent point. I'm wondering if we should even keep the integration test at all. The application logic of the WordCountExample depends purely on Streams; it has no logical interactions with the broker. Streams already guarantees that it works with the broker, so it seems like the only necessary test is to verify the application logic, which is better done with the TopologyTestDriver.
What do you think about removing the integration test entirely?
(the code looks good to me, by the way)
No idea why previous test failed due to WordCountScalaIntegrationTest failure and now it is successful again. |
These integration tests are quite flaky, which is part of my motivation to just switch to TopologyTestDriver. |
@vvcephei, Yes, I don't see the point of the this kind of integration test with EmbeddedKafka at all, if you want to test Kafka Stream application logic. I understand the use case of EmbeddedSingleNodeKafkaCluster if the application contains also consumers or producers, but not in pure Kafka Streams. Maybe somebody knowing the internals of the TopologyTestDriver could comment is there possibility of difference, but I have found TopologyTestDriver a fast and reliable way of the testing the Kafka Stream application. |
It's historical reasons. When the examples where created, there was no |
FYI: There are PR apache/kafka#6569 to some point of time get rid of try / catch in tearDown, I needed to add because I am working with Windows. I also working to propose some extra Helper class to make simplify the use of TopologyTestDriver to be added kafka-streams-test-utils (and possible independent package for old Kafka versions) I added an example of how those class could be used in this WordCountLambdaExampleTest in separate brach of my repo: jukkakarvanen/kafka-streams-examples@TopologyTestDriver_tests...jukkakarvanen:InputOutputTopic |
Thanks @jukkakarvanen , It sounds like we can actually go ahead and just get rid of the Thanks also for those other PRs. I'd say, we should just consider them all independently. If we merge the other ones first, we can update this PR, but if we merge this PR first, we can always use a new PR to improve the test further. |
@jukkakarvanen thanks for the contribution! |
@bbejeck |
I pulled down the PR and verified the |
Thanks @jukkakarvanen ! |
merged with pint |
Sorry that I am late to the game. |
Readme states: "These examples are also a good starting point to learn how to implement your own end-to-end integration tests.". I disagree. See testing instructions:
https://docs.confluent.io/current/streams/developer-guide/test-streams.html
Now the Integration Tests are mainly copy/paste of actual implementation and not testing actual implementation class at all. The most of the test could also be done without EmbeddedKafka with TopologyTestDriver.
In this pull request there is a new WordCountLambdaExampleTest added utilizing TopologyTestDriver.
To be able to test WordCountLambdaExample implementation of WordCountLambdaExample main method needed to be refactored, so actual configuration and Stream building are own methods.
WordCountLambdaIntegrationTest class is also modified to utilize Stream logic of actual application and that way avoid code duplication.
The same logic should be applied also to other tests.