Skip to content

Commit

Permalink
Merge pull request #282 from jordanbull/Feature/flow-control-max-pending
Browse files Browse the repository at this point in the history
Add blocking flow controls to the PubSub Sink connector
  • Loading branch information
samarthsingal authored Jul 21, 2021
2 parents 879a480 + 390e961 commit 432fda9
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
2 changes: 2 additions & 0 deletions kafka-connector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ Connector supports the following configs:
| cps.endpoint | String | "pubsub.googleapis.com:443" | The [Cloud Pub/Sub endpoint](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints) to use. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be received for the messages on a topic partition before publishing them to Cloud Pub/Sub. |
| maxBufferBytes | Long | 10000000 | The maximum number of bytes that can be received for the messages on a topic partition before publishing them to Cloud Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of outstanding bytes for incomplete and unsent messages before the publisher will block further publishing. |
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of incomplete and unsent messages before the publisher will block further publishing. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Cloud Pub/Sub. |
| maxRequestTimeoutMs | Integer | 10000 | The timeout for individual publish requests to Cloud Pub/Sub. |
| maxTotalTimeoutMs | Integer | 60000| The total timeout for a call to publish (including retries) to Cloud Pub/Sub. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class CloudPubSubSinkConnector extends SinkConnector {

public static final String MAX_BUFFER_SIZE_CONFIG = "maxBufferSize";
public static final String MAX_BUFFER_BYTES_CONFIG = "maxBufferBytes";
public static final String MAX_OUTSTANDING_REQUEST_BYTES = "maxOutstandingRequestBytes";
public static final String MAX_OUTSTANDING_MESSAGES = "maxOutstandingMessages";
public static final String MAX_DELAY_THRESHOLD_MS = "delayThresholdMs";
public static final String MAX_REQUEST_TIMEOUT_MS = "maxRequestTimeoutMs";
public static final String MAX_TOTAL_TIMEOUT_MS = "maxTotalTimeoutMs";
Expand All @@ -51,6 +53,8 @@ public class CloudPubSubSinkConnector extends SinkConnector {
public static final int DEFAULT_REQUEST_TIMEOUT_MS = 10000;
public static final int DEFAULT_TOTAL_TIMEOUT_MS = 60000;
public static final int DEFAULT_SHUTDOWN_TIMEOUT_MS = 60000;
public static final long DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES = Long.MAX_VALUE;
public static final long DEFAULT_MAX_OUTSTANDING_MESSAGES = Long.MAX_VALUE;
public static final String CPS_MESSAGE_BODY_NAME = "messageBodyName";
public static final String DEFAULT_MESSAGE_BODY_NAME = "cps_message_body";
public static final String PUBLISH_KAFKA_METADATA = "metadata.publish";
Expand Down Expand Up @@ -163,6 +167,18 @@ public ConfigDef config() {
Importance.MEDIUM,
"The maximum number of bytes that can be received for the messages on a topic "
+ "partition before publishing the messages to Cloud Pub/Sub.")
.define(MAX_OUTSTANDING_REQUEST_BYTES,
Type.LONG,
DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES,
Importance.MEDIUM,
"The maximum outstanding bytes from incomplete requests before the task blocks."
)
.define(MAX_OUTSTANDING_MESSAGES,
Type.LONG,
DEFAULT_MAX_OUTSTANDING_MESSAGES,
Importance.MEDIUM,
"The maximum outstanding incomplete messages before the task blocks."
)
.define(
MAX_DELAY_THRESHOLD_MS,
Type.INT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -51,7 +53,6 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand All @@ -74,6 +75,8 @@ public class CloudPubSubSinkTask extends SinkTask {
private String messageBodyName;
private long maxBufferSize;
private long maxBufferBytes;
private long maxOutstandingRequestBytes;
private long maxOutstandingMessages;
private int maxDelayThresholdMs;
private int maxRequestTimeoutMs;
private int maxTotalTimeoutMs;
Expand All @@ -84,6 +87,8 @@ public class CloudPubSubSinkTask extends SinkTask {
private ConnectorCredentialsProvider gcpCredentialsProvider;
private com.google.cloud.pubsub.v1.Publisher publisher;



/** Holds a list of the publishing futures that have not been processed for a single partition. */
private class OutstandingFuturesForPartition {
public List<ApiFuture<String>> futures = new ArrayList<>();
Expand Down Expand Up @@ -118,6 +123,10 @@ public void start(Map<String, String> props) {
cpsEndpoint = validatedProps.get(ConnectorUtils.CPS_ENDPOINT).toString();
maxBufferSize = (Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG);
maxBufferBytes = (Long) validatedProps.get(CloudPubSubSinkConnector.MAX_BUFFER_BYTES_CONFIG);
maxOutstandingRequestBytes =
(Long) validatedProps.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_REQUEST_BYTES);
maxOutstandingMessages =
(Long) validatedProps.get(CloudPubSubSinkConnector.MAX_OUTSTANDING_MESSAGES);
maxDelayThresholdMs =
(Integer) validatedProps.get(CloudPubSubSinkConnector.MAX_DELAY_THRESHOLD_MS);
maxRequestTimeoutMs =
Expand Down Expand Up @@ -387,15 +396,24 @@ private void addPendingMessageFuture(String topic, Integer partition, ApiFuture<

private void createPublisher() {
ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);

BatchingSettings.Builder batchingSettings = BatchingSettings.newBuilder()
.setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs))
.setElementCountThreshold(maxBufferSize)
.setRequestByteThreshold(maxBufferBytes);

if (useFlowControl()) {
batchingSettings.setFlowControlSettings(FlowControlSettings.newBuilder()
.setMaxOutstandingRequestBytes(maxOutstandingRequestBytes)
.setMaxOutstandingElementCount(maxOutstandingMessages)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build());
}

com.google.cloud.pubsub.v1.Publisher.Builder builder =
com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
.setCredentialsProvider(gcpCredentialsProvider)
.setBatchingSettings(
BatchingSettings.newBuilder()
.setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs))
.setElementCountThreshold(maxBufferSize)
.setRequestByteThreshold(maxBufferBytes)
.build())
.setBatchingSettings(batchingSettings.build())
.setRetrySettings(
RetrySettings.newBuilder()
// All values that are not configurable come from the defaults for the publisher
Expand All @@ -420,6 +438,12 @@ private void createPublisher() {
}
}

private boolean useFlowControl() {
// only enable flow control if at least one flow control config has been set
return maxOutstandingRequestBytes != CloudPubSubSinkConnector.DEFAULT_MAX_OUTSTANDING_REQUEST_BYTES
|| maxOutstandingRequestBytes != CloudPubSubSinkConnector.DEFAULT_MAX_OUTSTANDING_MESSAGES;
}

@Override
public void stop() {
log.info("Stopping CloudPubSubSinkTask");
Expand Down

0 comments on commit 432fda9

Please sign in to comment.