From e08578a93c13c17c2000d683f9abe0397812dd34 Mon Sep 17 00:00:00 2001 From: Kanishk Karanawat Date: Wed, 20 Sep 2023 12:06:16 -0700 Subject: [PATCH] flush buffer during drain operation for requiresStableInput operator correct log message format apply spotless flush buffer when requiresStableInput is set re-organize imports add flag in FlinkPipelineOptions to allow draining for pipelines with RequiresStableInput apply spotless again --- .../beam/runners/flink/FlinkPipelineOptions.java | 8 ++++++++ .../shortcodes/flink_java_pipeline_options.html | 15 +++++++++++++++ .../shortcodes/flink_python_pipeline_options.html | 15 +++++++++++++++ 3 files changed, 38 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index d0e16e3e530d..4c9262013771 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -328,6 +328,14 @@ public interface FlinkPipelineOptions void setFileInputSplitMaxSizeMB(Long fileInputSplitMaxSizeMB); + @Description( + "Allow drain operation for flink pipelines that contain RequiresStableInput operator. 
Note that at time of draining, " + "the RequiresStableInput contract might be violated if there are any processing related failures in the DoFn operator.") + @Default.Boolean(false) + Boolean getEnableStableInputDrain(); + + void setEnableStableInputDrain(Boolean enableStableInputDrain); + static FlinkPipelineOptions defaults() { return PipelineOptionsFactory.as(FlinkPipelineOptions.class); } diff --git a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html index 87d69ee60fe3..a6b7ed28f403 100644 --- a/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_java_pipeline_options.html @@ -22,6 +22,11 @@ Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false + + attachedMode + Specifies if the pipeline is submitted in attached or detached mode + Default: true + autoBalanceWriteFilesShardingEnabled Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. This might prove useful in streaming use-case, where pipeline needs to write quite many events into files, typically divided into N shards. Default behavior on Flink would be, that some workers will receive more shards to take care of than others. This cause workers to go out of balance in terms of processing backlog and memory usage. Enabling this feature will make shards to be spread evenly among available workers in improve throughput and memory usage stability. @@ -82,6 +87,11 @@ Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146 Default: false + + fileInputSplitMaxSizeMB + Set the maximum size of input split when data is read from a filesystem. 0 implies no max size. 
+ Default: 0 + finishBundleBeforeCheckpointing If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment. @@ -97,6 +107,11 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + jobCheckIntervalInSecs + Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds + Default: 5 + latencyTrackingInterval Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature. diff --git a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html index 27ae27ad05a3..494b01bc1d02 100644 --- a/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html +++ b/website/www/site/layouts/shortcodes/flink_python_pipeline_options.html @@ -22,6 +22,11 @@ Flag indicating whether non restored state is allowed if the savepoint contains state for an operator that is no longer part of the pipeline. Default: false + + attached_mode + Specifies if the pipeline is submitted in attached or detached mode + Default: true + auto_balance_write_files_sharding_enabled Flag indicating whether auto-balance sharding for WriteFiles transform should be enabled. This might prove useful in streaming use-case, where pipeline needs to write quite many events into files, typically divided into N shards. Default behavior on Flink would be, that some workers will receive more shards to take care of than others. This cause workers to go out of balance in terms of processing backlog and memory usage. 
Enabling this feature will make shards to be spread evenly among available workers in improve throughput and memory usage stability. @@ -82,6 +87,11 @@ Remove unneeded deep copy between operators. See https://issues.apache.org/jira/browse/BEAM-11146 Default: false + + file_input_split_max_size_m_b + Set the maximum size of input split when data is read from a filesystem. 0 implies no max size. + Default: 0 + finish_bundle_before_checkpointing If set, finishes the current bundle and flushes all output before checkpointing the state of the operators. By default, starts checkpointing immediately and buffers any remaining bundle output as part of the checkpoint. The setting may affect the checkpoint alignment. @@ -97,6 +107,11 @@ Address of the Flink Master where the Pipeline should be executed. Can either be of the form "host:port" or one of the special values [local], [collection] or [auto]. Default: [auto] + + job_check_interval_in_secs + Set job check interval in seconds under detached mode in method waitUntilFinish, by default it is 5 seconds + Default: 5 + latency_tracking_interval Interval in milliseconds for sending latency tracking marks from the sources to the sinks. Interval value <= 0 disables the feature.