
feat: Make acks flushed faster than once per poll() call
dpcollins-google committed Mar 29, 2021
1 parent c2eb727 commit 91593b1
Showing 5 changed files with 146 additions and 43 deletions.
41 changes: 25 additions & 16 deletions kafka-connector/pom.xml
@@ -1,5 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.pubsub.kafka.connect</groupId>
<artifactId>kafka-connector</artifactId>
@@ -45,16 +46,11 @@
<artifactId>google-cloud-pubsub</artifactId>
<version>1.112.0</version>
</dependency>
-<dependency>
-<groupId>com.google.cloud</groupId>
-<artifactId>google-cloud-pubsublite</artifactId>
-<version>0.6.0</version>
-</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
-<groupId>com.google.protobuf</groupId>
-<artifactId>protobuf-java</artifactId>
-<version>3.11.4</version>
+<groupId>com.google.protobuf</groupId>
+<artifactId>protobuf-java</artifactId>
+<version>3.11.4</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
@@ -86,7 +82,13 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
-<version>1.1</version>
+<version>1.1.2</version>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>com.google.truth.extensions</groupId>
+<artifactId>truth-java8-extension</artifactId>
+<version>1.1.2</version>
+<scope>test</scope>
+</dependency>
</dependencies>
@@ -127,12 +129,15 @@
<configuration>
<artifactSet>
<excludes>
-<exclude>com.fasterxml.jackson.core:jackson-core:jar:*</exclude>
+<exclude>com.fasterxml.jackson.core:jackson-core:jar:*
+</exclude>
</excludes>
</artifactSet>
<transformers>
-<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
+<transformer
+implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+<transformer
+implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
</transformers>
<finalName>pubsub-kafka-connector</finalName>
</configuration>
@@ -157,9 +162,13 @@
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
-<protocArtifact>com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}</protocArtifact>
+<protocArtifact>
+com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}
+</protocArtifact>
<pluginId>grpc-java</pluginId>
-<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
+<pluginArtifact>
+io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}
+</pluginArtifact>
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
</configuration>
<executions>
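Aside on the test-dependency change above: bumping Truth to 1.1.2 and adding truth-java8-extension is what gives the new unit test below its Optional assertions via Truth8.assertThat. A minimal sketch of what the extension provides; the class name here is hypothetical and not part of the commit:

import static com.google.common.truth.Truth8.assertThat;

import java.util.Optional;
import org.junit.Test;

// Hypothetical test illustrating the Truth8 Optional subject
// supplied by truth-java8-extension.
public class Truth8OptionalExampleTest {
  @Test
  public void optionalAssertions() {
    Optional<String> ackId = Optional.of("ack-1");
    assertThat(ackId).isPresent();       // fails if empty
    assertThat(ackId).hasValue("ack-1"); // fails on a different value
  }
}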
kafka-connector/src/main/java/com/google/pubsub/kafka/source/AckBatchingSubscriber.java
@@ -13,22 +13,24 @@
import java.util.Collection;
import java.util.Deque;
import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;

public class AckBatchingSubscriber implements CloudPubSubSubscriber {
+interface AlarmFactory {
+Future<?> newAlarm(Runnable runnable);
+}

private final CloudPubSubSubscriber underlying;
@GuardedBy("this")
private final Deque<Pair<Collection<String>, SettableApiFuture<Empty>>> toSend = new ArrayDeque<>();
-private final ScheduledFuture<?> alarm;
+private final Future<?> alarm;

-public AckBatchingSubscriber(CloudPubSubSubscriber underlying,
-ScheduledExecutorService executor) {
+public AckBatchingSubscriber(
+CloudPubSubSubscriber underlying,
+AlarmFactory alarmFactory) {
this.underlying = underlying;
-this.alarm = executor.scheduleAtFixedRate(this::flush, 100, 100, TimeUnit.MILLISECONDS);
+this.alarm = alarmFactory.newAlarm(this::flush);
}

@Override
@@ -73,6 +75,9 @@ public void onSuccess(Empty result) {
@Override
public void close() {
alarm.cancel(false);
+try {
+alarm.get();
+} catch (Throwable ignored) {}
flush();
underlying.close();
}
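What changed above: the subscriber no longer schedules its own flush on a ScheduledExecutorService; an injected AlarmFactory supplies the periodic alarm instead, so production code decides the cadence and tests can fire the alarm by hand. A self-contained sketch of the same batching pattern, simplified (names are hypothetical, and flush() prints instead of calling an underlying subscriber):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified version of the batching pattern in this commit.
final class AckBatcherSketch {
  interface AlarmFactory {
    Future<?> newAlarm(Runnable runnable);
  }

  private final Deque<String> pending = new ArrayDeque<>();
  private final Future<?> alarm;

  AckBatcherSketch(AlarmFactory alarmFactory) {
    // The caller, not this class, decides how often to flush.
    this.alarm = alarmFactory.newAlarm(this::flush);
  }

  synchronized void ack(String ackId) {
    pending.add(ackId);
  }

  private synchronized void flush() {
    if (pending.isEmpty()) {
      return;
    }
    // In the real subscriber this batch becomes a single ack RPC.
    List<String> batch = new ArrayList<>(pending);
    pending.clear();
    System.out.println("acking batch of " + batch.size());
  }

  void close() {
    alarm.cancel(false);
    flush(); // flush anything still queued, as close() does above
  }

  public static void main(String[] args) throws Exception {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    // Production-style factory: a 100 ms repeating alarm, matching the commit.
    AckBatcherSketch batcher = new AckBatcherSketch(
        r -> executor.scheduleAtFixedRate(r, 100, 100, TimeUnit.MILLISECONDS));
    batcher.ack("a");
    batcher.ack("b");
    Thread.sleep(250);
    batcher.close();
    executor.shutdownNow();
  }
}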
kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceConnector.java
@@ -215,24 +215,6 @@ public ConfigDef config() {
100L * 1024 * 1024,
Importance.MEDIUM,
"The maximum number of outstanding message bytes per task when using streaming pull.")
-.define(
-CPS_STREAMING_PULL_ENABLED,
-Type.BOOLEAN,
-false,
-Importance.MEDIUM,
-"Whether to use streaming pull for the connector to connect to Cloud Pub/Sub. If provided, cps.maxBatchSize is ignored.")
-.define(
-CPS_STREAMING_PULL_FLOW_CONTROL_MESSAGES,
-Type.LONG,
-1000L,
-Importance.MEDIUM,
-"The maximum number of outstanding messages per task when using streaming pull.")
-.define(
-CPS_STREAMING_PULL_FLOW_CONTROL_BYTES,
-Type.LONG,
-100L * 1024 * 1024,
-Importance.MEDIUM,
-"The maximum number of outstanding message bytes per task when using streaming pull.")
.define(
KAFKA_MESSAGE_KEY_CONFIG,
Type.STRING,
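The 18 deleted lines above appear to be duplicate .define(...) calls: the surviving context just above ends with the identical streaming-pull description, and Kafka's ConfigDef throws if the same key is defined twice. A standalone sketch of that behavior (the key name is made up for illustration):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

public class DuplicateDefineDemo {
  public static void main(String[] args) {
    ConfigDef def = new ConfigDef()
        .define("example.flowControlBytes", Type.LONG, 100L * 1024 * 1024,
            Importance.MEDIUM, "Outstanding byte limit.");
    try {
      // Defining the same key a second time throws ConfigException.
      def.define("example.flowControlBytes", Type.LONG, 100L * 1024 * 1024,
          Importance.MEDIUM, "Outstanding byte limit.");
    } catch (ConfigException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}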
kafka-connector/src/main/java/com/google/pubsub/kafka/source/CloudPubSubSourceTask.java
@@ -15,8 +15,10 @@
////////////////////////////////////////////////////////////////////////////////
package com.google.pubsub.kafka.source;

+import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
+import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -39,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
@@ -152,7 +155,7 @@ public void start(Map<String, String> props) {
subscriber = new AckBatchingSubscriber(
new CloudPubSubRoundRobinSubscriber(NUM_CPS_SUBSCRIBERS,
gcpCredentialsProvider,
-cpsEndpoint, cpsSubscription, cpsMaxBatchSize), ACK_EXECUTOR);
+cpsEndpoint, cpsSubscription, cpsMaxBatchSize), runnable -> ACK_EXECUTOR.scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS));
}
}
standardAttributes.add(kafkaMessageKeyAttribute);
@@ -341,7 +344,10 @@ public void stop() {
@Override
public void commitRecord(SourceRecord record) {
String ackId = record.sourceOffset().get(cpsSubscription.toString()).toString();
-subscriber.ackMessages(ImmutableList.of(ackId));
+ApiFutures.catching(subscriber.ackMessages(ImmutableList.of(ackId)), ApiException.class, e -> {
+log.warn("Failed to acknowledge message: " + e);
+return null;
+}, MoreExecutors.directExecutor());
log.trace("Committed {}", ackId);
}
}
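commitRecord() previously discarded the future returned by ackMessages(), so ack failures vanished silently; it now attaches a fallback that logs them. A standalone sketch of the ApiFutures.catching pattern used above (class name hypothetical):

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.common.util.concurrent.MoreExecutors;

public class CatchingSketch {
  public static void main(String[] args) throws Exception {
    SettableApiFuture<Void> ackDone = SettableApiFuture.create();
    // If ackDone fails with an ApiException, log and recover to null
    // instead of letting the failure propagate out of the caller.
    ApiFuture<Void> safe = ApiFutures.catching(
        ackDone,
        ApiException.class,
        e -> {
          System.out.println("Failed to acknowledge message: " + e);
          return null;
        },
        MoreExecutors.directExecutor());
    ackDone.set(null);
    System.out.println("completed: " + safe.get());
  }
}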
kafka-connector/src/test/java/com/google/pubsub/kafka/source/AckBatchingSubscriberTest.java
@@ -1,5 +1,106 @@
package com.google.pubsub.kafka.source;

+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.truth.Truth8.assertThat;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.pubsublite.internal.CheckedApiException;
+import com.google.cloud.pubsublite.internal.ExtractStatus;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.protobuf.Empty;
+import com.google.pubsub.kafka.source.AckBatchingSubscriber.AlarmFactory;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+
+@RunWith(JUnit4.class)
public class AckBatchingSubscriberTest {
+private final AlarmFactory alarmFactory = mock(AlarmFactory.class);
+private final CloudPubSubSubscriber underlying = mock(CloudPubSubSubscriber.class);
+private Runnable onAlarm;
+private CloudPubSubSubscriber subscriber;
+
+@Before
+public void setUp() {
+when(alarmFactory.newAlarm(any())).thenAnswer(args -> {
+onAlarm = args.getArgument(0);
+return Futures.immediateVoidFuture();
+});
+subscriber = new AckBatchingSubscriber(underlying, alarmFactory);
+assertThat(onAlarm).isNotNull();
+}
+
+@Test
+public void pullProxies() {
+subscriber.pull();
+verify(underlying, times(1)).pull();
+verifyNoMoreInteractions(underlying);
+}
+
+@Test
+public void closeProxies() {
+subscriber.close();
+verify(underlying, times(1)).close();
+verifyNoMoreInteractions(underlying);
+}
+
+public static void assertFutureThrowsCode(Future<?> f, Code code) {
+ExecutionException exception = assertThrows(ExecutionException.class, f::get);
+assertThrowableMatches(exception.getCause(), code);
+}
+
+public static void assertThrowableMatches(Throwable t, Code code) {
+Optional<CheckedApiException> statusOr = ExtractStatus.extract(t);
+assertThat(statusOr).isPresent();
+assertThat(statusOr.get().code()).isEqualTo(code);
+}
+
+@Test
+public void partialFlushFailure() {
+ApiFuture<Empty> future1 = subscriber.ackMessages(ImmutableList.of("a", "b"));
+ApiFuture<Empty> future2 = subscriber.ackMessages(ImmutableList.of("c"));
+SettableApiFuture<Empty> batchDone = SettableApiFuture.create();
+when(underlying.ackMessages(ImmutableList.of("a", "b", "c"))).thenReturn(batchDone);
+onAlarm.run();
+ApiFuture<Empty> future3 = subscriber.ackMessages(ImmutableList.of("d"));
+assertThat(future1.isDone()).isFalse();
+assertThat(future2.isDone()).isFalse();
+assertThat(future3.isDone()).isFalse();
+batchDone.setException(new CheckedApiException(Code.INTERNAL).underlying);
+assertFutureThrowsCode(future1, Code.INTERNAL);
+assertFutureThrowsCode(future2, Code.INTERNAL);
+assertThat(future3.isDone()).isFalse();
+}
+
+@Test
+public void flushOnClose() throws Exception {
+ApiFuture<Empty> future1 = subscriber.ackMessages(ImmutableList.of("a", "b"));
+ApiFuture<Empty> future2 = subscriber.ackMessages(ImmutableList.of("c"));
+SettableApiFuture<Empty> batchDone = SettableApiFuture.create();
+when(underlying.ackMessages(ImmutableList.of("a", "b", "c"))).thenReturn(batchDone);
+subscriber.close();
+verify(underlying).ackMessages(any());
+verify(underlying).close();
+assertThat(future1.isDone()).isFalse();
+assertThat(future2.isDone()).isFalse();
+batchDone.set(null);
+future1.get();
+future2.get();
+}
}
