Skip to content

Commit

Permalink
Adding support for Kafka Schema Registry JSON messages.
Browse files Browse the repository at this point in the history
Utilise the Content-Type to indicate KSR messages.
Deal with the 5 "magic" bytes at the start of the JSON.
  • Loading branch information
muirandy authored and uglyog committed Aug 2, 2022
1 parent a895b9a commit 270808a
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import au.com.dius.pact.core.support.Json
import au.com.dius.pact.core.support.json.JsonException
import au.com.dius.pact.core.support.json.JsonParser
import au.com.dius.pact.core.support.json.JsonValue
import au.com.dius.pact.core.support.json.KafkaSchemaRegistryJsonParser
import mu.KLogging
import org.apache.commons.codec.binary.Base64
import org.apache.commons.lang3.StringUtils
Expand Down Expand Up @@ -60,6 +61,15 @@ class Message @JvmOverloads constructor(
contents.valueAsString()
}
}
isKafkaSchemaRegistryCompliantJsonContents() -> {
try {
val json = KafkaSchemaRegistryJsonParser.parseString(contents.valueAsString())
Json.fromJson(json)
} catch (ex: JsonException) {
logger.trace(ex) { "Failed to parse Kafka Schema Registry Compliant JSON body" }
contents.valueAsString()
}
}
else -> formatContents()
}
}
Expand All @@ -83,10 +93,19 @@ class Message @JvmOverloads constructor(
}
}

private fun isKafkaSchemaRegistryCompliantJsonContents(): Boolean {
return if (contents.isPresent()) {
contentType(metaData).or(contents.contentType).isKafkaSchemaRegistryJson()
} else {
false
}
}

fun formatContents(): String {
return if (contents.isPresent()) {
val contentType = contentType(metaData).or(contents.contentType)
when {
contentType.isKafkaSchemaRegistryJson() -> KafkaSchemaRegistryJsonParser.parseString(contents.valueAsString()).prettyPrint()
contentType.isJson() -> JsonParser.parseString(contents.valueAsString()).prettyPrint()
contentType.isOctetStream() -> Base64.encodeBase64String(contentsAsBytes())
else -> contents.valueAsString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,15 @@ class MessageSpec extends Specification {

where:

body | contentType | contents
'{"A": "Value A", "B": "Value B"}' | 'application/json' | [A: 'Value A', B: 'Value B']
'{"A": "Value A", "B": "Value B"}' | '' | [A: 'Value A', B: 'Value B']
'1 2 3 4' | 'text/plain' | '1 2 3 4'
'1 2 3 4' | '' | '1 2 3 4'
new String([1, 2, 3, 4] as byte[]) | 'application/octet-stream' | 'AQIDBA=='
body | contentType | contents
'{"A": "Value A", "B": "Value B"}' | 'application/json' | [A: 'Value A', B: 'Value B']
'{"A": "Value A", "B": "Value B"}' | '' | [A: 'Value A', B: 'Value B']
'1 2 3 4' | 'text/plain' | '1 2 3 4'
'1 2 3 4' | '' | '1 2 3 4'
new String([1, 2, 3, 4] as byte[]) | 'application/octet-stream' | 'AQIDBA=='
kStr('{"A": "Value A", "B": "Value B"}') | 'application/vnd.schemaregistry.v1+json' | [A: 'Value A', B: 'Value B']
kStr('{"A": "Value A", "B": "Value B"}') | '' | [A: 'Value A', B: 'Value B']
kStr('invalid json') | 'application/vnd.schemaregistry.v1+json' | kStr('invalid json')

message = new Message('test', [], OptionalBody.body(body.bytes, new ContentType(contentType)),
new MatchingRulesImpl(), new Generators(), [contentType: contentType])
Expand Down Expand Up @@ -204,4 +207,33 @@ class MessageSpec extends Specification {
new MatchingRulesImpl(), new Generators(), ['contentType': contentType])
}

def 'kafka schema registry content type should be handled - #contentType'() {
expect:
message.formatContents() == result

where:

contentType | result
'application/vnd.schemaregistry.v1+json' | '{\n "a": 100.0,\n "b": "test"\n}'

message = new Message('test',
[],
OptionalBody.body(createKafkaSchemaRegistryCompliantBytes('{"a": 100.0, "b": "test"}'),
ContentType.fromString(contentType)),
new MatchingRulesImpl(), new Generators(), ['contentType': contentType])
}

private byte[] createKafkaSchemaRegistryCompliantBytes(String json) {
((kafkaSchemaRegistryMagicBytes() as List) << (json.bytes as List)).flatten()
}

private String kStr(String json) {
new String(kafkaSchemaRegistryMagicBytes()) + json
}

private byte[] kafkaSchemaRegistryMagicBytes() {
def zero = (byte) 0x00
def one = (byte) 0x01
new byte[] {zero, zero, zero, zero, one}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package au.com.dius.pact.core.support.json

import java.io.InputStream
import java.io.Reader

object KafkaSchemaRegistryJsonParser {


@Throws(JsonException::class)
@JvmStatic
fun parseString(json: String): JsonValue {
if (json.length > 5)
return JsonParser.parseString(removeMagicBytes(json))
return JsonParser.parseString(json)
}

private fun removeMagicBytes(contentIncludingMagicBytes: String): String {
val contentOnly = contentIncludingMagicBytes.drop(5)
return contentOnly
}

@Throws(JsonException::class)
@JvmStatic
fun parseStream(json: InputStream): JsonValue {
return JsonParser.parseStream(json)
}

@Throws(JsonException::class)
@JvmStatic
fun parseReader(reader: Reader): JsonValue {
return JsonParser.parseReader(reader)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package au.com.dius.pact.core.support.json

import spock.lang.Specification
import spock.lang.Unroll

@SuppressWarnings('LineLength')
class KafkaSchemaRegistryJsonParserSpec extends Specification {

@Unroll
def 'invalid document - #description'() {
when:
KafkaSchemaRegistryJsonParser.INSTANCE.parseString(json)

then:
thrown(JsonException)

where:

description | json
'empty document' | ''
'empty document after magic bytes' | ' '
'whitespace' | ' \t\n\r'
'whitespace after magic bytes' | ' \t\n\r'
'minus with no following digits' | ' -'
'invalid case' | 'Null'
'invalid value after other' | ' null true'
'unterminated string' | ' "null true'
'unterminated array' | '["null", true'
'unterminated object' | '{"null": true'
'invalid end array' | '12]'
'invalid end object' | 'true}'
'invalid comma' | '1234,'
'invalid object key' | '{null: true}'
'unterminated object key' | '{"null: true}'
'invalid object key' | '{"nu\\ll": true}'
'missing colon' | '{"null" true}'
'missing comma in array' | '["null" true]'
'missing comma in object' | '{"null": true "other": false}'
}

@Unroll
def 'valid document - #description'() {
when:
def value = KafkaSchemaRegistryJsonParser.INSTANCE.parseString(json)

then:
value == result

where:

description | json | result
'integer' | ' 1234' | new JsonValue.Integer('1234'.chars)
'decimal' | ' 1234.56 ' | new JsonValue.Decimal('1234.56'.chars)
'true' | ' true' | JsonValue.True.INSTANCE
'false' | ' false' | JsonValue.False.INSTANCE
'null' | ' null' | JsonValue.Null.INSTANCE
'string' | ' "null"' | new JsonValue.StringValue('null'.chars)
'array' | ' [1, 200, 3, "4"]' | new JsonValue.Array([new JsonValue.Integer('1'.chars), new JsonValue.Integer('200'.chars), new JsonValue.Integer('3'.chars), new JsonValue.StringValue('4'.chars)])
'2d array' | ' [[1, 2], 3, "4"]' | new JsonValue.Array([new JsonValue.Array([new JsonValue.Integer('1'.chars), new JsonValue.Integer('2'.chars)]), new JsonValue.Integer('3'.chars), new JsonValue.StringValue('4'.chars)])
'object' | ' {"1": 200, "3": "4"}' | new JsonValue.Object(['1': new JsonValue.Integer('200'.chars), '3': new JsonValue.StringValue('4'.chars)])
'object with decimal value 1' | ' {"1": 20.25}' | new JsonValue.Object(['1': new JsonValue.Decimal('20.25'.chars)])
'object with decimal value 2' | ' {"1": 200.25}' | new JsonValue.Object(['1': new JsonValue.Decimal('200.25'.chars)])
'2d object' | ' {"1": 2, "3": {"4":5}}' | new JsonValue.Object(['1': new JsonValue.Integer('2'.chars), '3': new JsonValue.Object(['4': new JsonValue.Integer('5'.chars)])])
'empty object' | ' {}' | new JsonValue.Object([:])
'empty array' | ' []' | new JsonValue.Array([])
'empty string' | ' ""' | new JsonValue.StringValue(''.chars)
'keys with special chars' | ' {"ä": "äbc"}' | new JsonValue.Object(['ä': new JsonValue.StringValue('äbc'.chars)])
}

@SuppressWarnings('TrailingWhitespace')
def 'parse a basic message pact'() {
given:
def pact = ''' {
"consumer": {
"name": "consumer"
},
"provider": {
"name": "provider"
},
"messages": [
{
"metaData": {
"contentType": "application/json"
},
"providerStates": [
{
"name": "message exists",
"params": {}
}
],
"contents": "Hello",
"matchingRules": {
},
"description": "a hello message"
}
],
"metadata": {
"pactSpecification": {
"version": "3.0.0"
},
"pact-jvm": {
"version": "4.0.10"
}
}
}
'''

when:
def value = KafkaSchemaRegistryJsonParser.INSTANCE.parseString(pact)

then:
value instanceof JsonValue.Object
value.entries.keySet() == ['consumer', 'provider', 'messages', 'metadata'] as Set
value.entries['consumer'] == new JsonValue.Object(['name': new JsonValue.StringValue('consumer'.chars)])
value.entries['provider'] == new JsonValue.Object(['name': new JsonValue.StringValue('provider'.chars)])
value.entries['metadata'] == new JsonValue.Object([
'pactSpecification': new JsonValue.Object(['version': new JsonValue.StringValue('3.0.0'.chars)]),
'pact-jvm': new JsonValue.Object(['version': new JsonValue.StringValue('4.0.10'.chars)])
])
}
}

0 comments on commit 270808a

Please sign in to comment.