Support Protobuf descriptor files in addition to Base64 format #529

Merged 1 commit on Dec 9, 2020
13 changes: 8 additions & 5 deletions README.md
@@ -298,10 +298,12 @@ These parameters are the default values used in the topic creation page.
#### Protobuf deserialization

To deserialize topics containing data in Protobuf format, you can set topics mapping:
for each `topic-regex` you can specify `descriptor-file-base64` (descriptor file encoded to Base64 format)
and corresponding message types for keys and values. If, for example, keys are not in Protobuf format,
`key-message-type` can be omitted, the same for `value-message-type`. This configuration can be specified
for each Kafka cluster.
for each `topic-regex` you can either specify `descriptor-file-base64` (the descriptor file encoded in Base64)
or put descriptor files in `descriptors-folder` and specify the `descriptor-file` name.
Also specify the corresponding message types for keys and values.
If, for example, keys are not in Protobuf format, `key-message-type` can be omitted;
the same applies to `value-message-type`.
This configuration can be specified for each Kafka cluster.

An example configuration looks as follows:

@@ -313,6 +315,7 @@ akhq:
# standard kafka properties
deserialization:
protobuf:
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
@@ -321,7 +324,7 @@ akhq:
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAF
iBnByb3RvMw=="
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
```
4 changes: 3 additions & 1 deletion application.example.yml
@@ -82,6 +82,8 @@ akhq:
ssl-key-store-password: key-store-password
deserialization:
protobuf:
# (optional) if descriptor-file properties are used
descriptors-folder: "/app/protobuf_desc"
topics-mapping:
- topic-regex: "album.*"
descriptor-file-base64: "Cs4BCgthbGJ1bS5wcm90bxIXY29tLm5ldGNyYWNrZXIucHJvdG9idWYidwoFQWxidW0SFAoFdGl0bGUYASABKAlSBXRpdGxlEhYKBmFydGlzdBgCIAMoCVIGYXJ0aXN0EiEKDHJlbGVhc2VfeWVhchgDIAEoBVILcmVsZWFzZVllYXISHQoKc29uZ190aXRsZRgEIAMoCVIJc29uZ1RpdGxlQiUKF2NvbS5uZXRjcmFja2VyLnByb3RvYnVmQgpBbGJ1bVByb3RvYgZwcm90bzM="
@@ -90,7 +92,7 @@ akhq:
descriptor-file-base64: "CuEBCgpmaWxtLnByb3RvEhRjb20uY29tcGFueS5wcm90b2J1ZiKRAQoERmlsbRISCgRuYW1lGAEgASgJUgRuYW1lEhoKCHByb2R1Y2VyGAIgASgJUghwcm9kdWNlchIhCgxyZWxlYXNlX3llYXIYAyABKAVSC3JlbGVhc2VZZWFyEhoKCGR1cmF0aW9uGAQgASgFUghkdXJhdGlvbhIaCghzdGFycmluZxgFIAMoCVIIc3RhcnJpbmdCIQoUY29tLmNvbXBhbnkucHJvdG9idWZCCUZpbG1Qcm90b2IGcHJvdG8z"
value-message-type: "Film"
- topic-regex: "test.*"
descriptor-file-base64: "Cs4LChhzdHJlYW1pbmctcHJvdG9jb2wucHJvdG8SLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wiIwoFUG9pbnQSDAoBeBgBIAEoAVIBeBIMCgF5GAIgASgBUgF5IscFCgVEYXR1bRIfCgtjb2x1bW5fbmFtZRgBIAEoCVIKY29sdW1uTmFtZRJbCgtjb2x1bW5fdHlwZRgCIAEoDjI6LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuQ29sdW1uVHlwZVIKY29sdW1uVHlwZRIfCgtzY2hlbWFfbmFtZRgLIAEoCVIKc2NoZW1hTmFtZRJ4ChFzY2hlbWFfcGFyYW1ldGVycxgMIAMoCzJLLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW0uU2NoZW1hUGFyYW1ldGVyc0VudHJ5UhBzY2hlbWFQYXJhbWV0ZXJzEiUKDWRhdHVtX2ludGVnZXIYAyABKAVIAFIMZGF0dW1JbnRlZ2VyEh8KCmRhdHVtX2xvbmcYBCABKANIAFIJZGF0dW1Mb25nEiEKC2RhdHVtX2Zsb2F0GAUgASgCSABSCmRhdHVtRmxvYXQSIwoMZGF0dW1fZG91YmxlGAYgASgBSABSC2RhdHVtRG91YmxlEiUKDWRhdHVtX2Jvb2xlYW4YByABKAhIAFIMZGF0dW1Cb29sZWFuEiMKDGRhdHVtX3N0cmluZxgIIAEoCUgAUgtkYXR1bVN0cmluZxIhCgtkYXR1bV9ieXRlcxgJIAEoDEgAUgpkYXR1bUJ5dGVzElgKC2RhdHVtX3BvaW50GAogASgLMjUuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Qb2ludEgAUgpkYXR1bVBvaW50GkMKFVNjaGVtYVBhcmFtZXRlcnNFbnRyeRIQCgNrZXkYASABKAlSA2tleRIUCgV2YWx1ZRgCIAEoCVIFdmFsdWU6AjgBQgcKBWRhdHVtIlIKA1JvdxJLCgVkYXR1bRgBIAMoCzI1LmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2wuRGF0dW1SBWRhdHVtImkKBlNvdXJjZRIcCgljb25uZWN0b3IYASABKAlSCWNvbm5lY3RvchIcCgl0aW1lc3RhbXAYAiABKARSCXRpbWVzdGFtcBIjCg1sYXN0X3NuYXBzaG90GAMgASgIUgxsYXN0U25hcHNob3QijwIKCEVudmVsb3BlEhwKCXRpbWVzdGFtcBgBIAEoBFIJdGltZXN0YW1wEhwKCW9wZXJhdGlvbhgCIAEoCVIJb3BlcmF0aW9uElAKCGRhdGFfcm93GAMgASgLMjMuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Sb3dIAFIHZGF0YVJvdxIdCglkYXRhX2pzb24YBCABKAlIAFIIZGF0YUpzb24STgoGc291cmNlGAUgASgLMjYuY29tLm5ldGNyYWNrZXIucHJvdG9idWYuc2VyaWFsaXphdGlvbi5wcm90b2NvbC5Tb3VyY2VSBnNvdXJjZUIGCgRkYXRhKnMKCkNvbHVtblR5cGUSCwoHSU5URUdFUhAAEggKBExPTkcQARIJCgVGTE9BVBACEgoKBkRPVUJMRRADEgsKB0JPT0xFQU4QBBIKCgZTVFJJTkcQBRIICgRKU09OEAYSCQoFQllURVMQBxIJCgVQT0lOVBAIQkUKLmNvbS5uZXRjcmFja2VyLnByb3RvYnVmLnNlcmlhbGl6YXRpb24ucHJvdG9jb2xCEVN0cmVhbWluZ1Byb3RvY29sSAF
iBnByb3RvMw=="
descriptor-file: "other.desc"
key-message-type: "Row"
value-message-type: "Envelope"
# Ui Cluster Options (optional)
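
Note on the two descriptor formats: `descriptor-file` and `descriptor-file-base64` are meant to carry the same descriptor bytes, once as a file under `descriptors-folder` and once inline as Base64. The following is a minimal sketch, not code from this PR, of how an existing descriptor file could be converted to the Base64 form and back. The class name `DescriptorBase64Sketch`, its method names, and the temporary file contents are assumptions made for illustration; a real descriptor file would typically be produced by `protoc` with `--descriptor_set_out`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Base64;

// Hypothetical helper, not part of AKHQ.
final class DescriptorBase64Sketch {

    /** Encodes raw descriptor bytes into the string form used by descriptor-file-base64. */
    static String toBase64(byte[] descriptorBytes) {
        return Base64.getEncoder().encodeToString(descriptorBytes);
    }

    /** Decodes a descriptor-file-base64 value back into raw descriptor bytes. */
    static byte[] fromBase64(String encoded) {
        return Base64.getDecoder().decode(encoded);
    }

    public static void main(String[] args) throws IOException {
        // Placeholder bytes standing in for a compiled descriptor set (assumption).
        Path file = Files.createTempFile("other", ".desc");
        Files.write(file, new byte[] {10, 3, 1, 2, 3});

        byte[] fromFile = Files.readAllBytes(file);
        String encoded = toBase64(fromFile);

        // Both configuration styles should yield identical bytes.
        System.out.println("Base64 form: " + encoded);
        System.out.println("Round trip matches: " + Arrays.equals(fromFile, fromBase64(encoded)));

        Files.delete(file);
    }
}
```

Either form can then be referenced from a `topics-mapping` entry; the file form keeps long Base64 strings out of the YAML.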
1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/Connection.java
@@ -40,6 +40,7 @@ public static class SchemaRegistry {
@Data
@ConfigurationProperties("deserialization.protobuf")
public static class ProtobufDeserializationTopicsMapping {
String descriptorsFolder;
List<TopicsMapping> topicsMapping = new ArrayList<>();
}

1 change: 1 addition & 0 deletions src/main/java/org/akhq/configs/TopicsMapping.java
@@ -9,6 +9,7 @@
@NoArgsConstructor
public class TopicsMapping {
String topicRegex;
String descriptorFile;
String descriptorFileBase64;
String keyMessageType;
String valueMessageType;
12 changes: 9 additions & 3 deletions src/main/java/org/akhq/models/Record.java
@@ -119,9 +119,15 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
}
} else {
if (protobufToJsonDeserializer != null) {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
try {
String record = protobufToJsonDeserializer.deserialize(topic, payload, isKey);
if (record != null) {
return record;
}
} catch (Exception exception) {
this.exceptions.add(exception.getMessage());

return new String(payload);
}
}
return new String(payload);
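
Note on the fallback in `Record.java`: with this change, a failing Protobuf deserialization no longer propagates; the message is recorded and the raw payload is shown instead. The sketch below isolates that behavior so it can be exercised on its own. It is an illustration under assumptions, not the PR's code: `FallbackSketch` and the `Deserializer` interface are hypothetical stand-ins, and the sketch decodes the payload as UTF-8 explicitly, whereas the hunk above uses `new String(payload)` with the platform default charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the relevant part of Record.
final class FallbackSketch {

    /** Hypothetical stand-in for ProtobufToJsonDeserializer#deserialize. */
    interface Deserializer {
        String deserialize(String topic, byte[] payload, boolean isKey);
    }

    /** Collected error messages, mirroring the `exceptions` list used in the hunk. */
    final List<String> exceptions = new ArrayList<>();

    String convertToString(Deserializer deserializer, String topic, byte[] payload, boolean isKey) {
        if (deserializer != null) {
            try {
                String record = deserializer.deserialize(topic, payload, isKey);
                if (record != null) {
                    return record; // deserialization succeeded
                }
            } catch (Exception exception) {
                // Keep the reason, then fall through to the raw payload.
                exceptions.add(exception.getMessage());
            }
        }
        // No deserializer, no matching config (null result), or a failure: show raw bytes as text.
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        FallbackSketch sketch = new FallbackSketch();
        byte[] raw = "raw".getBytes(StandardCharsets.UTF_8);
        String shown = sketch.convertToString(
            (topic, payload, isKey) -> { throw new IllegalStateException("boom"); },
            "test", raw, false);
        System.out.println(shown + " " + sketch.exceptions);
    }
}
```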
64 changes: 50 additions & 14 deletions src/main/java/org/akhq/utils/ProtobufToJsonDeserializer.java
@@ -10,8 +10,13 @@
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.Connection;
import org.akhq.configs.TopicsMapping;
import org.apache.kafka.common.errors.SerializationException;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;

@@ -20,41 +25,63 @@
*/
@Slf4j
public class ProtobufToJsonDeserializer {
private final Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping;
private final Map<String, List<Descriptor>> descriptors;
private final List<TopicsMapping> topicsMapping;
private final String protobufDescriptorsFolder;

public ProtobufToJsonDeserializer(Connection.ProtobufDeserializationTopicsMapping protobufDeserializationTopicsMapping) {
this.protobufDeserializationTopicsMapping = protobufDeserializationTopicsMapping;
if (protobufDeserializationTopicsMapping == null) {
this.descriptors = new HashMap<>();
this.topicsMapping = new ArrayList<>();
this.protobufDescriptorsFolder = null;
} else {
this.descriptors = buildAllDescriptors();
this.protobufDescriptorsFolder = protobufDeserializationTopicsMapping.getDescriptorsFolder();
this.topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
this.descriptors = buildAllDescriptors();
}
}

/**
* Check protobuf deserialization topics mapping config, get all protobuf descriptor files in Base64 format and convert to bytes
* For each descriptor file builds Descriptors list - full description with all dependencies
* Reads the Protobuf deserialization topics mapping config and loads every Protobuf descriptor file as bytes,
* either from the Protobuf descriptors folder or from its Base64-encoded form.
* For each descriptor file, builds a Descriptors list: the full description with all dependencies.
*
* @return map where keys are topic regexes and values are Descriptors matching these regexes
*/
private Map<String, List<Descriptor>> buildAllDescriptors() {
List<TopicsMapping> topicsMapping = protobufDeserializationTopicsMapping.getTopicsMapping();
Map<String, List<Descriptor>> allDescriptors = new HashMap<>();
for (TopicsMapping mapping : topicsMapping) {
byte[] decodedDescriptorFile = Base64.getDecoder().decode(mapping.getDescriptorFileBase64());
byte[] fileBytes = new byte[0];
try {
fileBytes = getDescriptorFileAsBytes(mapping);
} catch (IOException e) {
throw new RuntimeException(String.format("Cannot get a descriptor file for the topics regex [%s]", mapping.getTopicRegex()), e);
}
try {
allDescriptors.put(mapping.getTopicRegex(), buildAllDescriptorsForDescriptorFile(decodedDescriptorFile));
allDescriptors.put(mapping.getTopicRegex(), buildAllDescriptorsForDescriptorFile(fileBytes));
} catch (IOException | DescriptorValidationException e) {
log.error("Cannot build Protobuf descriptors for topics regex [{}]", mapping.getTopicRegex(), e);
throw new RuntimeException(String.format("Cannot build Protobuf descriptors for the topics regex [%s]", mapping.getTopicRegex()), e);
}
}
return allDescriptors;
}

byte[] getDescriptorFileAsBytes(TopicsMapping mapping) throws IOException {
if (protobufDescriptorsFolder != null && Files.exists(Path.of(protobufDescriptorsFolder))) {
String descriptorFile = mapping.getDescriptorFile();
if (descriptorFile != null) {
String fullPath = protobufDescriptorsFolder + File.separator + descriptorFile;
return Files.readAllBytes(Path.of(fullPath));
}
}
String descriptorFileBase64 = mapping.getDescriptorFileBase64();
if (descriptorFileBase64 != null) {
return Base64.getDecoder().decode(descriptorFileBase64);
}
throw new FileNotFoundException("Protobuf descriptor file is not found for topic regex [" +
mapping.getTopicRegex() + "]. File name or Base64 file content is not specified.");
}

/**
* Builds Descriptors list for current descriptor file
*/
@@ -87,6 +114,7 @@ private List<Descriptor> buildAllDescriptorsForDescriptorFile(byte[] descriptorF
public String deserialize(String topic, byte[] buffer, boolean isKey) {
TopicsMapping matchingConfig = findMatchingConfig(topic);
if (matchingConfig == null) {
log.debug("Protobuf deserialization config is not found for topic [{}]", topic);
return null;
}
String messageType = matchingConfig.getValueMessageType();
@@ -95,23 +123,27 @@ public String deserialize(String topic, byte[] buffer, boolean isKey) {
}

if (messageType == null) {
return null;
throw new SerializationException(String.format("Protobuf deserialization is configured for topic [%s], " +
"but message type is specified neither for a key nor for a value.", topic));
}

String result = null;
String result;
try {
result = tryToDeserializeWithMessageType(buffer, matchingConfig.getTopicRegex(), messageType);
} catch (Exception e) {
log.error("Cannot deserialize message with Protobuf deserializer " +
"for topic [{}] and message type [{}]", topic, messageType, e);
throw new SerializationException(String.format("Cannot deserialize message with Protobuf deserializer " +
"for topic [%s] and message type [%s]", topic, messageType), e);
}
return result;
}

private TopicsMapping findMatchingConfig(String topic) {
for (TopicsMapping mapping : topicsMapping) {
if (topic.matches(mapping.getTopicRegex())) {
return new TopicsMapping(mapping.getTopicRegex(), mapping.getDescriptorFileBase64(), mapping.getKeyMessageType(), mapping.getValueMessageType());
return new TopicsMapping(
mapping.getTopicRegex(),
mapping.getDescriptorFile(), mapping.getDescriptorFileBase64(),
mapping.getKeyMessageType(), mapping.getValueMessageType());
}
}
return null;
@@ -124,6 +156,10 @@ private String tryToDeserializeWithMessageType(byte[] buffer, String topicRegex,
.filter(mp -> messageType.equals(mp.getName()))
.collect(Collectors.toList());

if (descriptorsForConfiguredMessageTypes.isEmpty()) {
throw new SerializationException(String.format("No descriptors found for topic regex [%s] " +
"and message type [%s]", topicRegex, messageType));
}
for (Descriptor descriptor : descriptorsForConfiguredMessageTypes) {
String decodedMessage = tryToParseDataToJsonWithDescriptor(buffer, descriptor, descriptorsWithDependencies);
if (!decodedMessage.isEmpty()) {
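
Note on `topic-regex` matching: `findMatchingConfig` uses `String.matches`, which requires the whole topic name to match the regex, and it returns the first mapping in list order that matches. The sketch below illustrates those two properties. `TopicRegexSketch` and `firstMatch` are hypothetical names used only for this illustration, and the regex list reuses the examples from the README hunk.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of the matching rule, not part of AKHQ.
final class TopicRegexSketch {

    /** Returns the first regex (in list order) that matches the entire topic name. */
    static Optional<String> firstMatch(List<String> topicRegexes, String topic) {
        // String.matches anchors the pattern to the whole input, unlike Matcher.find.
        return topicRegexes.stream().filter(topic::matches).findFirst();
    }

    public static void main(String[] args) {
        List<String> regexes = List.of("album.*", "film.*", "test.*");

        // "album.*" covers any topic that starts with "album".
        System.out.println(firstMatch(regexes, "album-events"));
        // No mapping applies: the regex would have to match from the first character.
        System.out.println(firstMatch(regexes, "my-album"));
    }
}
```

When several regexes could match the same topic, the order of entries under `topics-mapping` decides which descriptor is used, so more specific patterns are best listed first.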