Skip to content

Commit

Permalink
Merge pull request #268 from GoogleCloudPlatform/move_commit_batching
Browse files Browse the repository at this point in the history
feat: Make acks flushed faster than once per poll() call
  • Loading branch information
dpcollins-google authored Mar 29, 2021
2 parents 9ee22c5 + 1f0c2bb commit 8bc7d5f
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 96 deletions.
46 changes: 30 additions & 16 deletions kafka-connector/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.google.pubsub.kafka.connect</groupId>
<artifactId>kafka-connector</artifactId>
Expand Down Expand Up @@ -45,16 +46,11 @@
<artifactId>google-cloud-pubsub</artifactId>
<version>1.112.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.4</version>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.11.4</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
Expand All @@ -66,6 +62,11 @@
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>pubsublite-kafka</artifactId>
Expand All @@ -86,7 +87,13 @@
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
<version>1.1</version>
<version>1.1.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.truth.extensions</groupId>
<artifactId>truth-java8-extension</artifactId>
<version>1.1.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down Expand Up @@ -127,12 +134,15 @@
<configuration>
<artifactSet>
<excludes>
<exclude>com.fasterxml.jackson.core:jackson-core:jar:*</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core:jar:*
</exclude>
</excludes>
</artifactSet>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
</transformers>
<finalName>pubsub-kafka-connector</finalName>
</configuration>
Expand All @@ -157,9 +167,13 @@
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}</protocArtifact>
<protocArtifact>
com.google.protobuf:protoc:3.11.4:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}
</pluginArtifact>
<protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.google.pubsub.kafka.source;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;

public class AckBatchingSubscriber implements CloudPubSubSubscriber {
interface AlarmFactory {
Future<?> newAlarm(Runnable runnable);
}

private final CloudPubSubSubscriber underlying;
@GuardedBy("this")
private final Deque<Pair<Collection<String>, SettableApiFuture<Empty>>> toSend = new ArrayDeque<>();
private final Future<?> alarm;

public AckBatchingSubscriber(
CloudPubSubSubscriber underlying,
AlarmFactory alarmFactory) {
this.underlying = underlying;
this.alarm = alarmFactory.newAlarm(this::flush);
}

@Override
public ApiFuture<List<ReceivedMessage>> pull() {
return underlying.pull();
}

@Override
public synchronized ApiFuture<Empty> ackMessages(Collection<String> ackIds) {
SettableApiFuture<Empty> result = SettableApiFuture.create();
toSend.add(Pair.of(ackIds, result));
return result;
}

private void flush() {
List<String> ackIds = new ArrayList<>();
List<SettableApiFuture<Empty>> futures = new ArrayList<>();
synchronized (this) {
if (toSend.isEmpty()) {
return;
}
toSend.forEach(pair -> {
ackIds.addAll(pair.getLeft());
futures.add(pair.getRight());
});
toSend.clear();
}
ApiFuture<Empty> response = underlying.ackMessages(ackIds);
ApiFutures.addCallback(response, new ApiFutureCallback<Empty>() {
@Override
public void onFailure(Throwable t) {
futures.forEach(future -> future.setException(t));
}

@Override
public void onSuccess(Empty result) {
futures.forEach(future -> future.set(result));
}
}, MoreExecutors.directExecutor());
}

@Override
public void close() {
alarm.cancel(false);
try {
alarm.get();
} catch (Throwable ignored) {}
flush();
underlying.close();
}
}
Loading

0 comments on commit 8bc7d5f

Please sign in to comment.