From c9445efaae0c62a8c648f4a8875c20cf22bf3b8a Mon Sep 17 00:00:00 2001 From: TongruiLi Date: Thu, 7 May 2026 22:24:17 +0000 Subject: [PATCH 1/4] Added portable runner to python and go runners --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 10 ++++-- .../beam/runners/dataflow/dataflow_test.go | 34 +++++++++++++++++++ .../runners/dataflow/dataflow_runner.py | 4 +++ 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index e968911fcca1..1519cabf673e 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -335,7 +335,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs // for runner v2. - var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool + var fnApiSet, v2set, uwSet, portableRunnerSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { fnApiSet = true @@ -349,7 +349,10 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { + if strings.Contains(e, "enable_portable_runner") { + portableRunnerSet = true + } + if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. 
Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } } @@ -366,6 +369,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } + if !portableRunnerSet { + // As this option is not documented, we do not set it by default. This behavior will be fixed in later versions. + } // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine. diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go index 23dcd034120a..529ae5c087cd 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow_test.go @@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { } } +func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "disable_portable_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + +func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { + resetGlobals() + *stagingLocation = "gs://testStagingLocation" + *gcpopts.Project = "testProject" + *gcpopts.Region = "testRegion" + *jobopts.Experiments = "enable_streaming_java_runner" + + opts, err := getJobOptions(context.Background(), false) + + if err == nil { + t.Error("getJobOptions() returned error nil, want an error") + } + if opts != nil { + t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) + } +} + func 
TestGetJobOptions_NoStagingLocation(t *testing.T) { resetGlobals() *stagingLocation = "" diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 0c23e6024dc6..58da4461593c 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options): debug_options.add_experiment('use_unified_worker') debug_options.add_experiment('use_runner_v2') debug_options.add_experiment('use_portable_job_submission') + # enable_portable_runner is not added by default as it is not documented. + # This behavior will be fixed in later versions. def _check_and_add_missing_options(options): @@ -662,6 +664,8 @@ def _is_runner_v2_disabled(options): """Returns true if runner v2 is disabled.""" debug_options = options.view_as(DebugOptions) return ( + debug_options.lookup_experiment('disable_portable_runner') or + debug_options.lookup_experiment('enable_streaming_java_runner') or debug_options.lookup_experiment('disable_runner_v2') or debug_options.lookup_experiment('disable_runner_v2_until_2023') or debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or From dbb982648f4cf82adc641db5d635422de989b53a Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:42:28 +0000 Subject: [PATCH 2/4] Removed empty if, added disable unit test for python. 
--- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 4 +--- .../runners/dataflow/dataflow_runner_test.py | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 1519cabf673e..651a86b9ee42 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -369,9 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } - if !portableRunnerSet { - // As this option is not documented, we do not set it by default. This behavior will be fixed in later versions. - } + // As portable_runner is not documented, we do not set it by default. This behavior will be fixed in later versions. // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine. 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index e1b8be6682f9..2d8f840a4d00 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -41,6 +41,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options +from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.internal import names from apache_beam.runners.runner import PipelineState @@ -734,5 +735,27 @@ def test_explicit_streaming_no_unbounded(self): apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING) +class DataflowRunnerV2DisabledTest(unittest.TestCase): + + def test_runner_v2_disabled_experiments_raise(self): + disable_experiments = [ + 'disable_portable_runner', + 'enable_streaming_java_runner', + 'disable_runner_v2', + 'disable_runner_v2_until_2023', + 'disable_runner_v2_until_v2.50', + 'disable_prime_runner_v2', + ] + for experiment in disable_experiments: + options = PipelineOptions([f'--experiments={experiment}']) + self.assertTrue( + _is_runner_v2_disabled(options), + f'Expected {experiment} to disable runner v2') + with self.assertRaisesRegex( + ValueError, + 'Disabling Runner V2 no longer supported'): + DataflowRunner().run_pipeline(None, options) + + if __name__ == '__main__': unittest.main() From ac3716d950fd3420819adfdf359a0be188639012 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:43:19 +0000 Subject: [PATCH 3/4] Removed unnecessary class in unit test --- 
.../apache_beam/runners/dataflow/dataflow_runner_test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 2d8f840a4d00..b3035d38c7c0 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -734,9 +734,6 @@ def test_explicit_streaming_no_unbounded(self): p.result.job.proto.type, apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING) - -class DataflowRunnerV2DisabledTest(unittest.TestCase): - def test_runner_v2_disabled_experiments_raise(self): disable_experiments = [ 'disable_portable_runner', From 1069559f292757e3b0cbfe94d7bf590caa003fb4 Mon Sep 17 00:00:00 2001 From: TongruiLi <12992126+TongruiLi@users.noreply.github.com> Date: Fri, 8 May 2026 21:50:35 +0000 Subject: [PATCH 4/4] Removed unused variable --- sdks/go/pkg/beam/runners/dataflow/dataflow.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go index 651a86b9ee42..852613569801 100644 --- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go +++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go @@ -335,7 +335,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions experiments := jobopts.GetExperiments() // Ensure that we enable the same set of experiments across all SDKs // for runner v2. 
- var fnApiSet, v2set, uwSet, portableRunnerSet, portaSubmission, seSet, wsSet bool + var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool for _, e := range experiments { if strings.Contains(e, "beam_fn_api") { fnApiSet = true @@ -349,9 +349,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if strings.Contains(e, "use_portable_job_submission") { portaSubmission = true } - if strings.Contains(e, "enable_portable_runner") { - portableRunnerSet = true - } + // enable_portable_runner is not documented and hence won't be set by default. This will be fixed in later versions. if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") } @@ -369,7 +367,6 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions if !portaSubmission { experiments = append(experiments, "use_portable_job_submission") } - // As portable_runner is not documented, we do not set it by default. This behavior will be fixed in later versions. // Ensure that streaming specific experiments are set for streaming pipelines // since runner v2 only supports using streaming engine.