diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index 19e773e4..c9ed4269 100644 --- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -269,6 +269,8 @@ public void flush(Map partitionOffsets) { ApiFutures.allAsList(outstandingFutures.futures).get(); } catch (Exception e) { throw new RuntimeException(e); + } finally { + outstandingFutures.futures.clear(); } } allOutstandingFutures.clear(); diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java index 317de8c9..eec40a15 100644 --- a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java +++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java @@ -385,6 +385,33 @@ public void testKafkaMetadata() { assertEquals(requestArgs, expectedMessages); } + /** + * Tests that if a Future that is being processed in flush() failed with an exception and then a + * second Future is processed successfully in a subsequent flush, then the subsequent flush + * succeeds. + */ + @Test + public void testFlushExceptionThenNoExceptionCase() throws Exception { + task.start(props); + Map partitionOffsets = new HashMap<>(); + partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null); + List records = getSampleRecords(); + ApiFuture badFuture = getFailedPublishFuture(); + ApiFuture goodFuture = getSuccessfulPublishFuture(); + when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture).thenReturn(badFuture).thenReturn(goodFuture); + task.put(records); + try { + task.flush(partitionOffsets); + } catch (RuntimeException e) { + } + records = getSampleRecords(); + task.put(records); + task.flush(partitionOffsets); + verify(publisher, times(4)).publish(any(PubsubMessage.class)); + verify(badFuture, times(2)).addListener(any(Runnable.class), any(Executor.class)); + verify(goodFuture, times(2)).addListener(any(Runnable.class), any(Executor.class)); + } + /** Get some sample SinkRecords's to use in the tests. */ private List getSampleRecords() { List records = new ArrayList<>();