Fix stuck sink connector (#181)
* Handle a null schema in the sink connector (see the null-schema sketch after this list).

* Fix concurrent access during iteration of a synchronized set.

* Reduce the ackIds iteration synchronization to the shortest period needed.

* Clear the list of ackIds as soon as acks are sent. This makes acks best effort, but it also prevents us from forgetting to send some acks if a poll happens while an ack request is outstanding (see the ackId sketch after this list).

* Better error message when verifySubscription fails.

* When an exception happens on pull, just return an empty list from poll().

* Ensure partition number is non-negative in CPS source connector.

* Shade out the Guava library so the connector can work with newer versions of Kafka that depend on a newer version of Guava.

* Update the versions of the gRPC libraries used by the Kafka connector. This should hopefully fix issue #120.

* Remove bad oracle jdk7 environment from Travis runs.

* Minor formatting fixes.

* Add a new config property, metadata.publish, to the sink connector. When this property is true, the following attributes are added to each message published to Cloud Pub/Sub via the sink connector (see the metadata sketch after this list):

kafka.topic: The Kafka topic on which the message was originally published
kafka.partition: The Kafka partition to which the message was originally published
kafka.offset: The offset of the message in the Kafka topic/partition
kafka.timestamp: The timestamp that Kafka associated with the message

* Calculate message size at the end rather than along the way.

* Remove the temporary variables for Kafka attributes.

* Periodically recreate gRPC publishers and subscribers to avoid GOAWAY errors (see the channel-recreation sketch after this list).

* Formatting/syntactic fixes

* Switch sink connector to client library.

* Remove commented-out code

* Fix Java 7 compatibility.

* Add parameters for timeouts for publishes to Cloud Pub/Sub.

* Fix handling of optional struct fields and the error message for missing fields, which previously showed up as a message about unsupported nested types.

* Add test case for optional field where the value is present.

* Set the maximum inbound message size on the source connector so it is possible to receive a message up to the largest Pub/Sub message size (10 MB); see the inbound-size sketch after this list.

* Add instructions to the Kafka connector about downloading from a release.

* Clear the list of futures whenever we flush. If we do not do this, then when an error occurs, all subsequent flush attempts will fail.
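
The null-schema sketch below is a minimal illustration of the idea, assuming the connector converts record values to a ByteString; NullSchemaSketch, handleValue, and convertWithSchema are illustrative names, not the connector's actual code.

```java
import com.google.protobuf.ByteString;
import org.apache.kafka.connect.data.Schema;

class NullSchemaSketch {
  // A schemaless record arrives with a null value schema, so branch on the
  // runtime type of the value instead of dereferencing the schema.
  ByteString handleValue(Schema schema, Object value) {
    if (schema == null) {
      if (value instanceof byte[]) {
        return ByteString.copyFrom((byte[]) value);
      }
      return ByteString.copyFromUtf8(String.valueOf(value));
    }
    return convertWithSchema(schema, value); // existing schema-driven path
  }

  // Placeholder for the existing schema-based conversion.
  private ByteString convertWithSchema(Schema schema, Object value) {
    return ByteString.copyFromUtf8(String.valueOf(value));
  }
}
```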
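
The ackId sketch below illustrates the synchronization changes, assuming a synchronized set of ackIds shared between poll and the ack path; the class, field, and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AckIdSketch {
  private final Set<String> ackIds = Collections.synchronizedSet(new HashSet<String>());

  void ackOutstanding() {
    List<String> toAck;
    // Keep the critical section as short as possible: snapshot and clear
    // under the lock, then send the acks outside it. Acks become best
    // effort, but a concurrent poll can no longer skip or re-iterate ackIds.
    synchronized (ackIds) {
      toAck = new ArrayList<>(ackIds);
      ackIds.clear();
    }
    sendAcks(toAck);
  }

  // Illustrative stand-in for the acknowledge RPC.
  private void sendAcks(List<String> ackIds) {}
}
```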
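
The metadata sketch below shows how the metadata.publish attributes might be attached to a PubsubMessage; the attribute keys come from the commit message, while the surrounding class and method are illustrative.

```java
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.kafka.connect.sink.SinkRecord;

class KafkaMetadataSketch {
  PubsubMessage withKafkaMetadata(SinkRecord record, ByteString data) {
    return PubsubMessage.newBuilder()
        .setData(data)
        .putAttributes("kafka.topic", record.topic())
        .putAttributes("kafka.partition", String.valueOf(record.kafkaPartition()))
        .putAttributes("kafka.offset", String.valueOf(record.kafkaOffset()))
        .putAttributes("kafka.timestamp", String.valueOf(record.timestamp()))
        .build();
  }
}
```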
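
The channel-recreation sketch below illustrates the periodic-recreation idea at the gRPC channel level; the request threshold, endpoint, and class structure are assumptions, not the connector's actual implementation.

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

class RecreatingChannelSketch {
  private static final long REQUESTS_PER_CHANNEL = 1000; // assumed threshold
  private ManagedChannel channel = newChannel();
  private long requestCount = 0;

  private static ManagedChannel newChannel() {
    return ManagedChannelBuilder.forTarget("pubsub.googleapis.com:443").build();
  }

  // Hand out the current channel, replacing it periodically so long-lived
  // connections are retired before the server closes them with GOAWAY.
  ManagedChannel getChannel() {
    if (++requestCount % REQUESTS_PER_CHANNEL == 0) {
      channel.shutdown();     // let in-flight RPCs drain on the old channel
      channel = newChannel(); // subsequent RPCs use a fresh connection
    }
    return channel;
  }
}
```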
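
The inbound-size sketch below shows how a gRPC channel's inbound limit might be raised so a maximum-size (10 MB) Pub/Sub message can be received; the byte count and endpoint string are illustrative.

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

class InboundSizeSketch {
  // 10 MB payload plus assumed headroom for attributes and framing.
  private static final int MAX_INBOUND_MESSAGE_BYTES = 20 * 1024 * 1024;

  ManagedChannel newSubscriberChannel() {
    return ManagedChannelBuilder.forTarget("pubsub.googleapis.com:443")
        .maxInboundMessageSize(MAX_INBOUND_MESSAGE_BYTES)
        .build();
  }
}
```
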
kamalaboulhosn authored Oct 10, 2018
1 parent c37d128 commit f1abc91
Showing 2 changed files with 29 additions and 0 deletions.
@@ -269,6 +269,8 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
        ApiFutures.allAsList(outstandingFutures.futures).get();
      } catch (Exception e) {
        throw new RuntimeException(e);
      } finally {
        outstandingFutures.futures.clear();
      }
    }
    allOutstandingFutures.clear();
@@ -385,6 +385,33 @@ public void testKafkaMetadata() {
    assertEquals(requestArgs, expectedMessages);
  }

  /**
   * Tests that if a Future that is being processed in flush() failed with an exception and then a
   * second Future is processed successfully in a subsequent flush, then the subsequent flush
   * succeeds.
   */
  @Test
  public void testFlushExceptionThenNoExceptionCase() throws Exception {
    task.start(props);
    Map<TopicPartition, OffsetAndMetadata> partitionOffsets = new HashMap<>();
    partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null);
    List<SinkRecord> records = getSampleRecords();
    ApiFuture<String> badFuture = getFailedPublishFuture();
    ApiFuture<String> goodFuture = getSuccessfulPublishFuture();
    when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture).thenReturn(badFuture).thenReturn(goodFuture);
    task.put(records);
    try {
      task.flush(partitionOffsets);
    } catch (RuntimeException e) {
    }
    records = getSampleRecords();
    task.put(records);
    task.flush(partitionOffsets);
    verify(publisher, times(4)).publish(any(PubsubMessage.class));
    verify(badFuture, times(2)).addListener(any(Runnable.class), any(Executor.class));
    verify(goodFuture, times(2)).addListener(any(Runnable.class), any(Executor.class));
  }

  /** Get some sample SinkRecords to use in the tests. */
  private List<SinkRecord> getSampleRecords() {
    List<SinkRecord> records = new ArrayList<>();