From 91593b1da4ecb01874414c96baf6a6a143a70850 Mon Sep 17 00:00:00 2001
From: Daniel Collins
Date: Sat, 27 Mar 2021 01:14:05 -0400
Subject: [PATCH] feat: Make acks flushed faster than once per poll() call

---
 kafka-connector/pom.xml                            |  41 ++++---
 .../kafka/source/AckBatchingSubscriber.java        |  19 ++--
 .../source/CloudPubSubSourceConnector.java         |  18 ----
 .../kafka/source/CloudPubSubSourceTask.java        |  10 +-
 .../source/AckBatchingSubscriberTest.java          | 101 ++++++++++++++++++
 5 files changed, 146 insertions(+), 43 deletions(-)

diff --git a/kafka-connector/pom.xml b/kafka-connector/pom.xml
index 36038cdc..c200a3f7 100644
--- a/kafka-connector/pom.xml
+++ b/kafka-connector/pom.xml
@@ -1,5 +1,6 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.google.pubsub.kafka.connect</groupId>
   <artifactId>kafka-connector</artifactId>
@@ -45,16 +46,11 @@
       <artifactId>google-cloud-pubsub</artifactId>
       <version>1.112.0</version>
     </dependency>
-    <dependency>
-      <groupId>com.google.cloud</groupId>
-      <artifactId>google-cloud-pubsublite</artifactId>
-      <version>0.6.0</version>
-    </dependency>
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>3.11.4</version>
-    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.11.4</version>
+    </dependency>
     <dependency>
       <groupId>com.google.auth</groupId>
@@ -86,7 +82,13 @@
     <dependency>
       <groupId>com.google.truth</groupId>
      <artifactId>truth</artifactId>
-      <version>1.1</version>
+      <version>1.1.2</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.truth.extensions</groupId>
+      <artifactId>truth-java8-extension</artifactId>
+      <version>1.1.2</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
@@ -127,12 +129,15 @@
               <bannedDependencies>
                 <excludes>
-                  <exclude>com.fasterxml.jackson.core:jackson-core:jar:*</exclude>
+                  <exclude>
+                    com.fasterxml.jackson.core:jackson-core:jar:*
+                  </exclude>
                 </excludes>
-              </bannedDependencies>
-            </rules>
+              </bannedDependencies>
+            </rules>
           </configuration>
         </plugin>
       </plugins>
       <finalName>pubsub-kafka-connector</finalName>
@@ -157,7 +162,11 @@
           <!-- The version of protoc must match protobuf-java. If you don't depend on
               protobuf-java directly, you will be transitively depending on the
               protobuf-java version that grpc depends on. -->
-          <protocArtifact>com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}</protocArtifact>
+          <protocArtifact>
+            com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}
+          </protocArtifact>
           <pluginId>grpc-java</pluginId>
-          <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
+          <pluginArtifact>
+            io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}
+          </pluginArtifact>
           <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java
index ba8cc8c0..9157bdd3 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java
@@ -13,22 +13,24 @@
 import java.util.Collection;
 import java.util.Deque;
 import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 import org.apache.commons.lang3.tuple.Pair;
 
 public class AckBatchingSubscriber implements CloudPubSubSubscriber {
+  interface AlarmFactory {
+    Future<?> newAlarm(Runnable runnable);
+  }
+
   private final CloudPubSubSubscriber underlying;
 
   @GuardedBy("this")
   private final Deque<Pair<List<String>, SettableApiFuture<Empty>>> toSend = new ArrayDeque<>();
 
-  private final ScheduledFuture<?> alarm;
+  private final Future<?> alarm;
 
-  public AckBatchingSubscriber(CloudPubSubSubscriber underlying,
-      ScheduledExecutorService executor) {
+  public AckBatchingSubscriber(
+      CloudPubSubSubscriber underlying,
+      AlarmFactory alarmFactory) {
     this.underlying = underlying;
-    this.alarm = executor.scheduleAtFixedRate(this::flush, 100, 100, TimeUnit.MILLISECONDS);
+    this.alarm = alarmFactory.newAlarm(this::flush);
   }
@@ -73,6 +75,9 @@ public void onSuccess(Empty result) {
   @Override
   public void close() {
     alarm.cancel(false);
+    try {
+      alarm.get();
+    } catch (Throwable ignored) {}
     flush();
     underlying.close();
   }
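Note on the AlarmFactory seam above: the constructor no longer schedules its own flush timer; it accepts any factory that turns a Runnable into a cancellable Future, which is what lets the new unit tests drive flushes by hand. A minimal sketch of production wiring, equivalent to what CloudPubSubSourceTask does below (the class and method names are illustrative, and the lambda assumes it is compiled in the same package, since AlarmFactory is package-private):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch, not part of the patch: satisfy AlarmFactory with a standard
    // scheduler so pending acks are flushed every 100ms instead of once per poll().
    final class AlarmWiring {
      static CloudPubSubSubscriber withPeriodicFlush(CloudPubSubSubscriber underlying) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        return new AckBatchingSubscriber(
            underlying,
            runnable -> executor.scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS));
      }
    }
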
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
index bc111412..0de1f101 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
@@ -215,24 +215,6 @@ public ConfigDef config() {
             100L * 1024 * 1024,
             Importance.MEDIUM,
             "The maximum number of outstanding message bytes per task when using streaming pull.")
-        .define(
-            CPS_STREAMING_PULL_ENABLED,
-            Type.BOOLEAN,
-            false,
-            Importance.MEDIUM,
-            "Whether to use streaming pull for the connector to connect to Cloud Pub/Sub. If provided, cps.maxBatchSize is ignored.")
-        .define(
-            CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES,
-            Type.LONG,
-            1000L,
-            Importance.MEDIUM,
-            "The maximum number of outstanding messages per task when using streaming pull.")
-        .define(
-            CPS_STREAMING_PULL_FLOW_CONTROL_BYTES,
-            Type.LONG,
-            100L * 1024 * 1024,
-            Importance.MEDIUM,
-            "The maximum number of outstanding message bytes per task when using streaming pull.")
         .define(
             KAFKA_MESSAGE_KEY_CONFIG,
             Type.STRING,
diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
index 2e2675e1..232479af 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
@@ -15,8 +15,10 @@
 ////////////////////////////////////////////////////////////////////////////////
 package com.google.pubsub.kafka.source;
 
+import com.google.api.core.ApiFutures;
 import com.google.api.gax.batching.FlowControlSettings;
 import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
+import com.google.api.gax.rpc.ApiException;
 import com.google.cloud.pubsub.v1.Subscriber;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -39,6 +41,7 @@
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
@@ -152,7 +155,7 @@ public void start(Map<String, String> props) {
       subscriber =
           new AckBatchingSubscriber(
               new CloudPubSubRoundRobinSubscriber(NUM_CPS_SUBSCRIBERS, gcpCredentialsProvider,
-                  cpsEndpoint, cpsSubscription, cpsMaxBatchSize), ACK_EXECUTOR);
+                  cpsEndpoint, cpsSubscription, cpsMaxBatchSize), runnable -> ACK_EXECUTOR.scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS));
       }
     }
     standardAttributes.add(kafkaMessageKeyAttribute);
@@ -341,7 +344,10 @@ public void stop() {
   @Override
   public void commitRecord(SourceRecord record) {
     String ackId = record.sourceOffset().get(cpsSubscription.toString()).toString();
-    subscriber.ackMessages(ImmutableList.of(ackId));
+    ApiFutures.catching(subscriber.ackMessages(ImmutableList.of(ackId)), ApiException.class, e -> {
+      log.warn("Failed to acknowledge message: " + e);
+      return null;
+    }, MoreExecutors.directExecutor());
     log.trace("Committed {}", ackId);
   }
 }
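Note on the commitRecord() change above: acking becomes fire-and-forget. ApiFutures.catching converts an ack failure into a warning log instead of an exception that could fail the Connect task; a message whose ack is lost is simply redelivered by Cloud Pub/Sub after the ack deadline expires. The same pattern in isolation (class and method names are illustrative, and the logging call stands in for the task's slf4j logger):

    import com.google.api.core.ApiFuture;
    import com.google.api.core.ApiFutures;
    import com.google.api.gax.rpc.ApiException;
    import com.google.common.util.concurrent.MoreExecutors;

    // Sketch, not part of the patch: attach a fallback so a failed ack is
    // logged and swallowed rather than propagated to the caller.
    final class AckErrorHandling {
      static <T> ApiFuture<T> logOnFailure(ApiFuture<T> ackFuture) {
        return ApiFutures.catching(
            ackFuture,
            ApiException.class,
            e -> {
              System.err.println("Failed to acknowledge message: " + e);
              return null; // swallow: Pub/Sub will redeliver the message
            },
            MoreExecutors.directExecutor());
      }
    }
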
diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/source/AckBatchingSubscriberTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/source/AckBatchingSubscriberTest.java
index ecd692dd..b98b8f10 100644
--- a/kafka-connector/src/test/java/com/google/pubsub/kafka/source/AckBatchingSubscriberTest.java
+++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/source/AckBatchingSubscriberTest.java
@@ -1,5 +1,106 @@
 package com.google.pubsub.kafka.source;
 
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.protobuf.Empty;
+import com.google.pubsub.kafka.source.AckBatchingSubscriber.AlarmFactory;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+
+@RunWith(JUnit4.class)
 public class AckBatchingSubscriberTest {
+  private final AlarmFactory alarmFactory = mock(AlarmFactory.class);
+  private final CloudPubSubSubscriber underlying = mock(CloudPubSubSubscriber.class);
+  private Runnable onAlarm;
+  private CloudPubSubSubscriber subscriber;
+
+  @Before
+  public void setUp() {
+    when(alarmFactory.newAlarm(any())).thenAnswer(args -> {
+      onAlarm = args.getArgument(0);
+      return Futures.immediateVoidFuture();
+    });
+    subscriber = new AckBatchingSubscriber(underlying, alarmFactory);
+    assertThat(onAlarm).isNotNull();
+  }
+
+  @Test
+  public void pullProxies() {
+    subscriber.pull();
+    verify(underlying, times(1)).pull();
+    verifyNoMoreInteractions(underlying);
+  }
+
+  @Test
+  public void closeProxies() {
+    subscriber.close();
+    verify(underlying, times(1)).close();
+    verifyNoMoreInteractions(underlying);
+  }
+
+  public static void assertFutureThrowsCode(Future<?> f, Code code) {
+    ExecutionException exception = assertThrows(ExecutionException.class, f::get);
+    assertThrowableMatches(exception.getCause(), code);
+  }
+
+  public static void assertThrowableMatches(Throwable t, Code code) {
+    Optional<CheckedApiException> statusOr = ExtractStatus.extract(t);
+    assertThat(statusOr).isPresent();
+    assertThat(statusOr.get().code()).isEqualTo(code);
+  }
+
+  @Test
+  public void partialFlushFailure() {
+    ApiFuture<Empty> future1 = subscriber.ackMessages(ImmutableList.of("a", "b"));
+    ApiFuture<Empty> future2 = subscriber.ackMessages(ImmutableList.of("c"));
+    SettableApiFuture<Empty> batchDone = SettableApiFuture.create();
+    when(underlying.ackMessages(ImmutableList.of("a", "b", "c"))).thenReturn(batchDone);
+    onAlarm.run();
+    ApiFuture<Empty> future3 = subscriber.ackMessages(ImmutableList.of("d"));
+    assertThat(future1.isDone()).isFalse();
+    assertThat(future2.isDone()).isFalse();
+    assertThat(future3.isDone()).isFalse();
+    batchDone.setException(new CheckedApiException(Code.INTERNAL).underlying);
+    assertFutureThrowsCode(future1, Code.INTERNAL);
+    assertFutureThrowsCode(future2, Code.INTERNAL);
+    assertThat(future3.isDone()).isFalse();
+  }
+
+  @Test
+  public void flushOnClose() throws Exception {
+    ApiFuture<Empty> future1 = subscriber.ackMessages(ImmutableList.of("a", "b"));
+    ApiFuture<Empty> future2 = subscriber.ackMessages(ImmutableList.of("c"));
+    SettableApiFuture<Empty> batchDone = SettableApiFuture.create();
+    when(underlying.ackMessages(ImmutableList.of("a", "b", "c"))).thenReturn(batchDone);
+    subscriber.close();
+    verify(underlying).ackMessages(any());
+    verify(underlying).close();
+    assertThat(future1.isDone()).isFalse();
+    assertThat(future2.isDone()).isFalse();
+    batchDone.set(null);
+    future1.get();
+    future2.get();
+  }
 }
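Note on the test strategy: because the mocked AlarmFactory hands back the flush Runnable, the tests fire flushes synchronously instead of sleeping through the 100ms timer. (The truth-java8-extension dependency added in the pom supplies the Truth8.assertThat overload used for Optional assertions.) The same capture trick works without Mockito; a minimal harness, assuming only the classes in this patch and same-package access (names are illustrative):

    import com.google.api.core.ApiFuture;
    import com.google.common.collect.ImmutableList;
    import com.google.common.util.concurrent.Futures;
    import com.google.protobuf.Empty;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch, not part of the patch: capture the flush Runnable so the caller
    // decides exactly when batched acks are pushed to the underlying subscriber.
    final class ManualAlarmHarness {
      static void demo(CloudPubSubSubscriber underlying) throws Exception {
        AtomicReference<Runnable> flush = new AtomicReference<>();
        AckBatchingSubscriber subscriber =
            new AckBatchingSubscriber(
                underlying,
                runnable -> {
                  flush.set(runnable);
                  return Futures.immediateVoidFuture();
                });
        ApiFuture<Empty> acked = subscriber.ackMessages(ImmutableList.of("ack-id-1"));
        flush.get().run(); // one underlying.ackMessages(["ack-id-1"]) call happens here
        acked.get();       // completes once the underlying ack future completes
      }
    }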