From 0fb9bb7d4f5fe15fd4e71fc66bb063009fb34911 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giovanny=20Guti=C3=A9rrez?=
Date: Thu, 8 Jul 2021 15:54:50 -0500
Subject: [PATCH] feat(topic-data): add support for json and protobuf schema types (#755)

close #756
close #666
---
 build.gradle                                    |  2 +
 .../Schema/SchemaList/SchemaList.jsx            |  8 +-
 src/main/java/org/akhq/models/Record.java      | 52 ++++++++++++-
 src/main/java/org/akhq/models/Schema.java      | 17 +++-
 .../java/org/akhq/modules/KafkaModule.java     | 33 +++++++-
 .../akhq/repositories/RecordRepository.java    | 18 ++++-
 .../SchemaRegistryRepository.java              | 77 +++++++++++++++----
 .../org/akhq/utils/AvroToJsonSerializer.java   |  4 +
 8 files changed, 186 insertions(+), 25 deletions(-)

diff --git a/build.gradle b/build.gradle
index fc47a3dfe..20597d4ff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -84,6 +84,8 @@ dependencies {
     implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
     implementation group: "io.confluent", name: "kafka-schema-registry-client", version: confluentVersion
     implementation group: "io.confluent", name: "kafka-avro-serializer", version: confluentVersion
+    implementation group: "io.confluent", name: "kafka-json-schema-serializer", version: confluentVersion
+    implementation group: "io.confluent", name: "kafka-protobuf-serializer", version: confluentVersion
     implementation 'org.sourcelab:kafka-connect-client:3.1.1'

     // strimzi
diff --git a/client/src/containers/Schema/SchemaList/SchemaList.jsx b/client/src/containers/Schema/SchemaList/SchemaList.jsx
index 0ba89e3f8..b9c5d45f4 100644
--- a/client/src/containers/Schema/SchemaList/SchemaList.jsx
+++ b/client/src/containers/Schema/SchemaList/SchemaList.jsx
@@ -11,6 +11,7 @@ import './styles.scss';
 import AceEditor from 'react-ace';
 import 'ace-builds/webpack-resolver';
 import 'ace-builds/src-noconflict/mode-json';
+import 'ace-builds/src-noconflict/mode-protobuf';
 import 'ace-builds/src-noconflict/theme-merbivore_soft';
 import { toast } from 'react-toastify';
 import 'react-toastify/dist/ReactToastify.css';
@@ -106,7 +107,8 @@ class SchemaList extends Root {
           subject: schema.subject,
           version: schema.version,
           exception: schema.exception,
-          schema: schema.schema ? JSON.stringify(JSON.parse(schema.schema), null, 2) : null
+          schemaType: schema.schemaType,
+          schema: schema.schemaType === "PROTOBUF" ? schema.schema : (schema.schema ? JSON.stringify(JSON.parse(schema.schema), null, 2) : null)
         });
       });
       this.setState({ schemasRegistry: tableSchemaRegistry, loading: false });
@@ -217,7 +219,7 @@ class SchemaList extends Root {
           extraRowContent: (obj, col, index) => {
             return (
-              {JSON.stringify(JSON.parse(obj[col.accessor]))}
+              { obj.schemaType === "PROTOBUF" ? obj[col.accessor] : JSON.stringify(JSON.parse(obj[col.accessor]))}
             );
diff --git a/src/main/java/org/akhq/models/Record.java b/src/main/java/org/akhq/models/Record.java
index a99e61681..2ce77cf4d 100644
--- a/src/main/java/org/akhq/models/Record.java
+++ b/src/main/java/org/akhq/models/Record.java
@@ -1,6 +1,14 @@
 package org.akhq.models;

 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.protobuf.DynamicMessage;
+import com.google.protobuf.Message;
+import com.google.protobuf.util.JsonFormat;
+import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import lombok.*;
 import org.akhq.configs.SchemaRegistryType;
 import org.akhq.utils.AvroToJsonSerializer;
@@ -34,6 +42,14 @@ public class Record {
     private Map<String, String> headers = new HashMap<>();
     @JsonIgnore
     private Deserializer kafkaAvroDeserializer;
+    @JsonIgnore
+    private Deserializer kafkaProtoDeserializer;
+    @JsonIgnore
+    private Deserializer kafkaJsonDeserializer;
+
+    @JsonIgnore
+    private SchemaRegistryClient client;
+
     private ProtobufToJsonDeserializer protobufToJsonDeserializer;

     @Getter(AccessLevel.NONE)
@@ -69,13 +85,15 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
         this.headers = headers;
     }

-    public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
+    public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
+                  Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer,
                   ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
         if (schemaRegistryType == SchemaRegistryType.TIBCO) {
             this.MAGIC_BYTE = (byte) 0x80;
         } else {
             this.MAGIC_BYTE = 0x0;
         }
+        this.client = client;
         this.topic = record.topic();
         this.partition = record.partition();
         this.offset = record.offset();
@@ -91,6 +109,8 @@ public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType,
         this.kafkaAvroDeserializer = kafkaAvroDeserializer;
         this.protobufToJsonDeserializer = protobufToJsonDeserializer;
+        this.kafkaProtoDeserializer = kafkaProtoDeserializer;
+        this.kafkaJsonDeserializer = kafkaJsonDeserializer;
     }

     public String getKey() {
@@ -123,15 +143,39 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
             return null;
         } else if (schemaId != null) {
             try {
-                Object toType = kafkaAvroDeserializer.deserialize(topic, payload);
-
+
+                Object toType = null;
+
+                if (client != null) {
+                    ParsedSchema schema = client.getSchemaById(schemaId);
+                    if (schema.schemaType().equals(ProtobufSchema.TYPE)) {
+                        toType = kafkaProtoDeserializer.deserialize(topic, payload);
+                        if (!(toType instanceof Message)) {
+                            return String.valueOf(toType);
+                        }
+
+                        Message dynamicMessage = (Message) toType;
+                        return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
+                    } else if (schema.schemaType().equals(JsonSchema.TYPE)) {
+                        toType = kafkaJsonDeserializer.deserialize(topic, payload);
+                        if (!(toType instanceof JsonNode)) {
+                            return String.valueOf(toType);
+                        }
+                        JsonNode node = (JsonNode) toType;
+                        return node.toString();
+                    }
+                }
+
+                toType = kafkaAvroDeserializer.deserialize(topic, payload);
+
                 //for primitive avro type
-                if (!(toType instanceof GenericRecord)){
+                if (!(toType instanceof GenericRecord)) {
                     return String.valueOf(toType);
                 }

                 GenericRecord record = (GenericRecord) toType;
                 return AvroToJsonSerializer.toJson(record);
+
             } catch (Exception exception) {
                 this.exceptions.add(exception.getMessage());
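The `convertToString()` change above is the heart of the patch: when a record carries a schema id, the registry is asked for the `ParsedSchema` behind it and the payload is routed to the matching deserializer, with Avro kept as the fallback. Below is a condensed, self-contained sketch of that dispatch, assuming an already-configured client and deserializers; the class and method names are illustrative, not part of the patch:

```java
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical helper mirroring Record#convertToString's schema-type dispatch.
public class SchemaTypeDispatch {
    private final SchemaRegistryClient client;
    private final Deserializer<?> avro;
    private final Deserializer<?> json;
    private final Deserializer<?> proto;

    public SchemaTypeDispatch(SchemaRegistryClient client, Deserializer<?> avro,
                              Deserializer<?> json, Deserializer<?> proto) {
        this.client = client;
        this.avro = avro;
        this.json = json;
        this.proto = proto;
    }

    public String render(String topic, int schemaId, byte[] payload) throws Exception {
        // Ask the registry which type of schema the id refers to
        ParsedSchema schema = client.getSchemaById(schemaId);
        if (ProtobufSchema.TYPE.equals(schema.schemaType())) {
            Object value = proto.deserialize(topic, payload);
            // Protobuf messages are re-rendered as JSON text for the UI
            return value instanceof Message
                ? JsonFormat.printer().print((Message) value)
                : String.valueOf(value);
        }
        if (JsonSchema.TYPE.equals(schema.schemaType())) {
            return String.valueOf(json.deserialize(topic, payload));
        }
        return String.valueOf(avro.deserialize(topic, payload)); // Avro fallback
    }
}
```

Passing the `SchemaRegistryClient` into `Record` (rather than only the Avro deserializer) is what makes this per-message lookup possible.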
diff --git a/src/main/java/org/akhq/models/Schema.java b/src/main/java/org/akhq/models/Schema.java
index 3d3c71a55..14f9ebee9 100644
--- a/src/main/java/org/akhq/models/Schema.java
+++ b/src/main/java/org/akhq/models/Schema.java
@@ -1,9 +1,13 @@
 package org.akhq.models;

 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import io.confluent.kafka.schemaregistry.ParsedSchema;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
 import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import lombok.*;
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema.Parser;
@@ -26,11 +30,15 @@ public class Schema {
     private Integer version;
     private Config.CompatibilityLevelConfig compatibilityLevel;
     private String schema;
+    private String schemaType;
     private List<SchemaReference> references = new ArrayList<>();

     @JsonIgnore
     private org.apache.avro.Schema avroSchema;

+    @JsonIgnore
+    private JsonNode jsonSchema;
+
     private String exception;

     public Schema(Schema schema, Schema.Config config) {
@@ -55,7 +63,14 @@ public Schema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema,
         }
         this.references = parsedSchema.references();
         this.schema = parsedSchema.rawSchema().toString();
-        this.avroSchema = parser.parse(this.schema);
+        this.schemaType = schema.getSchemaType();
+        if (schemaType.equals(AvroSchema.TYPE)) {
+            this.avroSchema = parser.parse(this.schema);
+        } else if (schemaType.equals(JsonSchema.TYPE)) {
+            this.jsonSchema = ((JsonSchema) parsedSchema).toJsonNode();
+        } else if (schemaType.equals(ProtobufSchema.TYPE)) {
+            this.schema = parsedSchema.canonicalString();
+        }
     } catch (AvroTypeException e) {
         this.schema = null;
         this.exception = e.getMessage();
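Note how the constructor now branches on the reported schema type instead of always running the Avro parser: JSON subjects keep a `JsonNode` view, and Protobuf subjects replace the raw form with `ParsedSchema.canonicalString()`, i.e. `.proto` text rather than JSON. A hedged one-method sketch of the same choice (the helper class is invented for illustration):

```java
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

// Illustrative helper: pick the display form per schema type, as the
// Schema constructor above does.
public class SchemaDisplay {
    public static String rawFor(ParsedSchema parsed) {
        if (ProtobufSchema.TYPE.equals(parsed.schemaType())) {
            return parsed.canonicalString();            // .proto text, not JSON
        }
        if (JsonSchema.TYPE.equals(parsed.schemaType())) {
            return ((JsonSchema) parsed).toJsonNode().toString();
        }
        return parsed.rawSchema().toString();           // Avro schema JSON
    }
}
```

This is also why the SchemaList UI above skips `JSON.parse` for PROTOBUF subjects: the stored text is not JSON.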
diff --git a/src/main/java/org/akhq/modules/KafkaModule.java b/src/main/java/org/akhq/modules/KafkaModule.java
index b360fad89..1181c57cc 100644
--- a/src/main/java/org/akhq/modules/KafkaModule.java
+++ b/src/main/java/org/akhq/modules/KafkaModule.java
@@ -1,7 +1,9 @@
 package org.akhq.modules;

 import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.SchemaProvider;
 import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
+import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.RestService;
@@ -9,6 +11,7 @@
 import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
 import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProviderFactory;
 import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -25,6 +28,7 @@
 import javax.inject.Inject;
 import javax.inject.Singleton;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -146,6 +150,26 @@ public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
         return avroSchemaProvider;
     }

+    public JsonSchemaProvider getJsonSchemaProvider(String clusterId) {
+        JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
+        jsonSchemaProvider.configure(Collections.singletonMap(
+            "schemaVersionFetcher",
+            new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 100)
+        ));
+
+        return jsonSchemaProvider;
+    }
+
+    public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) {
+        ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
+        protobufSchemaProvider.configure(Collections.singletonMap(
+            "schemaVersionFetcher",
+            new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 100)
+        ));
+
+        return protobufSchemaProvider;
+    }
+
     public RestService getRegistryRestClient(String clusterId) {
         Connection connection = this.getConnection(clusterId);

@@ -199,10 +223,17 @@ public SchemaRegistryClient getRegistryClient(String clusterId) {
         if (!this.registryClient.containsKey(clusterId)) {
             Connection connection = this.getConnection(clusterId);

+            List<SchemaProvider> providers = new ArrayList<>();
+            providers.add(new AvroSchemaProvider());
+            providers.add(new JsonSchemaProvider());
+            providers.add(new ProtobufSchemaProvider());
+
             SchemaRegistryClient client = new CachedSchemaRegistryClient(
                 this.getRegistryRestClient(clusterId),
                 Integer.MAX_VALUE,
-                connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getProperties() : null
+                providers,
+                connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getProperties() : null,
+                null
             );

             this.registryClient.put(clusterId, client);
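`getRegistryClient()` now registers all three `SchemaProvider`s with the `CachedSchemaRegistryClient`; without them, `getSchemaById()` cannot materialize a `ParsedSchema` for subjects whose `schemaType` is JSON or PROTOBUF. A minimal sketch of the wiring, assuming only a registry base URL (the class name and cache capacity are illustrative):

```java
import java.util.List;

import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;

public class RegistryClientFactory {
    // Minimal sketch: build a client that can parse all three schema types.
    public static SchemaRegistryClient create(String baseUrl) {
        List<SchemaProvider> providers = List.of(
            new AvroSchemaProvider(),       // AVRO (the default type)
            new JsonSchemaProvider(),       // schemaType = JSON
            new ProtobufSchemaProvider());  // schemaType = PROTOBUF
        return new CachedSchemaRegistryClient(baseUrl, 100, providers, null);
    }
}
```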
diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java
index 5f35bf7bc..e627313ad 100644
--- a/src/main/java/org/akhq/repositories/RecordRepository.java
+++ b/src/main/java/org/akhq/repositories/RecordRepository.java
@@ -3,6 +3,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.micronaut.context.annotation.Value;
 import io.micronaut.context.env.Environment;
 import io.micronaut.core.util.StringUtils;
@@ -10,6 +11,7 @@
 import io.reactivex.Flowable;
 import lombok.*;
 import lombok.extern.slf4j.Slf4j;
+import org.akhq.configs.SchemaRegistryType;
 import org.akhq.controllers.TopicController;
 import org.akhq.models.Partition;
 import org.akhq.models.Record;
@@ -417,23 +419,33 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consumer)
     }

     private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
+        SchemaRegistryType schemaRegistryType = this.schemaRegistryRepository.getSchemaRegistryType(clusterId);
+        SchemaRegistryClient client = this.kafkaModule.getRegistryClient(clusterId);
         return new Record(
+            client,
             record,
             this.schemaRegistryRepository.getSchemaRegistryType(clusterId),
             this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
+            schemaRegistryType == SchemaRegistryType.CONFLUENT ? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId) : null,
+            schemaRegistryType == SchemaRegistryType.CONFLUENT ? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId) : null,
             this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
-            avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId),
+            avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(clusterId))
         );
     }

     private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
+        SchemaRegistryType schemaRegistryType = this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId);
+        SchemaRegistryClient client = this.kafkaModule.getRegistryClient(options.clusterId);
         return new Record(
+            client,
             record,
-            this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId),
+            schemaRegistryType,
             this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
+            schemaRegistryType == SchemaRegistryType.CONFLUENT ? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId) : null,
+            schemaRegistryType == SchemaRegistryType.CONFLUENT ? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId) : null,
             this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
-            avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId),
+            avroWireFormatConverter.convertValueToWireFormat(record, client,
                 this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId))
         );
     }
diff --git a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
index 450da6c07..8f4ffc244 100644
--- a/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
+++ b/src/main/java/org/akhq/repositories/SchemaRegistryRepository.java
@@ -6,8 +6,13 @@
 import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
 import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.schemaregistry.json.JsonSchema;
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
 import io.confluent.kafka.schemaregistry.utils.JacksonMapper;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaJsonDeserializer;
+import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
 import org.akhq.configs.Connection;
 import org.akhq.configs.SchemaRegistryType;
 import org.akhq.models.Schema;
@@ -36,6 +41,8 @@ public class SchemaRegistryRepository extends AbstractRepository {
     @Inject
     private KafkaModule kafkaModule;
     private final Map<String, Deserializer> kafkaAvroDeserializers = new HashMap<>();
+    private final Map<String, Deserializer> kafkaJsonDeserializers = new HashMap<>();
+    private final Map<String, Deserializer> kafkaProtoDeserializers = new HashMap<>();
     private AvroSerializer avroSerializer;

     public PagedList<Schema> list(String clusterId, Pagination pagination, Optional<String> search) throws IOException, RestClientException, ExecutionException, InterruptedException {
@@ -58,6 +65,28 @@ private List<Schema> toSchemasLatestVersion(List<String> subjectList, String clusterId) {
             .collect(Collectors.toList());
     }

+    private ParsedSchema getParsedSchema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema schema, String clusterId) {
+        ParsedSchema parsedSchema;
+        if (schema.getSchemaType().equals(JsonSchema.TYPE)) {
+            parsedSchema = this.kafkaModule
+                .getJsonSchemaProvider(clusterId)
+                .parseSchema(schema.getSchema(), schema.getReferences())
+                .orElse(null);
+        } else if (schema.getSchemaType().equals(ProtobufSchema.TYPE)) {
+            parsedSchema = this.kafkaModule
+                .getProtobufSchemaProvider(clusterId)
+                .parseSchema(schema.getSchema(), schema.getReferences())
+                .orElse(null);
+        } else {
+            parsedSchema = this.kafkaModule
+                .getAvroSchemaProvider(clusterId)
+                .parseSchema(schema.getSchema(), schema.getReferences())
+                .orElse(null);
+        }
+        return parsedSchema;
+    }
+
     public List<String> all(String clusterId, Optional<String> search) throws IOException, RestClientException {
         Optional<RestService> maybeRegistryRestClient = Optional.ofNullable(kafkaModule
             .getRegistryRestClient(clusterId));
@@ -104,10 +133,7 @@ public Schema getLatestVersion(String clusterId, String subject) throws IOException
             .getRegistryRestClient(clusterId)
             .getLatestVersion(subject);

-        ParsedSchema parsedSchema = this.kafkaModule
-            .getAvroSchemaProvider(clusterId)
-            .parseSchema(latestVersion.getSchema(), latestVersion.getReferences())
-            .orElse(null);
+        ParsedSchema parsedSchema = getParsedSchema(latestVersion, clusterId);

         return new Schema(latestVersion, parsedSchema, this.getConfig(clusterId, subject));
     }
@@ -127,11 +153,7 @@ public List<Schema> getAllVersions(String clusterId, String subject) throws IOException
                 }
             })
             .map(schema -> {
-                ParsedSchema parsedSchema = this.kafkaModule
-                    .getAvroSchemaProvider(clusterId)
-                    .parseSchema(schema.getSchema(), schema.getReferences())
-                    .orElse(null);
-
+                ParsedSchema parsedSchema = getParsedSchema(schema, clusterId);
                 return new Schema(schema, parsedSchema, config);
             })
             .collect(Collectors.toList());
@@ -142,10 +164,7 @@ public Schema lookUpSubjectVersion(String clusterId, String subject, org.apache.avro.Schema schema, boolean deleted)
             .getRegistryRestClient(clusterId)
             .lookUpSubjectVersion(schema.toString(), subject, deleted);

-        ParsedSchema parsedSchema = this.kafkaModule
-            .getAvroSchemaProvider(clusterId)
-            .parseSchema(find.getSchema(), find.getReferences())
-            .orElse(null);
+        ParsedSchema parsedSchema = getParsedSchema(find, clusterId);

         return new Schema(find, parsedSchema, this.getConfig(clusterId, subject));
     }
@@ -253,6 +272,38 @@ public Deserializer getKafkaAvroDeserializer(String clusterId) {
         return this.kafkaAvroDeserializers.get(clusterId);
     }

+    public Deserializer getKafkaJsonDeserializer(String clusterId) {
+        if (!this.kafkaJsonDeserializers.containsKey(clusterId)) {
+            Deserializer deserializer;
+            SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId);
+            if (schemaRegistryType == SchemaRegistryType.TIBCO) {
+                throw new IllegalArgumentException("Configured schema registry type was 'tibco', but TIBCO JSON client is not supported");
+            } else {
+                deserializer = new KafkaJsonSchemaDeserializer(this.kafkaModule.getRegistryClient(clusterId));
+            }
+
+            this.kafkaJsonDeserializers.put(clusterId, deserializer);
+        }
+
+        return this.kafkaJsonDeserializers.get(clusterId);
+    }
+
+    public Deserializer getKafkaProtoDeserializer(String clusterId) {
+        if (!this.kafkaProtoDeserializers.containsKey(clusterId)) {
+            Deserializer deserializer;
+            SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId);
+            if (schemaRegistryType == SchemaRegistryType.TIBCO) {
+                throw new IllegalArgumentException("Configured schema registry type was 'tibco', but TIBCO Protobuf client is not supported");
+            } else {
+                deserializer = new KafkaProtobufDeserializer(this.kafkaModule.getRegistryClient(clusterId));
+            }
+
+            this.kafkaProtoDeserializers.put(clusterId, deserializer);
+        }
+
+        return this.kafkaProtoDeserializers.get(clusterId);
+    }
+
     public AvroSerializer getAvroSerializer(String clusterId) {
         if(this.avroSerializer == null){
             this.avroSerializer = new AvroSerializer(this.kafkaModule.getRegistryClient(clusterId),
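The two new getters reuse the lazy, one-deserializer-per-cluster caching of the existing Avro getter, guarding against TIBCO registries where Confluent's JSON/Protobuf deserializers don't apply. The same pattern can be expressed more compactly with `computeIfAbsent`; a sketch under those assumptions (class name hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical condensed form of the per-cluster deserializer cache above.
public class DeserializerCache {
    private final Map<String, Deserializer<?>> protoByCluster = new ConcurrentHashMap<>();

    public Deserializer<?> proto(String clusterId, SchemaRegistryClient client) {
        // computeIfAbsent gives the same lazy, one-per-cluster behaviour
        // as the containsKey/put sequence in the patch.
        return protoByCluster.computeIfAbsent(
            clusterId, id -> new KafkaProtobufDeserializer<>(client));
    }
}
```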
diff --git a/src/main/java/org/akhq/utils/AvroToJsonSerializer.java b/src/main/java/org/akhq/utils/AvroToJsonSerializer.java
index 3cf487b29..4ec3bfb55 100644
--- a/src/main/java/org/akhq/utils/AvroToJsonSerializer.java
+++ b/src/main/java/org/akhq/utils/AvroToJsonSerializer.java
@@ -25,4 +25,8 @@ public static String toJson(GenericRecord record) throws IOException {

         return MAPPER.writeValueAsString(map);
     }
+
+    public static ObjectMapper getMapper() {
+        return MAPPER;
+    }
 }
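Exposing the shared `ObjectMapper` lets `Record` funnel `JsonFormat`'s Protobuf output back through the same Jackson instance used for Avro rendering, so the JSON output stays consistent across schema types. Roughly, the round trip looks like this (the helper below is invented for illustration):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;

public class ProtobufJsonRenderer {
    // Sketch: print a Protobuf message as JSON, then normalize it through
    // the shared mapper (e.g. AvroToJsonSerializer.getMapper()).
    public static String render(Message message, ObjectMapper mapper) throws Exception {
        String printed = JsonFormat.printer().print(message);
        return mapper.readTree(printed).toString();
    }
}
```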