flush buffer during drain operation for requiresStableInput operator
correct log message format

apply spotless

flush buffer when requiresStableInput is set

re-organize imports

add flag in FlinkPipelineOptions to allow draining for pipelines with RequiresStableInput

apply spotless again
Kanishk Karanawat committed Oct 21, 2023
1 parent 0a44a2b commit e08578a
Showing 3 changed files with 38 additions and 0 deletions.
@@ -328,6 +328,14 @@ public interface FlinkPipelineOptions

void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB);

@Description(
"Allow drain operation for flink pipelines that contain RequiresStableInput operator. Note that at time of draining, "
+ "the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.")
@Default.Boolean(false)
Boolean getEnableStableInputDrain();

void setEnableStableInputDrain(Boolean enableStableInputDrain);

static FlinkPipelineOptions defaults() {
return PipelineOptionsFactory.as(FlinkPipelineOptions.class);
}
@@ -22,6 +22,11 @@
<td>Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>attachedMode</code></td>
<td>Specifies if the pipeline is submitted in attached or detached mode</td>
<td>Default: <code>true</code></td>
</tr>
<tr>
<td><code>autoBalanceWriteFilesShardingEnabled</code></td>
<td>Flag indicating whether auto-balance sharding for the WriteFiles transform should be enabled. This might prove useful in streaming use cases, where a pipeline needs to write many events into files, typically divided into N shards. The default behavior on Flink is that some workers receive more shards to take care of than others. This causes workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature spreads shards evenly among available workers to improve throughput and memory usage stability.</td>
@@ -82,6 +87,11 @@
<td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>fileInputSplitMaxSizeMB</code></td>
<td>Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.</td>
<td>Default: <code>0</code></td>
</tr>
<tr>
<td><code>finishBundleBeforeCheckpointing</code></td>
<td>If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment.</td>
@@ -97,6 +107,11 @@
<td>Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto].</td>
<td>Default: <code>[auto]</code></td>
</tr>
<tr>
<td><code>jobCheckIntervalInSecs</code></td>
<td>Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds</td>
<td>Default: <code>5</code></td>
</tr>
<tr>
<td><code>latencyTrackingInterval</code></td>
<td>Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature.</td>
@@ -22,6 +22,11 @@
<td>Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline.</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>attached_mode</code></td>
<td>Specifies if the pipeline is submitted in attached or detached mode</td>
<td>Default: <code>true</code></td>
</tr>
<tr>
<td><code>auto_balance_write_files_sharding_enabled</code></td>
<td>Flag indicating whether auto-balance sharding for the WriteFiles transform should be enabled. This might prove useful in streaming use cases, where a pipeline needs to write many events into files, typically divided into N shards. The default behavior on Flink is that some workers receive more shards to take care of than others. This causes workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature spreads shards evenly among available workers to improve throughput and memory usage stability.</td>
@@ -82,6 +87,11 @@
<td>Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146</td>
<td>Default: <code>false</code></td>
</tr>
<tr>
<td><code>file_input_split_max_size_m_b</code></td>
<td>Set the maximum size of input split when data is read from a filesystem. 0 implies no max size.</td>
<td>Default: <code>0</code></td>
</tr>
<tr>
<td><code>finish_bundle_before_checkpointing</code></td>
<td>If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment.</td>
@@ -97,6 +107,11 @@
<td>Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto].</td>
<td>Default: <code>[auto]</code></td>
</tr>
<tr>
<td><code>job_check_interval_in_secs</code></td>
<td>Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds</td>
<td>Default: <code>5</code></td>
</tr>
<tr>
<td><code>latency_tracking_interval</code></td>
<td>Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature.</td>
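The two option tables above list the same options under different spellings: camelCase names such as `fileInputSplitMaxSizeMB` in the first table and snake_case names such as `file_input_split_max_size_m_b` in the second. The sketch below is one way to reproduce that mapping, and to derive an option name from a getter such as `getEnableStableInputDrain`. It is an illustration only: the class `OptionNames` and both helper methods are hypothetical names, not part of Beam, and the rule "underscore before every upper-case letter" is an assumption inferred from the names shown in the tables, not taken from the generator that produced them.

```java
class OptionNames {

    // Hypothetical helper: derive an option name from a getter name by
    // dropping the "get" prefix and lower-casing the first remaining letter.
    static String optionNameFromGetter(String getter) {
        if (!getter.startsWith("get") || getter.length() == 3) {
            throw new IllegalArgumentException("not a getter: " + getter);
        }
        String rest = getter.substring(3);
        return Character.toLowerCase(rest.charAt(0)) + rest.substring(1);
    }

    // Hypothetical helper: convert camelCase to snake_case by inserting an
    // underscore before every upper-case letter and lower-casing that letter.
    // Consecutive capitals are split one by one, so "MB" becomes "m_b".
    static String toSnakeCase(String camel) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < camel.length(); i++) {
            char c = camel.charAt(i);
            if (Character.isUpperCase(c)) {
                if (i > 0) {
                    sb.append('_');
                }
                sb.append(Character.toLowerCase(c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Names taken from the tables in this commit.
        System.out.println(toSnakeCase("attachedMode"));
        System.out.println(toSnakeCase("fileInputSplitMaxSizeMB"));
        System.out.println(toSnakeCase("jobCheckIntervalInSecs"));
        // Getter declared in the FlinkPipelineOptions hunk of this commit.
        System.out.println(optionNameFromGetter("getEnableStableInputDrain"));
    }
}
```

Splitting every capital letter individually is what yields the trailing `m_b` seen in the second table; a converter that treated `MB` as a single acronym would produce `..._mb` instead and would not match the listed name.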
