
Commit e013cb3

Fix null key and exception about read-only ByteBuffer that happens when using a JsonConverter in a source connector. (#78)

kamalaboulhosn authored Apr 7, 2017
1 parent 5070be6 commit e013cb3
Showing 2 changed files with 51 additions and 26 deletions.
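Background on the ByteBuffer half of the fix: ByteString.asReadOnlyByteBuffer() returns a read-only view, and a read-only ByteBuffer refuses access to its backing array, which is how a converter such as Kafka Connect's JsonConverter gets at the raw bytes when serializing a record value. ByteString.toByteArray() hands back a plain byte[] copy instead. A minimal standalone sketch of the failure mode (the demo class and printed strings are illustrative, not connector code):

import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyBufferDemo {
  public static void main(String[] args) {
    ByteString data = ByteString.copyFromUtf8("over");

    // asReadOnlyByteBuffer() yields an array-backed but read-only view,
    // so the backing array is inaccessible to callers.
    ByteBuffer readOnly = data.asReadOnlyByteBuffer();
    System.out.println(readOnly.hasArray()); // false for a read-only view

    try {
      readOnly.array(); // how a converter would grab the raw bytes
    } catch (ReadOnlyBufferException e) {
      System.out.println("read-only buffer rejected array(): " + e);
    }

    // toByteArray() copies the contents into a plain byte[] that any
    // converter can serialize.
    byte[] plain = data.toByteArray();
    System.out.println(plain.length); // 4
  }
}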
CloudPubSubSourceTask.java
@@ -28,7 +28,6 @@
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -129,7 +128,7 @@ public List<SourceRecord> poll() throws InterruptedException {
       Map<String, String> messageAttributes = message.getAttributes();
       String key = messageAttributes.get(kafkaMessageKeyAttribute);
       ByteString messageData = message.getData();
-      ByteBuffer messageBytes = messageData.asReadOnlyByteBuffer();
+      byte[] messageBytes = messageData.toByteArray();
 
       boolean hasAttributes =
           messageAttributes.size() > 1 || (messageAttributes.size() > 0 && key == null);
@@ -165,7 +164,7 @@ record =
               null,
               kafkaTopic,
               selectPartition(key, value),
-              Schema.STRING_SCHEMA,
+              Schema.OPTIONAL_STRING_SCHEMA,
               key,
               valueSchema,
               value);
@@ -176,7 +175,7 @@ record =
               null,
               kafkaTopic,
               selectPartition(key, messageBytes),
-              Schema.STRING_SCHEMA,
+              Schema.OPTIONAL_STRING_SCHEMA,
               key,
               Schema.BYTES_SCHEMA,
               messageBytes);
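The Schema.STRING_SCHEMA to Schema.OPTIONAL_STRING_SCHEMA change is the null-key half: a Pub/Sub message without the key attribute produces a null record key, and Connect rejects null for a required schema. A minimal sketch of the difference using Connect's own validation (the demo class is illustrative; ConnectSchema.validateValue is the standard Connect API check):

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;

public class OptionalKeyDemo {
  public static void main(String[] args) {
    String key = null; // no kafkaMessageKeyAttribute on the message

    try {
      // A required schema throws DataException for a null value.
      ConnectSchema.validateValue(Schema.STRING_SCHEMA, key);
    } catch (DataException e) {
      System.out.println("required key schema: " + e.getMessage());
    }

    // The optional variant accepts null, so keyless messages pass through.
    ConnectSchema.validateValue(Schema.OPTIONAL_STRING_SCHEMA, key);
    System.out.println("optional key schema: null key accepted");
  }
}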
CloudPubSubSourceTaskTest.java
@@ -15,6 +15,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 package com.google.pubsub.kafka.source;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
@@ -34,10 +35,10 @@
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.ReceivedMessage;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
@@ -60,7 +61,7 @@ public class CloudPubSubSourceTaskTest {
   private static final String KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE = "jumped";
   private static final String KAFKA_PARTITIONS = "3";
   private static final ByteString CPS_MESSAGE = ByteString.copyFromUtf8("over");
-  private static final ByteBuffer KAFKA_VALUE = CPS_MESSAGE.asReadOnlyByteBuffer();
+  private static final byte[] KAFKA_VALUE = CPS_MESSAGE.toByteArray();
   private static final String ACK_ID1 = "ackID1";
   private static final String ACK_ID2 = "ackID2";
   private static final String ACK_ID3 = "ackID3";
@@ -70,6 +71,31 @@ public class CloudPubSubSourceTaskTest {
   private Map<String, String> props;
   private CloudPubSubSubscriber subscriber;
 
+  /**
+   * Compare two SourceRecords. This is necessary because the records' values contain a byte[] and
+   * the .equals on a SourceRecord does not take this into account.
+   */
+  public void assertRecordsEqual(SourceRecord sr1, SourceRecord sr2) {
+    assertEquals(sr1.key(), sr2.key());
+    assertEquals(sr1.keySchema(), sr2.keySchema());
+    assertEquals(sr1.valueSchema(), sr2.valueSchema());
+    assertEquals(sr1.topic(), sr2.topic());
+
+    if (sr1.valueSchema() == Schema.BYTES_SCHEMA) {
+      assertArrayEquals((byte[])sr1.value(), (byte[])sr2.value());
+    } else {
+      for(Field f : sr1.valueSchema().fields()) {
+        if (f.name().equals(ConnectorUtils.KAFKA_MESSAGE_CPS_BODY_FIELD)) {
+          assertArrayEquals(((Struct)sr1.value()).getBytes(f.name()),
+                            ((Struct)sr2.value()).getBytes(f.name()));
+        } else {
+          assertEquals(((Struct)sr1.value()).getString(f.name()),
+                       ((Struct)sr2.value()).getString(f.name()));
+        }
+      }
+    }
+  }
+
   @Before
   public void setup() {
     subscriber = mock(CloudPubSubSubscriber.class, RETURNS_DEEP_STUBS);
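The assertRecordsEqual helper added above is needed because Java arrays compare by reference: SourceRecord.equals() uses Objects.equals on the value, so two records holding equal byte[] payloads never compare equal. A standalone illustration (not part of the test class):

import java.util.Arrays;

public class ArrayEqualityDemo {
  public static void main(String[] args) {
    byte[] a = "over".getBytes();
    byte[] b = "over".getBytes();

    // Arrays inherit Object.equals, which is reference equality.
    System.out.println(a.equals(b)); // false
    // Content comparison needs Arrays.equals (assertArrayEquals in JUnit).
    System.out.println(Arrays.equals(a, b)); // true
  }
}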
@@ -164,11 +190,11 @@ public void testPollWithNoMessageKeyAttribute() throws Exception {
             null,
             KAFKA_TOPIC,
             0,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
-    assertEquals(expected, result.get(0));
+    assertRecordsEqual(expected, result.get(0));
   }
 
   /**
@@ -192,11 +218,11 @@ public void testPollWithMessageKeyAttribute() throws Exception {
             null,
             KAFKA_TOPIC,
             0,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
-    assertEquals(expected, result.get(0));
+    assertRecordsEqual(expected, result.get(0));
   }
 
   /**
@@ -232,11 +258,11 @@ public void testPollWithMultipleAttributes() throws Exception {
             null,
             KAFKA_TOPIC,
             0,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
             expectedSchema,
             expectedValue);
-    assertEquals(expected, result.get(0));
+    assertRecordsEqual(expected, result.get(0));
   }
 
   /**
@@ -268,7 +294,7 @@ public void testPollWithPartitionSchemeHashKey() throws Exception {
             null,
             KAFKA_TOPIC,
             KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS),
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             KAFKA_MESSAGE_KEY_ATTRIBUTE_VALUE,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
@@ -278,13 +304,13 @@ public void testPollWithPartitionSchemeHashKey() throws Exception {
             null,
             KAFKA_TOPIC,
             0,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
 
-    assertEquals(expectedForMessageWithKey, result.get(0));
-    assertEquals(expectedForMessageWithoutKey.value(), result.get(1).value());
+    assertRecordsEqual(expectedForMessageWithKey, result.get(0));
+    assertArrayEquals((byte[])expectedForMessageWithoutKey.value(), (byte[])result.get(1).value());
   }
 
   /** Tests that the correct partition is assigned when the partition scheme is "hash_value". */
@@ -306,11 +332,11 @@ public void testPollWithPartitionSchemeHashValue() throws Exception {
             null,
             KAFKA_TOPIC,
             KAFKA_VALUE.hashCode() % Integer.parseInt(KAFKA_PARTITIONS),
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
-    assertEquals(expected, result.get(0));
+    assertRecordsEqual(expected, result.get(0));
   }
 
   /**
@@ -342,7 +368,7 @@ public void testPollWithPartitionSchemeRoundRobin() throws Exception {
             null,
             KAFKA_TOPIC,
             0,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
@@ -352,7 +378,7 @@ public void testPollWithPartitionSchemeRoundRobin() throws Exception {
             null,
             KAFKA_TOPIC,
             1,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
@@ -362,7 +388,7 @@ public void testPollWithPartitionSchemeRoundRobin() throws Exception {
             null,
             KAFKA_TOPIC,
             2,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
@@ -372,14 +398,14 @@ public void testPollWithPartitionSchemeRoundRobin() throws Exception {
             null,
             KAFKA_TOPIC,
             0,
-            Schema.STRING_SCHEMA,
+            Schema.OPTIONAL_STRING_SCHEMA,
             null,
             Schema.BYTES_SCHEMA,
             KAFKA_VALUE);
-    assertEquals(expected1, result.get(0));
-    assertEquals(expected2, result.get(1));
-    assertEquals(expected3, result.get(2));
-    assertEquals(expected4, result.get(3));
+    assertRecordsEqual(expected1, result.get(0));
+    assertRecordsEqual(expected2, result.get(1));
+    assertRecordsEqual(expected3, result.get(2));
+    assertRecordsEqual(expected4, result.get(3));
   }
 
   @Test(expected = RuntimeException.class)
