Skip to content

Commit

Permalink
Support Protobuf descriptor files in addition to Base64 format
Browse files Browse the repository at this point in the history
  • Loading branch information
Taisiia Goltseva committed Dec 9, 2020
1 parent 92d4ee8 commit 96e7d97
Show file tree
Hide file tree
Showing 13 changed files with 2,488 additions and 29 deletions.
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,12 @@ These parameters are the default values used in the topic creation page.
#### Protobuf deserialization

To deserialize topics containing data in Protobuf format, you can set topics mapping:
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format)
and corresponding message types for keys and values. If, for example, keys are not in Protobuf format,
`key-message-type` can be omitted, the same for `value-message-type`. This configuration can be specified
for each Kafka cluster.
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format),
or you can put descriptor files in `descriptors-folder` and specify `descriptor-file` name,
also specify corresponding message types for keys and values.
If, for example, keys are not in Protobuf format, `key-message-type` can be omitted,
the same for `value-message-type`.
This configuration can be specified for each Kafka cluster.

Example configuration can look like as follows:

Expand All @@ -313,6 +315,7 @@ akhq:
# standard kafka properties
deserialization:
protobuf:
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
Expand All @@ -321,7 +324,7 @@ akhq:
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
```
Expand Down
4 changes: 3 additions & 1 deletion application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ akhq:
ssl-key-store-password: key-store-password
deserialization:
protobuf:
# (optional) if descriptor-file properties are used
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
Expand All @@ -90,7 +92,7 @@ akhq:
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAFiBnByb3RvMw=="
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
# Ui Cluster Options (optional)
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public static class SchemaRegistry {
@Data
@ConfigurationProperties("deserialization.protobuf")
public static class ProtobufDeserializationTopicsMapping {
String descriptorsFolder;
List<TopicsMapping> topicsMapping = new ArrayList<>();
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
@NoArgsConstructor
public class TopicsMapping {
String topicRegex;
String descriptorFile;
String descriptorFileBase64;
String keyMessageType;
String valueMessageType;
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,15 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}
} else {
if (protobufToJsonDeserializer != null) {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
try {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
}
} catch (Exception exception) {
this.exceptions.add(exception.getMessage());

return new String(payload);
}
}
return new String(payload);
Expand Down
64 changes: 50 additions & 14 deletions src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Connection;
import org.akhq.configs.TopicsMapping;
import org.apache.kafka.common.errors.SerializationException;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;

Expand All @@ -20,41 +25,63 @@
*/
@Slf4j
public class ProtobufToJsonDeserializer {
private final Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping;
private final Map<String, List<Descriptor>> descriptors;
private final List<TopicsMapping> topicsMapping;
private final String protobufDescriptorsFolder;

public ProtobufToJsonDeserializer(Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
this.protobufDeserializationTopicsMapping = protobufDeserializationTopicsMapping;
if (protobufDeserializationTopicsMapping == null) {
this.descriptors = new HashMap<>();
this.topicsMapping = new ArrayList<>();
this.protobufDescriptorsFolder = null;
} else {
this.descriptors = buildAllDescriptors();
this.protobufDescriptorsFolder = protobufDeserializationTopicsMapping.getDescriptorsFolder();
this.topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
this.descriptors = buildAllDescriptors();
}
}

/**
* Check protobuf deserialization topics mapping config, get all protobuf descriptor files in Base64 format and convert to bytes
* For each descriptor file builds Descriptors list - full description with all dependencies
* Check Protobuf deserialization topics mapping config, get all Protobuf descriptor files
* from Protobuf descriptor folder or descriptor files in Base64 format and convert to bytes.
* For each descriptor file builds Descriptors list - full description with all dependencies.
*
* @return map where keys are topic regexes and values are Descriptors matching these regexes
*/
private Map<String, List<Descriptor>> buildAllDescriptors() {
List<TopicsMapping> topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
Map<String, List<Descriptor>> allDescriptors = new HashMap<>();
for (TopicsMapping mapping : topicsMapping) {
byte[] decodedDescriptorFile = Base64.getDecoder().decode(mapping.getDescriptorFileBase64());
byte[] fileBytes = new byte[0];
try {
fileBytes = getDescriptorFileAsBytes(mapping);
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot get a descriptor file for the topics regex [%s]", mapping.getTopicRegex()), e);
}
try {
allDescriptors.put(mapping.getTopicRegex(), buildAllDescriptorsForDescriptorFile(decodedDescriptorFile));
allDescriptors.put(mapping.getTopicRegex(), buildAllDescriptorsForDescriptorFile(fileBytes));
} catch (IOException | DescriptorValidationException e) {
log.error("Cannot build Protobuf descriptors for topics regex [{}]", mapping.getTopicRegex(), e);
throw new RuntimeException(String.format("Cannot build Protobuf descriptors for the topics regex [%s]", mapping.getTopicRegex()), e);
}
}
return allDescriptors;
}

byte[] getDescriptorFileAsBytes(TopicsMapping mapping) throws IOException {
if (protobufDescriptorsFolder != null && Files.exists(Path.of(protobufDescriptorsFolder))) {
String descriptorFile = mapping.getDescriptorFile();
if (descriptorFile != null) {
String fullPath = protobufDescriptorsFolder + File.separator + descriptorFile;
return Files.readAllBytes(Path.of(fullPath));
}
}
String descriptorFileBase64 = mapping.getDescriptorFileBase64();
if (descriptorFileBase64 != null) {
return Base64.getDecoder().decode(descriptorFileBase64);
}
throw new FileNotFoundException("Protobuf descriptor file is not found for topic regex [" +
mapping.getTopicRegex() + "]. File name or Base64 file content is not specified.");
}

/**
* Builds Descriptors list for current descriptor file
*/
Expand Down Expand Up @@ -87,6 +114,7 @@ private List<Descriptor> buildAllDescriptorsForDescriptorFile(byte[] descriptorF
public String deserialize(String topic, byte[] buffer, boolean isKey) {
TopicsMapping matchingConfig = findMatchingConfig(topic);
if (matchingConfig == null) {
log.debug("Protobuf deserialization config is not found for topic [{}]", topic);
return null;
}
String messageType = matchingConfig.getValueMessageType();
Expand All @@ -95,23 +123,27 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
}

if (messageType == null) {
return null;
throw new SerializationException(String.format("Protobuf deserialization is configured for topic [%s], " +
"but message type is not specified neither for a key, nor for a value.", topic));
}

String result = null;
String result;
try {
result = tryToDeserializeWithMessageType(buffer, matchingConfig.getTopicRegex(), messageType);
} catch (Exception e) {
log.error("Cannot deserialize message with Protobuf deserializer " +
"for topic [{}] and message type [{}]", topic, messageType, e);
throw new SerializationException(String.format("Cannot deserialize message with Protobuf deserializer " +
"for topic [%s] and message type [%s]", topic, messageType), e);
}
return result;
}

private TopicsMapping findMatchingConfig(String topic) {
for (TopicsMapping mapping : topicsMapping) {
if (topic.matches(mapping.getTopicRegex())) {
return new TopicsMapping(mapping.getTopicRegex(), mapping.getDescriptorFileBase64(), mapping.getKeyMessageType(), mapping.getValueMessageType());
return new TopicsMapping(
mapping.getTopicRegex(),
mapping.getDescriptorFile(), mapping.getDescriptorFileBase64(),
mapping.getKeyMessageType(), mapping.getValueMessageType());
}
}
return null;
Expand All @@ -124,6 +156,10 @@ private String tryToDeserializeWithMessageType(byte[] buffer, String topicRegex,
.filter(mp -> messageType.equals(mp.getName()))
.collect(Collectors.toList());

if (descriptorsForConfiguredMessageTypes.isEmpty()) {
throw new SerializationException(String.format("Not found descriptors for topic regex [%s] " +
"and message type [%s]", topicRegex, messageType));
}
for (Descriptor descriptor : descriptorsForConfiguredMessageTypes) {
String decodedMessage = tryToParseDataToJsonWithDescriptor(buffer, descriptor, descriptorsWithDependencies);
if (!decodedMessage.isEmpty()) {
Expand Down
Loading

0 comments on commit 96e7d97

Please sign in to comment.