Skip to content

Fix flaky BigQuery file loads by safely handling concurrent mkdirs#38426

Open
shunping wants to merge 1 commit intoapache:masterfrom
shunping:fix-bq-file-loads
Open

Fix flaky BigQuery file loads by safely handling concurrent mkdirs#38426
shunping wants to merge 1 commit intoapache:masterfrom
shunping:fix-bq-file-loads

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

@shunping shunping commented May 9, 2026

The test TestBigQueryFileLoads.test_reshuffle_before_load_0 is flaky (https://github.com/apache/beam/actions/runs/25584850292/job/75111278535?pr=38423) with the following traceback:

E       RuntimeError: Pipeline job-007 failed in state FAILED: bundle inst005 stage-005 failed:Traceback (most recent call last):
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/localfilesystem.py", line 79, in mkdirs
E           os.makedirs(path)
E         File "/opt/hostedtoolcache/Python/3.10.20/x64/lib/python3.10/os.py", line 225, in makedirs
E           mkdir(name, mode)
E       FileExistsError: [Errno 17] File exists: '/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/tmp/tmpsbxy358s/bq_load/91debe7ffc444e4789363222a9eb638e/project1.dataset1.table1'
E       
E       During handling of the above exception, another exception occurred:
E       
E       Traceback (most recent call last):
E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1056, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.output_handler.handle_process_outputs(
E         File "apache_beam/runners/common.py", line 1674, in apache_beam.runners.common._OutputHandler.handle_process_outputs
E         File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E           self.consumer.process(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
E           with self.scoped_process_state:
E         File "apache_beam/runners/worker/operations.py", line 956, in apache_beam.runners.worker.operations.DoOperation.process
E           delayed_applications = self.dofn_runner.process(o)
E         File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
E           self._reraise_augmented(exn, windowed_value)
E         File "apache_beam/runners/common.py", line 1589, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise exn
E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1056, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.output_handler.handle_process_outputs(
E         File "apache_beam/runners/common.py", line 1684, in apache_beam.runners.common._OutputHandler.handle_process_outputs
E           self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E         File "apache_beam/runners/common.py", line 1797, in apache_beam.runners.common._OutputHandler._write_value_to_tag
E           self.main_receivers.receive(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E           self.consumer.process(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
E           with self.scoped_process_state:
E         File "apache_beam/runners/worker/operations.py", line 956, in apache_beam.runners.worker.operations.DoOperation.process
E           delayed_applications = self.dofn_runner.process(o)
E         File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
E           self._reraise_augmented(exn, windowed_value)
E         File "apache_beam/runners/common.py", line 1589, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise exn
E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 685, in apache_beam.runners.common.SimpleInvoker.invoke_process
E           self.output_handler.handle_process_outputs(
E         File "apache_beam/runners/common.py", line 1684, in apache_beam.runners.common._OutputHandler.handle_process_outputs
E           self._write_value_to_tag(tag, windowed_value, watermark_estimator)
E         File "apache_beam/runners/common.py", line 1797, in apache_beam.runners.common._OutputHandler._write_value_to_tag
E           self.main_receivers.receive(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E           self.consumer.process(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
E           with self.scoped_process_state:
E         File "apache_beam/runners/worker/operations.py", line 956, in apache_beam.runners.worker.operations.DoOperation.process
E           delayed_applications = self.dofn_runner.process(o)
E         File "apache_beam/runners/common.py", line 1501, in apache_beam.runners.common.DoFnRunner.process
E           self._reraise_augmented(exn, windowed_value)
E         File "apache_beam/runners/common.py", line 1610, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise new_exn
E         File "apache_beam/runners/common.py", line 1499, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 913, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1056, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.output_handler.handle_process_outputs(
E         File "apache_beam/runners/common.py", line 1674, in apache_beam.runners.common._OutputHandler.handle_process_outputs
E           for result in results:
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 248, in process
E           self._destination_to_file_writer[destination] = _make_new_file_writer(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 137, in _make_new_file_writer
E           fs.FileSystems.mkdirs(directory)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/filesystems.py", line 201, in mkdirs
E           return filesystem.mkdirs(path)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/lib/python3.10/site-packages/apache_beam/io/localfilesystem.py", line 81, in mkdirs
E           raise IOError(err)
E       RuntimeError: OSError: [Errno 17] File exists: '/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-cloud/py310-cloud/tmp/tmpsbxy358s/bq_load/91debe7ffc444e4789363222a9eb638e/project1.dataset1.table1' [while running 'BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)']

When writing files for BigQuery loads, multiple workers or bundles processing records for the same destination can run concurrently. They all share the same file_prefix and construct the exact same destination directory path.

When two workers concurrently execute the code at https://github.com/shunping/beam/blob/a8e7ffab3716459518aeea2e81fe7181c6885179/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L136:

if not fs.FileSystems.exists(directory):
    fs.FileSystems.mkdirs(directory)

this could lead to a race condition when both workers see the directory non-existent and try to create it. As a result, only one worker will be able to create the directory successfully, while the other will raise a "File exists" error.

In this PR, we fix it by ignoring if a directory exists in the concurrent worker scenario.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a race condition in the BigQuery file loading process where multiple concurrent workers attempt to create the same destination directory simultaneously. By catching and validating the IOError during directory creation, the system now safely allows multiple workers to proceed if the directory has already been initialized by a peer, thereby resolving the observed flakiness in the TestBigQueryFileLoads test suite.

Highlights

  • Race Condition Resolution: Added error handling in _make_new_file_writer to gracefully handle concurrent directory creation attempts by multiple workers.
  • IOError Suppression: Implemented a try-except block that ignores IOError during mkdirs if the directory is confirmed to exist, preventing unnecessary pipeline failures.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request updates the directory creation logic in bigquery_file_loads.py to handle potential race conditions between concurrent workers. It wraps the mkdirs call in a try-except block to ignore IOError if the directory was successfully created by another worker. No review comments were provided for this change, and I have no additional feedback.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 9, 2026

Assigning reviewers:

R: @damccorm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant