The Kipes SDK simplifies the implementation of Kafka stream processing applications. The SDK provides a high-level interface for describing stream analytics, eliminates much of the repetitive technical boilerplate, and provides scaffolding to set up stream processing microservices quickly and in a structured way.
We built the SDK around the concept of Linux command pipes, making it easy to pick a specific command for each stream transformation and forward its results to the next. The SDK commands cover areas like:
- Event and field manipulation
- Event filtering
- Event correlation
- Statistical evaluations
- Event time adjustments
With these dedicated commands, engineers can directly create complex stream-processing applications in a language much more closely aligned with the business logic.
Example
Story: "As a ProductMarketer, I want to know how many customers visited a particular Product but didn't purchase it, so that I can identify the most visited Products that don't get purchased."

The corresponding topology correlates the events of each shopping session and then counts, per product, the sessions that started with a visit and ended without a purchase:
```java
KipesBuilder.init(streamsBuilder)
    .from(topicShopEvents)
    .transaction()
        .groupBy((key, value) -> value.getSessionId())
        .startswith((key, value) -> value.getType() == ProductVisited)
        .endswith((key, value) -> value.getType() == NoPurchase)
        .emit(END)
    .stats()
        .groupBy((key, value) -> value.getProductId())
        .count().as("qtyVisitedButNotBought")
    .build()
    .to(topicProductStats);
```
Besides these easy-to-use stream processing commands, the SDK provides specialized test classes so that engineers can quickly set up unit tests around their stream topologies without connecting to a running Kafka cluster. The testbed speeds up development and delivery time and makes testing and understanding complex applications more accessible.
To further speed up the development of stream-processing microservices, our Kipes SDK comes with dedicated classes and blueprints to scaffold microservices quickly. We support multiple application frameworks like Micronaut or Spring Boot (planned).
The Kipes SDK offers:
- High-level, multi-faceted stream processing commands in a fluent API
- Out-of-the-box serializers for JSON, Avro, and Protobuf
- Custom serializer support
- Stream testing utilities
- And more!
Requirements:

- Java 11 or higher

Add the required dependencies `streams-kafka` and/or `streams-kafka-test` to your project using Maven or Gradle.
```xml
<dependencies>
    <!-- Streams Kafka -->
    <dependency>
        <groupId>io.kipe</groupId>
        <artifactId>streams-kafka</artifactId>
        <version>${kipes.version}</version>
    </dependency>

    <!-- Streams Kafka Test (Optional) -->
    <dependency>
        <groupId>io.kipe</groupId>
        <artifactId>streams-kafka-test</artifactId>
        <version>${kipes.version}</version>
        <scope>test</scope>
    </dependency>
</dependencies>
```
```groovy
dependencies {
    // Streams Kafka
    implementation "io.kipe:streams-kafka:$kipesVersion"

    // Streams Kafka Test (Optional)
    testImplementation "io.kipe:streams-kafka-test:$kipesVersion"
}
```
Follow these steps to create a `KipesBuilder` instance, define an input `KStream`, and build the stream topology by chaining operations:
```java
// 1. Create a KipesBuilder instance with a StreamsBuilder object
StreamsBuilder streamsBuilder = new StreamsBuilder();
KipesBuilder<K, V> kipesBuilder = KipesBuilder.init(streamsBuilder);

// 2. Define the input KStream and pass it to the from() method
KStream<String, Integer> inputStream = streamsBuilder.stream("inputTopic");
kipesBuilder.from(inputStream, Serdes.String(), Serdes.Integer());

// 3. Chain operations on the KipesBuilder instance to build the stream topology
kipesBuilder
    .logDebug("Input")
    .filter((key, value) -> value > 0)
    .logDebug("Filtered")
    .to(outputTopic);
```
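Once the topology is defined, it runs like any other Kafka Streams application. A minimal sketch using the standard Kafka Streams API; the application id, bootstrap servers, and shutdown handling below are placeholder assumptions:

```java
// 4. Build and start the topology (standard Kafka Streams API; the
//    configuration values here are placeholder assumptions)
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
streams.start();

// Close the streams client on shutdown
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
```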
`GenericRecord` is a flexible data representation in the Kipes SDK for storing and manipulating records with various fields. It allows reading and writing data without generating code based on a specific schema, making it ideal for evolving data structures or handling data with different field combinations. The Kipes SDK uses `GenericRecord` in builder classes such as `EvalBuilder`, `BinBuilder`, `StatsBuilder`, and `TableBuilder`.
Create a `GenericRecord` instance and set field values using the fluent interface or the `set()` method:
```java
GenericRecord record = GenericRecord.create()
    .with("sensorId", "S001")
    .with("timestamp", 1628493021L)
    .with("temperature", 25.6);

record.set("newField", "value");
```
Retrieve field values with `get(fieldName)` and perform advanced operations using other `GenericRecord` methods.
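For example, reading back a field set above; a minimal sketch, where treating the return value as an `Object` is an assumption about the `get()` signature:

```java
// Read back a field set earlier; the exact return type of get() is an
// assumption here -- cast or convert as needed for your use case.
Object temperature = record.get("temperature");
```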
The Kipes SDK comes with pre-packaged serializers for JSON, Avro, and Protobuf. To use custom serializers or to override the defaults, provide a `Serde` to the builder methods that require one. Default serdes can also be configured through the standard Kafka Streams properties:
```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());

StreamsConfig config = new StreamsConfig(props);
```
For JSON serialization and deserialization using Jackson, obtain `Serde` instances through the `JsonSerdeFactory`:
```java
Serde<MyDataClass> jsonSerde = JsonSerdeFactory.getJsonSerde(MyDataClass.class);
```
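The resulting serde can then be handed to the builder methods that accept one. A minimal sketch, assuming the `from()` signature shown earlier; the topic name and `MyDataClass` are illustrative assumptions:

```java
// Consume the topic with the JSON serde and pass the stream to the builder;
// the topic name and MyDataClass are illustrative assumptions.
KStream<String, MyDataClass> jsonStream =
    streamsBuilder.stream("my-json-topic", Consumed.with(Serdes.String(), jsonSerde));
kipesBuilder.from(jsonStream, Serdes.String(), jsonSerde);
```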
For Avro serialization and deserialization using Confluent classes, obtain `Serde` instances through the `AvroSerdeFactory`. Here are some options:
```java
Serde<FooEvent> serde = AvroSerdeFactory.createSpecificAvroSerde(SCHEMA_REGISTRY_URL_CONFIG, false);
GenericAvroSerde serde = AvroSerdeFactory.createGenericAvroSerde(SCHEMA_REGISTRY_URL_CONFIG, false);
PrimitiveAvroSerde<Integer> serde = AvroSerdeFactory.createPrimitiveAvroSerde(SCHEMA_REGISTRY_URL_CONFIG, false);
```
For Protobuf serialization and deserialization using Confluent classes, obtain `Serde` instances through the `ProtobufSerdeFactory`. Here's an option:
```java
KafkaProtobufSerde<Message> protoSerde = ProtobufSerdeFactory.createProtoSerde(SCHEMA_REGISTRY_URL_CONFIG, false);
```
The Kipes SDK provides testing support for Kipe topologies through two base classes:

- `AbstractTopologyTest`
- `AbstractGenericRecordProcessorTopologyTest`

These classes utilize `TopologyTestDriver` to test Kipe applications without a running Kafka cluster.
To configure topology-specific properties, pass a map of properties into the `super()` method in the constructor of your test class:
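A minimal sketch; the class name and the property key/value shown here are illustrative assumptions:

```java
// Pass topology-specific properties to the base class; the property key and
// value below are illustrative assumptions.
class MyTopologyTest extends AbstractTopologyTest {

    public MyTopologyTest() {
        super(Map.of("some.property", "some-value"));
    }
}
```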
`AbstractTopologyTest` is a base class for testing Kipe applications using `TopologyTestDriver`. To create tests for your builders, follow these steps:
- Extend `AbstractTopologyTest`.
- Implement `initTopology()` and `initTestTopics()` to set up the topology and test topics.
- Create test input and output topics using `TopologyTestContext`.
- Send and receive messages using `TestInputTopic` and `TestOutputTopic`.
In this example, we will create a test for the simple topology from the "Initialization and Building Stream Topologies" section. Since that topology needs no topology-specific properties, the `SimpleTopologyTest` constructor passes an empty map to `super()`.
First, extend `AbstractTopologyTest` and implement the required methods:
```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;

import io.kipe.sdk.testing.AbstractTopologyTest;

class SimpleTopologyTest extends AbstractTopologyTest {

    private static final String INPUT_TOPIC = "inputTopic";
    private static final String OUTPUT_TOPIC = "outputTopic";

    private TestInputTopic<String, Integer> inputTopic;
    private TestOutputTopic<String, Integer> outputTopic;

    public SimpleTopologyTest() {
        super(Map.of());
    }

    @Override
    protected void initTopology(TopologyTestContext topologyTestContext) {
        KipesBuilder<?, ?> kipesBuilder = KipesBuilder.init(topologyTestContext.getStreamsBuilder());
        kipesBuilder
            .from(
                topologyTestContext.createKStream(INPUT_TOPIC, Serdes.String(), Serdes.Integer()),
                Serdes.String(),
                Serdes.Integer())
            .logDebug("Input")
            .filter((key, value) -> value > 0)
            .logDebug("Filtered")
            .to(OUTPUT_TOPIC);
    }

    @Override
    protected void initTestTopics(TopologyTestContext topologyTestContext) {
        this.inputTopic = topologyTestContext.createTestInputTopic(INPUT_TOPIC, Serdes.String(), Serdes.Integer());
        this.outputTopic = topologyTestContext.createTestOutputTopic(OUTPUT_TOPIC, Serdes.String(), Serdes.Integer());
    }
}
```
Now, add a test method to send input records and verify the output records:
```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class SimpleTopologyTest extends AbstractTopologyTest {

    // ...

    @Test
    void testFilterPositiveValues() {
        // Send 5 input records to the input topic
        inputTopic.pipeInput("key1", 5);
        inputTopic.pipeInput("key2", -3);
        inputTopic.pipeInput("key3", 7);
        inputTopic.pipeInput("key4", 0);
        inputTopic.pipeInput("key5", 2);

        // Only the 3 positive values pass the filter
        assertEquals(3, this.outputTopic.getQueueSize());
    }
}
```
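Beyond counting queued records, you can also inspect the forwarded values themselves. A short sketch using the standard Kafka Streams `TestOutputTopic` API (with `java.util.List` imported):

```java
// The filter preserves record order, so the positive values arrive as sent.
// readValuesToList() is part of the standard Kafka Streams test API and
// drains the output topic's queue.
assertEquals(List.of(5, 7, 2), outputTopic.readValuesToList());
```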
For topologies processing `GenericRecord`s, extend `AbstractGenericRecordProcessorTopologyTest`:
- Extend `AbstractGenericRecordProcessorTopologyTest`.
- Override `addGenericRecordProcessor()` to add the specific processor (see the sketch after this list). This abstracts initializing the topology and topics.
- Send `GenericRecord`s to the input topic using the provided utility methods.
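A hedged sketch of such a test: the exact signature of `addGenericRecordProcessor()` is an assumption based on its description above, and the `bin()` processor is borrowed from the more complex example later in this document:

```java
// Assumed hook signature: the base class hands over the builder for the
// GenericRecord stream, and the override returns it with the processor added.
// Consult the SDK for the real hook before relying on this sketch.
class BinProcessorTopologyTest extends AbstractGenericRecordProcessorTopologyTest {

    @Override
    protected KipesBuilder<String, GenericRecord> addGenericRecordProcessor(
            KipesBuilder<String, GenericRecord> builder,
            TopologyTestContext topologyTestContext) {
        // Discretize the "input" field into bins of span 0.1
        return builder
            .bin()
                .field("input")
                .span(0.1)
            .build();
    }
}
```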
Here's a simple example of using `KipesBuilder` to create a stream topology:
```java
KipesBuilder<String, Integer> kipesBuilder = KipesBuilder.init(streamsBuilder);

// Chain various operations on the KipesBuilder instance
kipesBuilder
    .from(inputStream, Serdes.String(), Serdes.Integer())
    .logDebug("Input")
    .filter((key, value) -> value > 0)
    .logDebug("Filtered")
    .to(outputTopic);

// run the stream…
```
This example demonstrates using `KipesBuilder` and the `bin()` sub-builder to create a more complex stream topology, here discretizing the record field `input` into bins of span 0.1:
```java
KipesBuilder<String, GenericRecord> builder = KipesBuilder
    .init(streamsBuilder)
    .from(inputStream)
    .withTopicsBaseName(SOURCE);

builder
    .bin()
        .field("input")
        .span(0.1)
    .build()
    .to(TARGET);
```
TODO: Add instructions on how to generate project documentation, e.g., with GitHub Pages or another documentation tool.
Contributions are welcome! Please read the contributing.md file for guidelines on how to contribute to this project.
This project is licensed under the GNU Lesser General Public License v3.0.