Skip to content

Commit

Permalink
Support Protobuf descriptor files in addition to Base64 format
Browse files Browse the repository at this point in the history
  • Loading branch information
Taisiia Goltseva committed Dec 7, 2020
1 parent d3770f0 commit 766d36a
Show file tree
Hide file tree
Showing 12 changed files with 1,188 additions and 21 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,12 @@ These parameters are the default values used in the topic creation page.
#### Protobuf deserialization

To deserialize topics containing data in Protobuf format, you can set topics mapping:
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format)
and corresponding message types for keys and values. If, for example, keys are not in Protobuf format,
`key-message-type` can be omitted, the same for `value-message-type`. This configuration can be specified
for each Kafka cluster.
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format),
or you can put descriptor files in `descriptors-folder` and specify `descriptor-file` name,
also specify corresponding message types for keys and values.
If, for example, keys are not in Protobuf format, `key-message-type` can be omitted,
the same for `value-message-type`.
This configuration can be specified for each Kafka cluster.

Example configuration can look like as follows:

Expand All @@ -313,6 +315,7 @@ akhq:
# standard kafka properties
deserialization:
protobuf:
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
Expand All @@ -321,7 +324,7 @@ akhq:
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
```
Expand Down
4 changes: 3 additions & 1 deletion application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ akhq:
ssl-key-store-password: key-store-password
deserialization:
protobuf:
# (optional) if descriptor-file properties are used
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
Expand All @@ -90,7 +92,7 @@ akhq:
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
# Ui Cluster Options (optional)
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ dependencies {
testImplementation 'com.fasterxml.jackson.core:jackson-annotations:2.11.2'
testImplementation 'com.fasterxml.jackson.core:jackson-databind:2.11.2'
testImplementation 'org.codehaus.jackson:jackson-mapper-lgpl:1.9.13'
testImplementation 'io.github.hakky54:logcaptor:2.2.0'
}

/**********************************************************************************************************************\
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static class SchemaRegistry {
@Data
@ConfigurationProperties("deserialization.protobuf")
public static class ProtobufDeserializationTopicsMapping {
String descriptorsFolder;
List<TopicsMapping> topicsMapping = new ArrayList<>();
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
@NoArgsConstructor
public class TopicsMapping {
String topicRegex;
String descriptorFile;
String descriptorFileBase64;
String keyMessageType;
String valueMessageType;
Expand Down
47 changes: 38 additions & 9 deletions src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
import org.akhq.configs.Connection;
import org.akhq.configs.TopicsMapping;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;

Expand All @@ -20,41 +24,63 @@
*/
@Slf4j
public class ProtobufToJsonDeserializer {
private final Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping;
private final Map<String, List<Descriptor>> descriptors;
private final List<TopicsMapping> topicsMapping;
private final String protobufDescriptorsFolder;

public ProtobufToJsonDeserializer(Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
this.protobufDeserializationTopicsMapping = protobufDeserializationTopicsMapping;
if (protobufDeserializationTopicsMapping == null) {
this.descriptors = new HashMap<>();
this.topicsMapping = new ArrayList<>();
this.protobufDescriptorsFolder = null;
} else {
this.descriptors = buildAllDescriptors();
this.protobufDescriptorsFolder = protobufDeserializationTopicsMapping.getDescriptorsFolder();
this.topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
this.descriptors = buildAllDescriptors();
}
}

/**
* Check protobuf deserialization topics mapping config, get all protobuf descriptor files in Base64 format and convert to bytes
* For each descriptor file builds Descriptors list - full description with all dependencies
* Check Protobuf deserialization topics mapping config, get all Protobuf descriptor files
* from Protobuf descriptor folder or descriptor files in Base64 format and convert to bytes.
* For each descriptor file builds Descriptors list - full description with all dependencies.
*
* @return map where keys are topic regexes and values are Descriptors matching these regexes
*/
private Map<String, List<Descriptor>> buildAllDescriptors() {
List<TopicsMapping> topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
Map<String, List<Descriptor>> allDescriptors = new HashMap<>();
for (TopicsMapping mapping : topicsMapping) {
byte[] decodedDescriptorFile = Base64.getDecoder().decode(mapping.getDescriptorFileBase64());
byte[] fileBytes = new byte[0];
try {
allDescriptors.put(mapping.getTopicRegex(), buildAllDescriptorsForDescriptorFile(decodedDescriptorFile));
fileBytes = getDescriptorFileAsBytes(mapping);
} catch (IOException e) {
log.error("Cannot get descriptor file for topics regex [{}]", mapping.getTopicRegex(), e);
}
try {
allDescriptors.put(mapping.getTopicRegex(), buildAllDescriptorsForDescriptorFile(fileBytes));
} catch (IOException | DescriptorValidationException e) {
log.error("Cannot build Protobuf descriptors for topics regex [{}]", mapping.getTopicRegex(), e);
}
}
return allDescriptors;
}

byte[] getDescriptorFileAsBytes(TopicsMapping mapping) throws IOException {
if (protobufDescriptorsFolder != null && Files.exists(Path.of(protobufDescriptorsFolder))) {
String descriptorFile = mapping.getDescriptorFile();
if (descriptorFile != null) {
String fullPath = protobufDescriptorsFolder + File.separator + descriptorFile;
return Files.readAllBytes(Path.of(fullPath));
}
}
String descriptorFileBase64 = mapping.getDescriptorFileBase64();
if (descriptorFileBase64 != null) {
return Base64.getDecoder().decode(descriptorFileBase64);
}
throw new FileNotFoundException("Protobuf descriptor file is not found for topic regex [" +
mapping.getTopicRegex() + "]. File name or Base64 file content is not specified.");
}

/**
* Builds Descriptors list for current descriptor file
*/
Expand Down Expand Up @@ -111,7 +137,10 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
private TopicsMapping findMatchingConfig(String topic) {
for (TopicsMapping mapping : topicsMapping) {
if (topic.matches(mapping.getTopicRegex())) {
return new TopicsMapping(mapping.getTopicRegex(), mapping.getDescriptorFileBase64(), mapping.getKeyMessageType(), mapping.getValueMessageType());
return new TopicsMapping(
mapping.getTopicRegex(),
mapping.getDescriptorFile(), mapping.getDescriptorFileBase64(),
mapping.getKeyMessageType(), mapping.getValueMessageType());
}
}
return null;
Expand Down
Loading

0 comments on commit 766d36a

Please sign in to comment.