diff --git a/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy b/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy
index c1a44b8e9d43..779395bf7093 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_Examples_Flink.groovy
@@ -37,7 +37,6 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Python_Examples_Flink',
       gradle {
         rootBuildScriptDir(commonJobProperties.checkoutDir)
         tasks(":sdks:python:test-suites:portable:flinkExamplesPostCommit")
-        switches("-PflinkConfDir=$WORKSPACE/src/runners/flink/src/test/resources")
         commonJobProperties.setGradleSwitches(delegate)
       }
     }
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index 26acc5137e60..2253df597bb4 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -137,8 +137,6 @@ var portableFilters = []string{
 var flinkFilters = []string{
 	// TODO(https://github.com/apache/beam/issues/20723): Flink tests timing out on reads.
 	"TestXLang_Combine.*",
-	// TODO(https://github.com/apache/beam/issues/21094): Test fails on post commits: "Insufficient number of network buffers".
-	"TestXLang_Multi",
 	"TestDebeziumIO_BasicRead",
 	// TODO(BEAM-13215): GCP IOs currently do not work in non-Dataflow portable runners.
 	"TestBigQueryIO.*",
diff --git a/sdks/python/apache_beam/examples/complete/tfidf_it_test.py b/sdks/python/apache_beam/examples/complete/tfidf_it_test.py
index fe1649bbfa35..3ecbd0c1ecae 100644
--- a/sdks/python/apache_beam/examples/complete/tfidf_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/tfidf_it_test.py
@@ -42,6 +42,7 @@ class TfIdfIT(unittest.TestCase):

   @pytest.mark.examples_postcommit
+  @pytest.mark.sickbay_flink
   def test_basics(self):
     test_pipeline = TestPipeline(is_integration_test=True)
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_it_test.py
index 1f58e82e47d9..bce8584db21c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_it_test.py
@@ -45,6 +45,7 @@ def setUp(self):

   @pytest.mark.no_xdist
   @pytest.mark.examples_postcommit
+  @pytest.mark.sickbay_flink
   def test_bigquery_side_input_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.DONE)
     NUM_GROUPS = 3
diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
index e996084b81cd..a53318a94a85 100644
--- a/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts_test.py
@@ -136,6 +136,7 @@ def normalize_tsv_results(self, tsv_data):
     return '\n'.join(sorted(lines_out)) + '\n'

   @pytest.mark.examples_postcommit
+  @pytest.mark.sickbay_flink
   def test_mergecontacts(self):
     test_pipeline = TestPipeline(is_integration_test=True)
diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
index 706cff70ba70..d4f540163a38 100644
--- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo_test.py
@@ -49,6 +49,7 @@ def get_wordcount_results(self, result_path):
     return results

   @pytest.mark.examples_postcommit
+  @pytest.mark.sickbay_flink
   def test_multiple_output_pardo(self):
     test_pipeline = TestPipeline(is_integration_test=True)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index b51a3c93bb86..627b3c27ef20 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1464,6 +1464,7 @@ def _add_argparse_args(cls, parser):
         ' directly, rather than starting up a job server.'
         ' Only applies when flink_master is set to a'
         ' cluster address. Requires Python 3.6+.')
+    parser.add_argument('--flink_conf_dir', help='Path to flink-conf-dir')
     parser.add_argument(
         '--parallelism',
         type=int,
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner.py b/sdks/python/apache_beam/runners/portability/flink_runner.py
index efa17cd01a93..5425369eff8c 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner.py
@@ -87,6 +87,7 @@ def __init__(self, options):
     self._jar = options.flink_job_server_jar
     self._master_url = options.flink_master
     self._flink_version = options.flink_version
+    self.flink_conf_dir = options.flink_conf_dir

   def path_to_jar(self):
     if self._jar:
@@ -106,7 +107,7 @@ def path_to_jar(self):

   def java_arguments(
       self, job_port, artifact_port, expansion_port, artifacts_dir):
-    return [
+    args = [
         '--flink-master',
         self._master_url,
         '--artifacts-dir',
@@ -118,3 +119,6 @@ def java_arguments(
         '--expansion-port',
         expansion_port
     ]
+    if self.flink_conf_dir is not None:
+      args += ['--flink-conf-dir', self.flink_conf_dir]
+    return args
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index 27e4ca4973ee..48a8d382e598 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -18,6 +18,7 @@
 import argparse
 import logging
+import pathlib
 import shlex
 import typing
 import unittest
@@ -55,6 +56,8 @@
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", str)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)

+default_flink_conf_path = pathlib.Path(__file__).parent.joinpath(
+    '../flink/src/test/resources/').resolve()

 class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
@@ -144,6 +147,9 @@ def _create_conf_dir(cls):
              'metrics.reporter.file.class: %s' % file_reporter,
              'metrics.reporter.file.path: %s' % cls.test_metrics_path,
              'metrics.scope.operator: ',
+             'parallelism.default: 23',
+             'taskmanager.memory.network.fraction: 0.2',
+             'taskmanager.memory.network.max: 2gb',
          ]))

   @classmethod
@@ -164,7 +170,7 @@ def _subprocess_command(cls, job_port, expansion_port):
           '--flink-master',
           '[local]',
           '--flink-conf-dir',
-          cls.conf_dir,
+          cls.conf_dir if cls.conf_dir is not None else default_flink_conf_path,
           '--artifacts-dir',
           tmp_dir,
           '--job-port',
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index 9585d0922046..ef4982b8a947 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -230,6 +230,7 @@ project.tasks.register("flinkExamples") {
         "--environment_type=LOOPBACK",
         "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
         "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+        "--flink_conf_dir=${rootDir}/runners/flink/src/test/resources",
         '--sdk_harness_log_level_overrides=' +
           // suppress info level flink.runtime log flood
           '{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
@@ -309,6 +310,7 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
         "--environment_type=LOOPBACK",
         "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
         "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+        "--flink_conf_dir=${rootDir}/runners/flink/src/test/resources",
         '--sdk_harness_log_level_overrides=' +
           // suppress info level flink.runtime log flood
           '{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
@@ -317,9 +319,6 @@ project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
           // suppress metric name collision warning logs
           '\\"org.apache.flink.runtime.metrics.groups\\":\\"ERROR\\"}'
     ]
-    if (project.hasProperty('flinkConfDir')) {
-      pipelineOpts += ["--flink-conf-dir=${project.property('flinkConfDir')}"]
-    }
     def cmdArgs = mapToArgString([
         "test_opts": testOpts,
         "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
@@ -358,6 +357,7 @@ project.tasks.register("xlangSpannerIOIT") {
         "--environment_type=LOOPBACK",
         "--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
         "--flink_job_server_jar=${project(":runners:flink:${latestFlinkVersion}:job-server").shadowJar.archivePath}",
+        "--flink_conf_dir=${rootDir}/runners/flink/src/test/resources",
         '--sdk_harness_log_level_overrides=' +
           // suppress info level flink.runtime log flood
           '{\\"org.apache.flink.runtime\\":\\"WARN\\",' +
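
For context, below is a minimal sketch (not part of the diff) of how the new --flink_conf_dir pipeline option wired up above might be supplied from user code; the conf directory path is a hypothetical placeholder for a directory containing a flink-conf.yaml.

# Minimal usage sketch for the new --flink_conf_dir option. Assumptions:
# the conf path is hypothetical and must point at a directory that holds
# a flink-conf.yaml; everything else uses existing Beam APIs.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=[local]',
    '--flink_conf_dir=/path/to/flink-conf-dir',  # hypothetical path
])

with beam.Pipeline(options=options) as p:
  _ = p | beam.Create(['a', 'b', 'c']) | beam.Map(print)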