Skip to content

Commit

Permalink
Merge pull request #361 from matt-kwong/pullers
Browse files Browse the repository at this point in the history
Add builder options to source/sink
  • Loading branch information
clmccart authored Mar 22, 2024
2 parents 910f932 + 17eef69 commit 3991c82
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 26 deletions.
86 changes: 60 additions & 26 deletions flink-connector/docs/content/docs/connectors/datastream/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,29 +113,54 @@ successful checkpoint are unacknowledged and will automatically be redelivered.
Note that there is no message delivery state stored in checkpoints, so retained
checkpoints are not necessary to resume using Pub/Sub source.

### Flow Control

Each Pub/Sub source subtask manages a `StreamingPull` connection to Google Cloud
Pub/Sub. That is, a Pub/Sub source with parallelism of 20 manages 20 separate
`StreamingPull` connections to Google Cloud Pub/Sub. The flow control settings
described in this section are applied to **individual** connections.
### StreamingPull Connections and Flow Control

Each Pub/Sub source subtask opens and manages `StreamingPull` connections to
Google Cloud Pub/Sub. The number of connections per subtask can be set using
`PubSubSource.<OutputT>builder().setParallelPullCount` (defaults to 1). Opening
more connections can increase the message throughput delivered to each subtask.
Note that the underlying subscriber client library creates an executor with 5
threads for each connection opened, so too many connections can be detrimental
to performance.

Google Cloud Pub/Sub servers pause message delivery to a `StreamingPull`
connection when a flow control limit is exceeded. There are two forms of flow
control:

1) Message delivery throughput
2) Outstanding message count / bytes

`StreamingPull` connections are limited to pulling messages at 10 MB/s. This
limit cannot be configured. Opening more connections is recommended when
observing message throughput flow control. See
[Pub/Sub quotas and limits](https://cloud.google.com/pubsub/quotas) for a full
list of limitations.

The other form of flow control is based on outstanding messages--when a message
has been delivered but not yet acknowledged. Since outstanding messages are
acknowledged when a checkpoint completes, flow control limits for outstanding
messages are effectively per-checkpoint interval limits. Infrequent
checkpointing can cause connections to be flow controlled due to too many
outstanding messages.

Several Pub/Sub source options configure flow control. These options are
illustrated below:
The snippet below shows how to configure connection count and flow control
settings.

```java
PubSubSource.<OutputT>builder()
// Open 5 StreamingPull connections.
.setParallelPullCount(5)
// Allow up to 10,000 message deliveries per checkpoint interval.
.setMaxOutstandingMessagesCount(10_000L)
// Allow up to 1000 MB in cumulatitive message size per checkpoint interval.
.setMaxOutstandingMessagesBytes(1000L * 1024L * 1024L) // 1000 MB
```

Google Cloud Pub/Sub servers pause message delivery when a flow control setting
is exceeded. A message is considered outstanding from when Google Cloud Pub/Sub
delivers it until the subscriber acknowledges it. Since outstanding messages are
acknowledged when a checkpoint completes, flow control limits for outstanding
messages are effectively per-checkpoint interval limits.
A Pub/Sub source subtask with these options is able to:

- Pull messages at up to 50 MB/s
- Receive up to 50,000 messages **or** 5000 MB in cumulative message size per
checkpoint interval

### Message Leasing

Expand All @@ -144,17 +169,6 @@ means a checkpointing interval can be longer than the acknowledge deadline
without causing messages redelivery. Note that acknowledge deadlines can be
extended to at most 1h.

### Performance Considerations

- Infrequent checkpointing can cause performance issues, including
subscription backlog growth and increased rate of duplicate message
delivery.
- `StreamingPull` connections have a 10 MB/s throughput limit. See
[Pub/Sub quotas and limits](https://cloud.google.com/pubsub/quotas) for all
restrictions.

<!-- TODO(matt-kwong) Add threading details. -->

### All Options

#### Required Builder Options
Expand Down Expand Up @@ -200,18 +214,28 @@ extended to at most 1h.
<tr>
<td>setMaxOutstandingMessagesCount(Long count)</td>
<td><code>1000L</code></td>
<td>The maximum number of messages that can be delivered to a Pub/Sub source subtask in a checkpoint interval.</td>
<td>The maximum number of messages that can be delivered to a StreamingPull connection within a checkpoint interval.</td>
</tr>
<tr>
<td>setMaxOutstandingMessagesBytes(Long bytes)</td>
<td><code>100L * 1024L * 1024L</code> (100 MB)</td>
<td>The maximum number of cumulative bytes that can be delivered to a Pub/Sub source subtask in a checkpoint interval.</td>
<td>The maximum number of cumulative bytes that can be delivered to a StreamingPull connection within a checkpoint interval.</td>
</tr>
<tr>
<td>setParallelPullCount(Integer parallelPullCount)</td>
<td>1</td>
<td>The number of StreamingPull connections to open for pulling messages from Google Cloud Pub/Sub.</td>
</tr>
<tr>
<td>setCredentials(Credentials credentials)</td>
<td>(none)</td>
<td>The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to pull messages from the subscription. If not set, then Pub/Sub source uses Application Default Credentials.</td>
</tr>
<tr>
<td>setEndpoint(String endpoint)</td>
<td>pubsub.googleapis.com:443</td>
<td>The Google Cloud Pub/Sub gRPC endpoint from which messages are pulled. Defaults to the global endpoint, which routes requests to the nearest regional endpoint.</td>
</tr>
</tbody>
</table>

Expand Down Expand Up @@ -292,6 +316,16 @@ possible data loss.
<td>(none)</td>
<td>The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to publish messages to the topic. If not set, then Pub/Sub sink uses Application Default Credentials.</td>
</tr>
<tr>
<td>setEnableMessageOrdering(Boolean enableMessageOrdering)</td>
<td>false</td>
<td>This must be set to true when publishing messages with an ordering key.</td>
</tr>
<tr>
<td>setEndpoint(String endpoint)</td>
<td>pubsub.googleapis.com:443</td>
<td>The Google Cloud Pub/Sub gRPC endpoint to which messages are published. Defaults to the global endpoint, which routes requests to the nearest regional endpoint.</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

/**
* Google Cloud Pub/Sub sink to publish messages to a Pub/Sub topic.
*
* <p>{@link PubSubSink} is constructed and configured using {@link Builder}. {@link PubSubSink}
* cannot be configured after it is built. See {@link Builder} for how {@link PubSubSink} can be
* configured.
*/
@AutoValue
public abstract class PubSubSink<T> implements Sink<T> {
public abstract String projectName();
Expand All @@ -38,6 +45,10 @@ public abstract class PubSubSink<T> implements Sink<T> {

public abstract Optional<Credentials> credentials();

public abstract Optional<Boolean> enableMessageOrdering();

public abstract Optional<String> endpoint();

public static <T> Builder<T> builder() {
return new AutoValue_PubSubSink.Builder<T>();
}
Expand All @@ -47,6 +58,12 @@ private Publisher createPublisher(TopicName topicName) throws IOException {
if (credentials().isPresent()) {
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials().get()));
}
if (enableMessageOrdering().isPresent()) {
builder.setEnableMessageOrdering(enableMessageOrdering().get());
}
if (endpoint().isPresent()) {
builder.setEndpoint(endpoint().get());
}
return builder.build();
}

Expand All @@ -64,17 +81,54 @@ public SinkWriter<T> createWriter(InitContext initContext) throws IOException {
serializationSchema());
}

/** Builder to construct {@link PubSubSink}. */
@AutoValue.Builder
public abstract static class Builder<T> {
/**
* Sets the GCP project ID that owns the topic to which messages are published.
*
* <p>Setting this option is required to build {@link PubSubSink}.
*/
public abstract Builder<T> setProjectName(String projectName);

/**
* Sets the Pub/Sub topic to which messages are published.
*
* <p>Setting this option is required to build {@link PubSubSink}.
*/
public abstract Builder<T> setTopicName(String topicName);

/**
* Sets the serialization schema used to serialize incoming data into a {@link PubsubMessage}.
*
* <p>Setting this option is required to build {@link PubSubSink}.
*/
public abstract Builder<T> setSerializationSchema(
PubSubSerializationSchema<T> serializationSchema);

/**
* Sets the credentials used when publishing messages to Google Cloud Pub/Sub.
*
* <p>If not set, then Application Default Credentials are used for authentication.
*/
public abstract Builder<T> setCredentials(Credentials credentials);

/**
* Sets whether to enable ordered publishing.
*
* <p>The default value is {@code false}. This must be set to {@code true} when publishing an
* {@link PubsubMessage} with an ordering key set.
*/
public abstract Builder<T> setEnableMessageOrdering(Boolean enableMessageOrdering);

/**
* Sets the Google Cloud Pub/Sub service endpoint to which messages are published.
*
* <p>Defaults to connecting to the global endpoint, which routes requests to the nearest
* regional endpoint.
*/
public abstract Builder<T> setEndpoint(String endpoint);

public abstract PubSubSink<T> build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.util.UserCodeClassLoader;

/**
* Google Cloud Pub/Sub source to pull messages from a Pub/Sub subscription.
*
* <p>{@link PubSubSource} is constructed and configured using {@link Builder}. {@link PubSubSource}
* cannot be configured after it is built. See {@link Builder} for how {@link PubSubSource} can be
* configured.
*/
@AutoValue
public abstract class PubSubSource<OutputT>
implements Source<OutputT, SubscriptionSplit, PubSubEnumeratorCheckpoint>,
Expand All @@ -67,8 +74,12 @@ public abstract class PubSubSource<OutputT>

public abstract Optional<Long> maxOutstandingMessagesBytes();

public abstract Optional<Integer> parallelPullCount();

public abstract Optional<Credentials> credentials();

public abstract Optional<String> endpoint();

public static <OutputT> Builder<OutputT> builder() {
return new AutoValue_PubSubSource.Builder<OutputT>();
}
Expand All @@ -83,9 +94,15 @@ Subscriber createSubscriber(MessageReceiver receiver) {
.setMaxOutstandingRequestBytes(
maxOutstandingMessagesBytes().or(100L * 1024L * 1024L)) // 100MB
.build());
if (parallelPullCount().isPresent()) {
builder.setParallelPullCount(parallelPullCount().get());
}
if (credentials().isPresent()) {
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials().get()));
}
if (endpoint().isPresent()) {
builder.setEndpoint(endpoint().get());
}

// Assume we should connect to the Pub/Sub emulator if PUBSUB_EMULATOR_HOST is set.
String emulatorEndpoint = System.getenv("PUBSUB_EMULATOR_HOST");
Expand Down Expand Up @@ -168,21 +185,77 @@ public TypeInformation<OutputT> getProducedType() {
return deserializationSchema().getProducedType();
}

/** Builder to construct {@link PubSubSource}. */
@AutoValue.Builder
public abstract static class Builder<OutputT> {
/**
* Sets the GCP project ID that owns the subscription from which messages are pulled.
*
* <p>Setting this option is required to build {@link PubSubSource}.
*/
public abstract Builder<OutputT> setProjectName(String projectName);

/**
* Sets the Pub/Sub subscription to which messages are pulled.
*
* <p>Setting this option is required to build {@link PubSubSource}.
*/
public abstract Builder<OutputT> setSubscriptionName(String subscriptionName);

/**
* Sets the deserialization schema used to deserialize {@link PubsubMessage} for processing.
*
* <p>Setting this option is required to build {@link PubSubSource}.
*/
public abstract Builder<OutputT> setDeserializationSchema(
PubSubDeserializationSchema<OutputT> deserializationSchema);

/**
* Sets the max number of messages that can be outstanding to a StreamingPull connection.
*
* <p>Defaults to 1,000 outstanding messages. A message is considered outstanding when it is
* delivered and waiting to be acknowledged in the next successful checkpoint. Google Cloud
* Pub/Sub suspends message delivery to StreamingPull connections that reach this limit.
*
* <p>If set, this value must be > 0. Otherwise, calling {@link build} will throw an exception.
*/
public abstract Builder<OutputT> setMaxOutstandingMessagesCount(Long count);

/**
* Sets the max cumulative message bytes that can be outstanding to a StreamingPull connection.
*
* <p>Defaults to 100 MB. A message is considered outstanding when it is delivered and waiting
* to be acknowledged in the next successful checkpoint. Google Cloud Pub/Sub suspends message
* delivery to StreamingPull connections that reach this limit.
*
* <p>If set, this value must be > 0. Otherwise, calling {@link build} will throw an exception.
*/
public abstract Builder<OutputT> setMaxOutstandingMessagesBytes(Long bytes);

/**
* Sets the number of StreamingPull connections opened by each {@link PubSubSource} subtask.
*
* <p>Defaults to 1.
*
* <p>If set, this value must be > 0. Otherwise, calling {@link build} will throw an exception.
*/
public abstract Builder<OutputT> setParallelPullCount(Integer parallelPullCount);

/**
* Sets the credentials used when pulling messages from Google Cloud Pub/Sub.
*
* <p>If not set, then Application Default Credentials are used for authentication.
*/
public abstract Builder<OutputT> setCredentials(Credentials credentials);

/**
* Sets the Google Cloud Pub/Sub service endpoint from which messages are pulled.
*
* <p>Defaults to connecting to the global endpoint, which routes requests to the nearest
* regional endpoint.
*/
public abstract Builder<OutputT> setEndpoint(String endpoint);

abstract PubSubSource<OutputT> autoBuild();

public final PubSubSource<OutputT> build() {
Expand All @@ -193,6 +266,9 @@ public final PubSubSource<OutputT> build() {
Preconditions.checkArgument(
source.maxOutstandingMessagesBytes().or(1L) > 0,
"maxOutstandingMessagesBytes, if set, must be a value greater than 0.");
Preconditions.checkArgument(
source.parallelPullCount().or(1) > 0,
"parallelPullCount, if set, must be a value greater than 0.");
return source;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,16 @@ public void build_invalidCreds() throws Exception {
assertThrows(
NullPointerException.class, () -> PubSubSink.<String>builder().setCredentials(null));
}

@Test
public void build_invalidEnableMessageOrdering() throws Exception {
assertThrows(
NullPointerException.class,
() -> PubSubSink.<String>builder().setEnableMessageOrdering(null));
}

@Test
public void build_invalidEndpoint() throws Exception {
assertThrows(NullPointerException.class, () -> PubSubSink.<String>builder().setEndpoint(null));
}
}
Loading

0 comments on commit 3991c82

Please sign in to comment.