Skip to content

Commit

Permalink
Deserialize Protobuf messages using descriptor files
Browse files Browse the repository at this point in the history
  • Loading branch information
Taisiia Goltseva committed Nov 5, 2020
1 parent 3ebfd4b commit 5ff02de
Show file tree
Hide file tree
Showing 21 changed files with 2,726 additions and 13 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,40 @@ These parameters are the default values used in the topic creation page.
* `akhq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the
buffer (default: 1000).

#### Protobuf deserialization

To deserialize topics containing data in Protobuf format, you can set topics mapping:
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format)
and corresponding message types for keys and values. If, for example, keys are not in Protobuf format,
`key-message-type` can be omitted, the same for `value-message-type`. This configuration can be specified
for each Kafka cluster.

Example configuration can look like as follows:

```
akhq:
connections:
kafka:
properties:
# standard kafka properties
deserialization:
protobuf:
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
value-message-type: "Album"
- topic-regex: "film.*"
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
key-message-type: "Row"
value-message-type: "Envelope"
```

More examples about Protobuf deserialization can be found in [tests](./src/test/java/org/akhq/utils).
Info about descriptor files generation can be found in [test resources](./src/test/resources/protobuf_proto).


### Security
* `akhq.security.default-group`: Default group for all the user even unlogged user.
Expand Down
13 changes: 13 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ akhq:
ssl-trust-store-password: trust-store-password
ssl-key-store: /app/truststore.jks
ssl-key-store-password: key-store-password
deserialization:
protobuf:
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
value-message-type: "Album"
- topic-regex: "film.*"
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
key-message-type: "Row"
value-message-type: "Envelope"

my-cluster-ssl:
properties:
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ dependencies {
// utils
implementation group: 'org.codehaus.httpcache4j.uribuilder', name: 'uribuilder', version: '2.0.0'

// protobuf
implementation group: "com.google.protobuf", name: "protobuf-java", version: "3.13.0"
implementation group: "com.google.protobuf", name: "protobuf-java-util", version: "3.13.0"

// Password hashing
implementation group: "org.mindrot", name: "jbcrypt", version: "0.4"

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/akhq/configs/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Data;
import lombok.Getter;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -15,6 +16,7 @@
public class Connection extends AbstractProperties {
SchemaRegistry schemaRegistry;
List<Connect> connect;
ProtobufDeserializationTopicsMapping deserialization;

public Connection(@Parameter String name) {
super(name);
Expand All @@ -30,5 +32,12 @@ public static class SchemaRegistry {
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

@Getter
@Data
@ConfigurationProperties("deserialization.protobuf")
public static class ProtobufDeserializationTopicsMapping {
List<TopicsMapping> topicsMapping = new ArrayList<>();
}
}

15 changes: 15 additions & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.akhq.configs;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TopicsMapping {
String topicRegex;
String descriptorFileBase64;
String keyMessageType;
String valueMessageType;
}
20 changes: 15 additions & 5 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.*;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -34,6 +35,7 @@ public class Record {
private Map<String, String> headers = new HashMap<>();
@JsonIgnore
private KafkaAvroDeserializer kafkaAvroDeserializer;
private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@Getter(AccessLevel.NONE)
private byte[] bytesKey;
Expand All @@ -59,7 +61,8 @@ public Record(RecordMetadata record, byte[] bytesKey, byte[] bytesValue, Map<Str
this.headers = headers;
}

public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer, byte[] bytesValue) {
public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
Expand All @@ -74,11 +77,12 @@ public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafka
}

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
}

public String getKey() {
if (this.key == null) {
this.key = convertToString(bytesKey, keySchemaId);
this.key = convertToString(bytesKey, keySchemaId, true);
}

return this.key;
Expand All @@ -95,23 +99,29 @@ public String getKeyAsBase64() {

public String getValue() {
if (this.value == null) {
this.value = convertToString(bytesValue, valueSchemaId);
this.value = convertToString(bytesValue, valueSchemaId, false);
}

return this.value;
}

private String convertToString(byte[] payload, Integer keySchemaId) {
private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
if (payload == null) {
return null;
} else if (keySchemaId != null) {
} else if (schemaId != null) {
try {
GenericRecord record = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload);
return AvroToJsonSerializer.toJson(record);
} catch (Exception exception) {
return new String(payload);
}
} else {
if (protobufToJsonDeserializer != null) {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
}
}
return new String(payload);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/akhq/modules/KafkaModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public List<String> getClustersList() {
.collect(Collectors.toList());
}

private Connection getConnection(String cluster) {
public Connection getConnection(String cluster) {
return this.connections
.stream()
.filter(r -> r.getName().equals(cluster))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.akhq.repositories;

import org.akhq.modules.KafkaModule;
import org.akhq.utils.ProtobufToJsonDeserializer;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.HashMap;
import java.util.Map;

@Singleton
public class CustomDeserializerRepository {
@Inject
private KafkaModule kafkaModule;
private final Map<String, ProtobufToJsonDeserializer> protobufToJsonDeserializers = new HashMap<>();

public ProtobufToJsonDeserializer getProtobufToJsonDeserializer(String clusterId) {
if (!this.protobufToJsonDeserializers.containsKey(clusterId)) {
this.protobufToJsonDeserializers.put(
clusterId,
new ProtobufToJsonDeserializer(this.kafkaModule.getConnection(clusterId).getDeserialization())
);
}
return this.protobufToJsonDeserializers.get(clusterId);
}
}
10 changes: 4 additions & 6 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,6 @@
import org.akhq.modules.AvroSerializer;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.Debug;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.codehaus.httpcache4j.uri.URIBuilder;

import java.util.*;
import java.util.concurrent.ExecutionException;
Expand All @@ -55,6 +49,9 @@ public class RecordRepository extends AbstractRepository {
@Inject
private SchemaRegistryRepository schemaRegistryRepository;

@Inject
private CustomDeserializerRepository customDeserializerRepository;

@Inject
private AvroWireFormatConverter avroWireFormatConverter;

Expand Down Expand Up @@ -381,6 +378,7 @@ private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions opti
return new Record(
record,
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId))
);
}
Expand Down
Loading

0 comments on commit 5ff02de

Please sign in to comment.