Skip to content

Commit

Permalink
Deserialize Protobuf messages using descriptor files
Browse files Browse the repository at this point in the history
  • Loading branch information
Taisiia Goltseva committed Nov 2, 2020
1 parent 3ebfd4b commit 0a48ec5
Show file tree
Hide file tree
Showing 21 changed files with 2,774 additions and 60 deletions.
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,40 @@ These parameters are the default values used in the topic creation page.
* `akhq.topic-data.poll-timeout`: The time, in milliseconds, spent waiting in poll if data is not available in the
buffer (default: 1000).

#### Protobuf deserialization

To deserialize topics containing data in Protobuf format, you can set topics mapping:
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format)
and corresponding message types for keys and values. If, for example, keys are not in Protobuf format,
`key-message-type` can be omitted, the same for `value-message-type`. This configuration can be specified
for each Kafka cluster.

Example configuration can look like as follows:

```
akhq:
connections:
kafka:
properties:
# standard kafka properties
deserialization:
protobuf:
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
value-message-type: "Album"
- topic-regex: "film.*"
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
key-message-type: "Row"
value-message-type: "Envelope"
```

More examples about Protobuf deserialization can be found in [tests](./src/test/java/org/akhq/utils).
Info about descriptor files generation can be found in [test resources](./src/test/resources/protobuf_proto).


### Security
* `akhq.security.default-group`: Default group for all the user even unlogged user.
Expand Down
17 changes: 15 additions & 2 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ micronaut:

server:
context-path: "" # if behind a reverse proxy, path to akhq without trailing slash (optional). Example: akhq is
# behind a reverse proxy with url http://my-server/akhq, set base-path: "/akhq".
# Not needed if you're behind a reverse proxy with subdomain http://akhq.my-server/
# behind a reverse proxy with url http://my-server/akhq, set base-path: "/akhq".
# Not needed if you're behind a reverse proxy with subdomain http://akhq.my-server/
akhq:
server:
access-log: # Access log configuration (optional)
Expand Down Expand Up @@ -80,6 +80,19 @@ akhq:
ssl-trust-store-password: trust-store-password
ssl-key-store: /app/truststore.jks
ssl-key-store-password: key-store-password
deserialization:
protobuf:
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
value-message-type: "Album"
- topic-regex: "film.*"
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
key-message-type: "Row"
value-message-type: "Envelope"

my-cluster-ssl:
properties:
Expand Down
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ dependencies {
// utils
implementation group: 'org.codehaus.httpcache4j.uribuilder', name: 'uribuilder', version: '2.0.0'

// protobuf
implementation group: "com.google.protobuf", name: "protobuf-java", version: "3.13.0"
implementation group: "com.google.protobuf", name: "protobuf-java-util", version: "3.13.0"

// Password hashing
implementation group: "org.mindrot", name: "jbcrypt", version: "0.4"

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/akhq/configs/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.core.convert.format.MapFormat;
import lombok.Data;
import lombok.Getter;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand All @@ -15,6 +16,7 @@
public class Connection extends AbstractProperties {
SchemaRegistry schemaRegistry;
List<Connect> connect;
ProtobufDeserializationTopicsMapping deserialization;

public Connection(@Parameter String name) {
super(name);
Expand All @@ -30,5 +32,12 @@ public static class SchemaRegistry {
@MapFormat(transformation = MapFormat.MapTransformation.FLAT)
Map<String, String> properties;
}

@Getter
@Data
@ConfigurationProperties("deserialization.protobuf")
public static class ProtobufDeserializationTopicsMapping {
List<TopicsMapping> topicsMapping = new ArrayList<>();
}
}

15 changes: 15 additions & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.akhq.configs;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class TopicsMapping {
String topicRegex;
String descriptorFileBase64;
String keyMessageType;
String valueMessageType;
}
22 changes: 16 additions & 6 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lombok.*;
import org.akhq.utils.AvroToJsonSerializer;
import org.akhq.utils.ProtobufToJsonDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -34,6 +35,7 @@ public class Record {
private Map<String, String> headers = new HashMap<>();
@JsonIgnore
private KafkaAvroDeserializer kafkaAvroDeserializer;
private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@Getter(AccessLevel.NONE)
private byte[] bytesKey;
Expand All @@ -59,7 +61,8 @@ public Record(RecordMetadata record, byte[] bytesKey, byte[] bytesValue, Map<Str
this.headers = headers;
}

public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer, byte[] bytesValue) {
public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafkaAvroDeserializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
Expand All @@ -69,16 +72,17 @@ public Record(ConsumerRecord<byte[], byte[]> record, KafkaAvroDeserializer kafka
this.keySchemaId = getAvroSchemaId(this.bytesKey);
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
for (Header header: record.headers()) {
for (Header header : record.headers()) {
this.headers.put(header.key(), header.value() != null ? new String(header.value()) : null);
}

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
}

public String getKey() {
if (this.key == null) {
this.key = convertToString(bytesKey, keySchemaId);
this.key = convertToString(bytesKey, keySchemaId, true);
}

return this.key;
Expand All @@ -95,23 +99,29 @@ public String getKeyAsBase64() {

public String getValue() {
if (this.value == null) {
this.value = convertToString(bytesValue, valueSchemaId);
this.value = convertToString(bytesValue, valueSchemaId, false);
}

return this.value;
}

private String convertToString(byte[] payload, Integer keySchemaId) {
private String convertToString(byte[] payload, Integer schemaId, boolean isKey) {
if (payload == null) {
return null;
} else if (keySchemaId != null) {
} else if (schemaId != null) {
try {
GenericRecord record = (GenericRecord) kafkaAvroDeserializer.deserialize(topic, payload);
return AvroToJsonSerializer.toJson(record);
} catch (Exception exception) {
return new String(payload);
}
} else {
if (protobufToJsonDeserializer != null) {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
}
}
return new String(payload);
}
}
Expand Down
Loading

0 comments on commit 0a48ec5

Please sign in to comment.