Skip to content

Commit

Permalink
feat(topic-data): add support for json and protobuf schema types (#755)
Browse files Browse the repository at this point in the history
close #756
close #666
  • Loading branch information
bakjos authored and tchiotludo committed Oct 24, 2021
1 parent ff07a3d commit 0fb9bb7
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 25 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ dependencies {
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
implementation group: "io.confluent", name: "kafka-schema-registry-client", version: confluentVersion
implementation group: "io.confluent", name: "kafka-avro-serializer", version: confluentVersion
implementation group: "io.confluent", name: "kafka-json-schema-serializer", version: confluentVersion
implementation group: "io.confluent", name: "kafka-protobuf-serializer", version: confluentVersion
implementation 'org.sourcelab:kafka-connect-client:3.1.1'

// strimzi
Expand Down
8 changes: 5 additions & 3 deletions client/src/containers/Schema/SchemaList/SchemaList.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import './styles.scss';
import AceEditor from 'react-ace';
import 'ace-builds/webpack-resolver';
import 'ace-builds/src-noconflict/mode-json';
import 'ace-builds/src-noconflict/mode-protobuf';
import 'ace-builds/src-noconflict/theme-merbivore_soft';
import { toast } from 'react-toastify';
import 'react-toastify/dist/ReactToastify.css';
Expand Down Expand Up @@ -106,7 +107,8 @@ class SchemaList extends Root {
subject: schema.subject,
version: schema.version,
exception: schema.exception,
schema: schema.schema ? JSON.stringify(JSON.parse(schema.schema), null, 2) : null
schemaType: schema.schemaType,
schema: schema.schemaType === "PROTOBUF" ? schema.schema : (schema.schema ? JSON.stringify(JSON.parse(schema.schema), null, 2) : null)
});
});
this.setState({ schemasRegistry: tableSchemaRegistry, loading: false });
Expand Down Expand Up @@ -217,7 +219,7 @@ class SchemaList extends Root {
extraRowContent: (obj, col, index) => {
return (
<AceEditor
mode="json"
mode={ obj.schemaType === "PROTOBUF"? "protobuf" : "json"}
id={'value' + index}
theme="merbivore_soft"
value={obj[col.accessor]}
Expand All @@ -233,7 +235,7 @@ class SchemaList extends Root {
return (
<pre className="mb-0 khq-data-highlight">
<code>
{JSON.stringify(JSON.parse(obj[col.accessor]))}
{ obj.schemaType === "PROTOBUF"? obj[col.accessor] : JSON.stringify(JSON.parse(obj[col.accessor]))}
</code>
</pre>
);
Expand Down
52 changes: 48 additions & 4 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
package org.akhq.models;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.*;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.utils.AvroToJsonSerializer;
Expand Down Expand Up @@ -34,6 +42,14 @@ public class Record {
private Map<String, String> headers = new HashMap<>();
@JsonIgnore
private Deserializer kafkaAvroDeserializer;
@JsonIgnore
private Deserializer kafkaProtoDeserializer;
@JsonIgnore
private Deserializer kafkaJsonDeserializer;

@JsonIgnore
private SchemaRegistryClient client;

private ProtobufToJsonDeserializer protobufToJsonDeserializer;

@Getter(AccessLevel.NONE)
Expand Down Expand Up @@ -69,13 +85,15 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
this.headers = headers;
}

public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRegistryType, Deserializer kafkaAvroDeserializer,
Deserializer kafkaJsonDeserializer, Deserializer kafkaProtoDeserializer,
ProtobufToJsonDeserializer protobufToJsonDeserializer, byte[] bytesValue) {
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
this.MAGIC_BYTE = (byte) 0x80;
} else {
this.MAGIC_BYTE = 0x0;
}
this.client = client;
this.topic = record.topic();
this.partition = record.partition();
this.offset = record.offset();
Expand All @@ -91,6 +109,8 @@ public Record(ConsumerRecord<byte[], byte[]> record, SchemaRegistryType schemaRe

this.kafkaAvroDeserializer = kafkaAvroDeserializer;
this.protobufToJsonDeserializer = protobufToJsonDeserializer;
this.kafkaProtoDeserializer = kafkaProtoDeserializer;
this.kafkaJsonDeserializer = kafkaJsonDeserializer;
}

public String getKey() {
Expand Down Expand Up @@ -123,15 +143,39 @@ private String convertToString(byte[] payload, Integer schemaId, boolean isKey)
return null;
} else if (schemaId != null) {
try {
Object toType = kafkaAvroDeserializer.deserialize(topic, payload);


Object toType = null;

if (client != null) {
ParsedSchema schema = client.getSchemaById(schemaId);
if ( schema.schemaType().equals(ProtobufSchema.TYPE) ) {
toType = kafkaProtoDeserializer.deserialize(topic, payload);
if (!(toType instanceof Message)) {
return String.valueOf(toType);
}

Message dynamicMessage = (Message)toType;
return AvroToJsonSerializer.getMapper().readTree(JsonFormat.printer().print(dynamicMessage)).toString();
} else if ( schema.schemaType().equals(JsonSchema.TYPE) ) {
toType = kafkaJsonDeserializer.deserialize(topic, payload);
if ( !(toType instanceof JsonNode) ) {
return String.valueOf(toType);
}
JsonNode node = (JsonNode) toType;
return node.toString();
}
}

toType = kafkaAvroDeserializer.deserialize(topic, payload);

//for primitive avro type
if (!(toType instanceof GenericRecord)){
if (!(toType instanceof GenericRecord)) {
return String.valueOf(toType);
}

GenericRecord record = (GenericRecord) toType;
return AvroToJsonSerializer.toJson(record);

} catch (Exception exception) {
this.exceptions.add(exception.getMessage());

Expand Down
17 changes: 16 additions & 1 deletion src/main/java/org/akhq/models/Schema.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package org.akhq.models;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference;
import io.confluent.kafka.schemaregistry.json.JsonSchema;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import lombok.*;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema.Parser;
Expand All @@ -26,11 +30,15 @@ public class Schema {
private Integer version;
private Config.CompatibilityLevelConfig compatibilityLevel;
private String schema;
private String schemaType;
private List<SchemaReference> references = new ArrayList<>();

@JsonIgnore
private org.apache.avro.Schema avroSchema;

@JsonIgnore
private JsonNode jsonSchema;

private String exception;

public Schema(Schema schema, Schema.Config config) {
Expand All @@ -55,7 +63,14 @@ public Schema(io.confluent.kafka.schemaregistry.client.rest.entities.Schema sche
}
this.references = parsedSchema.references();
this.schema = parsedSchema.rawSchema().toString();
this.avroSchema = parser.parse(this.schema);
this.schemaType = schema.getSchemaType();
if (schemaType.equals(AvroSchema.TYPE)) {
this.avroSchema = parser.parse(this.schema);
} else if ( schemaType.equals(JsonSchema.TYPE)) {
this.jsonSchema = ((JsonSchema)parsedSchema).toJsonNode();
} else if ( schemaType.equals(ProtobufSchema.TYPE)) {
this.schema = parsedSchema.canonicalString();
}
} catch (AvroTypeException e) {
this.schema = null;
this.exception = e.getMessage();
Expand Down
33 changes: 32 additions & 1 deletion src/main/java/org/akhq/modules/KafkaModule.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.akhq.modules;

import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProvider;
import io.confluent.kafka.schemaregistry.client.security.basicauth.BasicAuthCredentialProviderFactory;
import io.confluent.kafka.schemaregistry.client.security.basicauth.UserInfoCredentialProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -25,6 +28,7 @@
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -146,6 +150,26 @@ public AvroSchemaProvider getAvroSchemaProvider(String clusterId) {
return avroSchemaProvider;
}

public JsonSchemaProvider getJsonSchemaProvider(String clusterId) {
JsonSchemaProvider jsonSchemaProvider = new JsonSchemaProvider();
jsonSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 100)
));

return jsonSchemaProvider;
}

public ProtobufSchemaProvider getProtobufSchemaProvider(String clusterId) {
ProtobufSchemaProvider protobufSchemaProvider = new ProtobufSchemaProvider();
protobufSchemaProvider.configure(Collections.singletonMap(
"schemaVersionFetcher",
new CachedSchemaRegistryClient(this.getRegistryRestClient(clusterId), 100)
));

return protobufSchemaProvider;
}

public RestService getRegistryRestClient(String clusterId) {
Connection connection = this.getConnection(clusterId);

Expand Down Expand Up @@ -199,10 +223,17 @@ public SchemaRegistryClient getRegistryClient(String clusterId) {
if (!this.registryClient.containsKey(clusterId)) {
Connection connection = this.getConnection(clusterId);

List<SchemaProvider> providers = new ArrayList<>();
providers.add( new AvroSchemaProvider() );
providers.add( new JsonSchemaProvider() );
providers.add( new ProtobufSchemaProvider() );

SchemaRegistryClient client = new CachedSchemaRegistryClient(
this.getRegistryRestClient(clusterId),
Integer.MAX_VALUE,
connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getProperties() : null
providers,
connection.getSchemaRegistry() != null ? connection.getSchemaRegistry().getProperties() : null,
null
);

this.registryClient.put(clusterId, client);
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/org/akhq/repositories/RecordRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;
import lombok.*;
import lombok.extern.slf4j.Slf4j;
import org.akhq.configs.SchemaRegistryType;
import org.akhq.controllers.TopicController;
import org.akhq.models.Partition;
import org.akhq.models.Record;
Expand Down Expand Up @@ -417,23 +419,33 @@ private ConsumerRecords<byte[], byte[]> poll(KafkaConsumer<byte[], byte[]> consu
}

private Record newRecord(ConsumerRecord<byte[], byte[]> record, String clusterId) {
SchemaRegistryType schemaRegistryType = this.schemaRegistryRepository.getSchemaRegistryType(clusterId);
SchemaRegistryClient client = this.kafkaModule.getRegistryClient(clusterId);
return new Record(
client,
record,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId),
this.schemaRegistryRepository.getKafkaAvroDeserializer(clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(clusterId):null,
this.customDeserializerRepository.getProtobufToJsonDeserializer(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(clusterId))
);
}

private Record newRecord(ConsumerRecord<byte[], byte[]> record, BaseOptions options) {
SchemaRegistryType schemaRegistryType = this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId);
SchemaRegistryClient client = this.kafkaModule.getRegistryClient(options.clusterId);
return new Record(
client,
record,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId),
schemaRegistryType,
this.schemaRegistryRepository.getKafkaAvroDeserializer(options.clusterId),
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaJsonDeserializer(options.clusterId):null,
schemaRegistryType == SchemaRegistryType.CONFLUENT? this.schemaRegistryRepository.getKafkaProtoDeserializer(options.clusterId):null,
this.customDeserializerRepository.getProtobufToJsonDeserializer(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, this.kafkaModule.getRegistryClient(options.clusterId),
avroWireFormatConverter.convertValueToWireFormat(record, client,
this.schemaRegistryRepository.getSchemaRegistryType(options.clusterId))
);
}
Expand Down
Loading

0 comments on commit 0fb9bb7

Please sign in to comment.