From f1abc91defae530d825e814e63d800e3de4c9649 Mon Sep 17 00:00:00 2001
From: Kamal Aboul-Hosn
Date: Wed, 10 Oct 2018 10:32:55 -0400
Subject: [PATCH] Fix stuck sink connector (#181)

* Handle a null schema in the sink connector.
* Fix concurrent access on iteration of a synchronized set.
* Reduce the ackIds iteration synchronization to the shortest period needed.
* Clear the list of ackIds as soon as acks are sent. This makes acks best effort,
  but it also prevents us from forgetting to send some acks if a poll happens
  while there is an outstanding ack request.
* Better error message when verifySubscription fails.
* When an exception happens on pull, just return an empty list to poll.
* Ensure the partition number is non-negative in the CPS source connector.
* Shade out the guava library so the connector can work with a newer version of
  Kafka that depends on a newer version of the guava library.
* Update versions of gRPC libraries used by the Kafka Connector. This should
  hopefully fix issue #120.
* Remove the bad oracle jdk7 environment from Travis runs.
* Minor formatting fixes.
* Add a new config property to the sink connector, metadata.publish. When this
  property is true, the following attributes are added to a message published to
  Cloud Pub/Sub via the sink connector (see the sketch after this list):
    kafka.topic: The Kafka topic on which the message was originally published
    kafka.partition: The Kafka partition to which the message was originally published
    kafka.offset: The offset of the message in the Kafka topic/partition
    kafka.timestamp: The timestamp that Kafka associated with the message
* Calculate the message size at the end rather than along the way.
* Remove the temporary variables for Kafka attributes.
* Periodically recreate gRPC publishers and subscribers in order to avoid GOAWAY errors.
* Formatting/syntactic fixes.
* Switch the sink connector to the client library.
* Remove commented-out code.
* Fix Java 7 compatibility.
* Add parameters for timeouts for publishes to Cloud Pub/Sub.
* Fix handling of optional struct fields and the error message for missing fields,
  which would show up as a message about unsupported nested types.
* Add a test case for an optional field where the value is present.
* Set the maximum inbound message size on the source connector to ensure it is
  possible to receive a message up to the largest Pub/Sub message size (10MB).
* Add instructions to the Kafka Connector about downloading from a release.
* Clear the list of futures whenever we flush. If we do not do this, then when an
  error occurs, all subsequent flush attempts will fail.
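(Illustration, not part of this patch.) A minimal sketch of how the metadata.publish
attributes listed above could be attached while building the outgoing Pub/Sub message,
assuming the standard PubsubMessage builder and Kafka Connect SinkRecord accessors.
The helper name buildWithMetadata and the messageBytes parameter are hypothetical
stand-ins, not code from this patch:

    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.PubsubMessage;
    import org.apache.kafka.connect.sink.SinkRecord;

    class MetadataSketch {
      // Hypothetical helper: attaches the Kafka metadata attributes listed above.
      // "messageBytes" stands in for the already-serialized record value.
      static PubsubMessage buildWithMetadata(SinkRecord record, ByteString messageBytes) {
        return PubsubMessage.newBuilder()
            .setData(messageBytes)
            .putAttributes("kafka.topic", record.topic())
            .putAttributes("kafka.partition", String.valueOf(record.kafkaPartition()))
            .putAttributes("kafka.offset", String.valueOf(record.kafkaOffset()))
            // record.timestamp() can be null if Kafka recorded no timestamp;
            // a real implementation would guard for that case.
            .putAttributes("kafka.timestamp", String.valueOf(record.timestamp()))
            .build();
      }
    }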
---
 .../kafka/sink/CloudPubSubSinkTask.java       |  2 ++
 .../kafka/sink/CloudPubSubSinkTaskTest.java   | 27 +++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index 19e773e4..c9ed4269 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -269,6 +269,8 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
         ApiFutures.allAsList(outstandingFutures.futures).get();
       } catch (Exception e) {
         throw new RuntimeException(e);
+      } finally {
+        outstandingFutures.futures.clear();
       }
     }
     allOutstandingFutures.clear();
diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
index 317de8c9..eec40a15 100644
--- a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
+++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
@@ -385,6 +385,33 @@ public void testKafkaMetadata() {
     assertEquals(requestArgs, expectedMessages);
   }
 
+  /**
+   * Tests that if a Future that is being processed in flush() failed with an exception and then a
+   * second Future is processed successfully in a subsequent flush, then the subsequent flush
+   * succeeds.
+   */
+  @Test
+  public void testFlushExceptionThenNoExceptionCase() throws Exception {
+    task.start(props);
+    Map<TopicPartition, OffsetAndMetadata> partitionOffsets = new HashMap<>();
+    partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null);
+    List<SinkRecord> records = getSampleRecords();
+    ApiFuture<String> badFuture = getFailedPublishFuture();
+    ApiFuture<String> goodFuture = getSuccessfulPublishFuture();
+    when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture).thenReturn(badFuture).thenReturn(goodFuture);
+    task.put(records);
+    try {
+      task.flush(partitionOffsets);
+    } catch (RuntimeException e) {
+    }
+    records = getSampleRecords();
+    task.put(records);
+    task.flush(partitionOffsets);
+    verify(publisher, times(4)).publish(any(PubsubMessage.class));
+    verify(badFuture, times(2)).addListener(any(Runnable.class), any(Executor.class));
+    verify(goodFuture, times(2)).addListener(any(Runnable.class), any(Executor.class));
+  }
+
   /** Get some sample SinkRecords's to use in the tests. */
   private List<SinkRecord> getSampleRecords() {
     List<SinkRecord> records = new ArrayList<>();
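(Illustration, appended after the diff rather than part of it.) A minimal, self-contained
sketch of the pattern the flush() change above applies, assuming the Pub/Sub client's
ApiFutures utilities. The class and member names here (FlushSketch, outstanding, track)
are illustrative only; the point is that the list of outstanding publish futures is
cleared even when waiting on them throws, so one failed flush cannot make every later
flush fail on the same stale futures:

    import com.google.api.core.ApiFuture;
    import com.google.api.core.ApiFutures;
    import java.util.ArrayList;
    import java.util.List;

    class FlushSketch {
      // Futures for publishes issued since the last flush (illustrative field).
      private final List<ApiFuture<String>> outstanding = new ArrayList<>();

      void track(ApiFuture<String> publishResult) {
        outstanding.add(publishResult);
      }

      void flush() {
        try {
          // Wait for every outstanding publish to complete.
          ApiFutures.allAsList(outstanding).get();
        } catch (Exception e) {
          throw new RuntimeException(e);
        } finally {
          // Clear even on failure so the next flush() starts from a clean slate.
          outstanding.clear();
        }
      }
    }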