From 2fcdd6bfb8d35e53e3732cd42b51a13657c05065 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 12:38:15 +0800 Subject: [PATCH 01/26] [python] Split manifest files by target size during commit and compaction Manifest files written by pypaimon commit and minor compaction were not split by manifest-target-file-size. A single OVERWRITE commit writing 400 partitions produced a 138 MiB manifest (17x the 8 MiB default target), slowing every subsequent commit that reads it. Add ManifestFileManager.rolling_write() that estimates the total serialized size, then splits entries across multiple files so each stays near the target. Apply it in FileStoreCommit delta/changelog manifest writing and ManifestFileMerger minor compaction. --- .../manifest/manifest_file_manager.py | 59 ++++++++++++++++--- .../pypaimon/manifest/manifest_file_merger.py | 10 ++-- .../tests/manifest/manifest_manager_test.py | 40 +++++++++++++ .../pypaimon/write/file_store_commit.py | 13 ++-- 4 files changed, 101 insertions(+), 21 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index af710f94d6ba..6d22a67de241 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -208,9 +208,47 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: return fields def write(self, file_name, entries: List[ManifestEntry]): - avro_records = [] + manifest_path = f"{self.manifest_path}/{file_name}" + try: + buffer = BytesIO() + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) + with self.file_io.new_output_stream(manifest_path) as output_stream: + output_stream.write(buffer.getvalue()) + except Exception as e: + self.file_io.delete_quietly(manifest_path) + raise RuntimeError(f"Failed to write manifest file: {e}") from e + + def rolling_write(self, entries: List[ManifestEntry], + suggested_file_size: int, + base_name: str) -> List[ManifestFileMeta]: + if not entries: + return [] + + avro_records = self._to_avro_records(entries) + buf = BytesIO() + fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, avro_records) + total_size = buf.tell() + + if total_size <= suggested_file_size: + return [self._flush_and_build_meta(base_name, entries, buf.getvalue())] + + num_files = max(1, (total_size + suggested_file_size - 1) // suggested_file_size) + chunk_size = max(1, (len(entries) + num_files - 1) // num_files) + result = [] + name_prefix = base_name.rsplit('-', 1)[0] if base_name[-1].isdigit() and '-' in base_name else base_name + for i in range(0, len(entries), chunk_size): + chunk_entries = entries[i:i + chunk_size] + chunk_records = avro_records[i:i + chunk_size] + file_name = f"{name_prefix}-{len(result)}" + chunk_buf = BytesIO() + fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, chunk_records) + result.append(self._flush_and_build_meta(file_name, chunk_entries, chunk_buf.getvalue())) + return result + + def _to_avro_records(self, entries: List[ManifestEntry]) -> List[dict]: + records = [] for entry in entries: - avro_record = { + records.append({ "_VERSION": 2, "_KIND": entry.kind, "_PARTITION": GenericRowSerializer.to_bytes(entry.partition), @@ -246,22 +284,21 @@ def write(self, file_name, entries: List[ManifestEntry]): "_FIRST_ROW_ID": entry.file.first_row_id, "_WRITE_COLS": entry.file.write_cols, } - } - avro_records.append(avro_record) + }) + return records + def _flush_and_build_meta(self, file_name: str, entries: List[ManifestEntry], + avro_bytes: bytes) -> ManifestFileMeta: manifest_path = f"{self.manifest_path}/{file_name}" try: - buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records) - avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(manifest_path) as output_stream: output_stream.write(avro_bytes) except Exception as e: self.file_io.delete_quietly(manifest_path) raise RuntimeError(f"Failed to write manifest file: {e}") from e + return self._build_meta(file_name, entries) - def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFileMeta: - self.write(file_name, entries) + def _build_meta(self, file_name: str, entries: List[ManifestEntry]) -> ManifestFileMeta: added_file_count = 0 deleted_file_count = 0 schema_id = None @@ -317,3 +354,7 @@ def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFi min_row_id=min_row_id, max_row_id=max_row_id, ) + + def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFileMeta: + self.write(file_name, entries) + return self._build_meta(file_name, entries) diff --git a/paimon-python/pypaimon/manifest/manifest_file_merger.py b/paimon-python/pypaimon/manifest/manifest_file_merger.py index 47cc66b72e37..adad641b4551 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_merger.py +++ b/paimon-python/pypaimon/manifest/manifest_file_merger.py @@ -86,12 +86,10 @@ def _merge_candidates(self, candidates: List[ManifestFileMeta], return manifest_file = "manifest-{}-0".format(str(uuid.uuid4())) - merged_meta = self.manifest_file_manager.write_with_meta( - manifest_file, - merged_entries, - ) - result.append(merged_meta) - new_files.append(merged_meta) + merged_metas = self.manifest_file_manager.rolling_write( + merged_entries, self.suggested_meta_size, manifest_file) + result.extend(merged_metas) + new_files.extend(merged_metas) def _delete_manifests(self, manifests: List[ManifestFileMeta]): for manifest in manifests: diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 2c6423af3f90..c3caef525d52 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -325,6 +325,46 @@ def test_read_write_cols_with_system_field(self): self.assertEqual(read_stats.max_values.get_field(0), 10) self.assertEqual(read_stats.null_counts, [2]) + def test_commit_manifest_exceeds_target_size(self): + target_size = 16 * 1024 + pa_schema = pa.schema([ + ('pt', pa.string()), + ('pk', pa.int32()), + ('val', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['pk'], + partition_keys=['pt'], + options={ + 'bucket': '1', + 'manifest.target-file-size': '16 kb', + }, + ) + self.catalog.create_table('default.rolling_bug', schema, False) + table = self.catalog.get_table('default.rolling_bug') + + wb = table.new_batch_write_builder() + w = wb.new_write() + for pt in range(200): + rows = [{'pt': f'p{pt}', 'pk': i, 'val': f'v{i}'} for i in range(5)] + w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + + snap = table.snapshot_manager().get_latest_snapshot() + metas = ManifestListManager(table).read_all(snap) + max_allowed = target_size * 2 + oversized = [m for m in metas if m.file_size > max_allowed] + self.assertEqual( + len(oversized), 0, + f"{len(oversized)} manifest file(s) exceed 2x target ({max_allowed} bytes): " + f"{[(m.file_name, m.file_size) for m in oversized]}. " + f"Java uses RollingFileWriter to split; Python writes one file.") + self.assertGreater(len(metas), 1, + f"Expected multiple manifest files but got {len(metas)} " + f"with total {sum(m.file_size for m in metas)} bytes") + class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 304792b4124e..715fabd77a3d 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -424,17 +424,17 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_record_count = None new_manifest_files_for_abort = [] try: - new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file) - self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + new_manifest_file_metas = self._write_manifest_files(commit_entries, new_manifest_file) + self.manifest_list_manager.write(delta_manifest_list, new_manifest_file_metas) # Write changelog manifest if changelog entries exist if changelog_entries: changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog-0" - changelog_manifest_file_meta = self._write_manifest_file( + changelog_manifest_file_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" self.manifest_list_manager.write( - changelog_manifest_list_name, [changelog_manifest_file_meta]) + changelog_manifest_list_name, changelog_manifest_file_metas) manifest_path = self.manifest_list_manager.manifest_path changelog_manifest_list_size = self.table.file_io.get_file_size( f"{manifest_path}/{changelog_manifest_list_name}") @@ -543,8 +543,9 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str return SuccessResult() - def _write_manifest_file(self, commit_entries, new_manifest_file): - return self.manifest_file_manager.write_with_meta(new_manifest_file, commit_entries) + def _write_manifest_files(self, commit_entries, base_name): + return self.manifest_file_manager.rolling_write( + commit_entries, self.manifest_target_size, base_name) def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool: if retry_result is not None and latest_snapshot is not None: From 9af4710de4aac71e955c9f2cec5e138468eaf874 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 12:51:30 +0800 Subject: [PATCH 02/26] [python] Fix rolling_write: clean up on partial failure, remove double serialization - Add try/except around chunk loop to delete already-written files on failure - Estimate entries_per_file from avg entry size instead of serializing twice - Extract _flush() and simplify write() to reuse it --- .../manifest/manifest_file_manager.py | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 6d22a67de241..4407c010e64a 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -208,15 +208,9 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: return fields def write(self, file_name, entries: List[ManifestEntry]): - manifest_path = f"{self.manifest_path}/{file_name}" - try: - buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) - with self.file_io.new_output_stream(manifest_path) as output_stream: - output_stream.write(buffer.getvalue()) - except Exception as e: - self.file_io.delete_quietly(manifest_path) - raise RuntimeError(f"Failed to write manifest file: {e}") from e + buf = BytesIO() + fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) + self._flush(file_name, buf.getvalue()) def rolling_write(self, entries: List[ManifestEntry], suggested_file_size: int, @@ -230,19 +224,27 @@ def rolling_write(self, entries: List[ManifestEntry], total_size = buf.tell() if total_size <= suggested_file_size: - return [self._flush_and_build_meta(base_name, entries, buf.getvalue())] + self._flush(base_name, buf.getvalue()) + return [self._build_meta(base_name, entries)] - num_files = max(1, (total_size + suggested_file_size - 1) // suggested_file_size) - chunk_size = max(1, (len(entries) + num_files - 1) // num_files) - result = [] + # Estimate chunk size from average entry size to avoid a second full serialization. + avg_entry_size = total_size / len(entries) + entries_per_file = max(1, int(suggested_file_size / avg_entry_size)) name_prefix = base_name.rsplit('-', 1)[0] if base_name[-1].isdigit() and '-' in base_name else base_name - for i in range(0, len(entries), chunk_size): - chunk_entries = entries[i:i + chunk_size] - chunk_records = avro_records[i:i + chunk_size] - file_name = f"{name_prefix}-{len(result)}" - chunk_buf = BytesIO() - fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, chunk_records) - result.append(self._flush_and_build_meta(file_name, chunk_entries, chunk_buf.getvalue())) + result = [] + try: + for i in range(0, len(entries), entries_per_file): + chunk_records = avro_records[i:i + entries_per_file] + chunk_entries = entries[i:i + entries_per_file] + file_name = f"{name_prefix}-{len(result)}" + chunk_buf = BytesIO() + fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, chunk_records) + self._flush(file_name, chunk_buf.getvalue()) + result.append(self._build_meta(file_name, chunk_entries)) + except Exception: + for meta in result: + self.file_io.delete_quietly(f"{self.manifest_path}/{meta.file_name}") + raise return result def _to_avro_records(self, entries: List[ManifestEntry]) -> List[dict]: @@ -287,8 +289,7 @@ def _to_avro_records(self, entries: List[ManifestEntry]) -> List[dict]: }) return records - def _flush_and_build_meta(self, file_name: str, entries: List[ManifestEntry], - avro_bytes: bytes) -> ManifestFileMeta: + def _flush(self, file_name: str, avro_bytes: bytes): manifest_path = f"{self.manifest_path}/{file_name}" try: with self.file_io.new_output_stream(manifest_path) as output_stream: @@ -296,7 +297,6 @@ def _flush_and_build_meta(self, file_name: str, entries: List[ManifestEntry], except Exception as e: self.file_io.delete_quietly(manifest_path) raise RuntimeError(f"Failed to write manifest file: {e}") from e - return self._build_meta(file_name, entries) def _build_meta(self, file_name: str, entries: List[ManifestEntry]) -> ManifestFileMeta: added_file_count = 0 From 96e8169eed66df426dde8fe67f4d8b6ee5199bda Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 13:17:54 +0800 Subject: [PATCH 03/26] [python] Use size-aware adaptive rolling instead of avg-based chunk splitting When entry sizes are skewed, avg-based splitting can produce manifest files that exceed 2x the target. Switch to adaptive rolling: serialize each chunk, check actual size, shrink and re-serialize if overshooting, then adjust entries_per_chunk for the next iteration based on actual size. Add a skewed-entry test to cover this. --- .../manifest/manifest_file_manager.py | 30 +++++++++++++------ .../tests/manifest/manifest_manager_test.py | 30 +++++++++++++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 4407c010e64a..adc209c53093 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -227,20 +227,32 @@ def rolling_write(self, entries: List[ManifestEntry], self._flush(base_name, buf.getvalue()) return [self._build_meta(base_name, entries)] - # Estimate chunk size from average entry size to avoid a second full serialization. - avg_entry_size = total_size / len(entries) - entries_per_file = max(1, int(suggested_file_size / avg_entry_size)) name_prefix = base_name.rsplit('-', 1)[0] if base_name[-1].isdigit() and '-' in base_name else base_name + entries_per_chunk = max(1, int(len(entries) * suggested_file_size / total_size)) + pos = 0 result = [] try: - for i in range(0, len(entries), entries_per_file): - chunk_records = avro_records[i:i + entries_per_file] - chunk_entries = entries[i:i + entries_per_file] - file_name = f"{name_prefix}-{len(result)}" + while pos < len(entries): + end = min(pos + entries_per_chunk, len(entries)) chunk_buf = BytesIO() - fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, chunk_records) + fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, avro_records[pos:end]) + chunk_size = chunk_buf.tell() + + # Adaptive: if chunk overshoots, shrink and re-serialize + if chunk_size > suggested_file_size and end - pos > 1: + end = pos + max(1, int((end - pos) * suggested_file_size / chunk_size)) + chunk_buf = BytesIO() + fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, avro_records[pos:end]) + chunk_size = chunk_buf.tell() + + file_name = f"{name_prefix}-{len(result)}" self._flush(file_name, chunk_buf.getvalue()) - result.append(self._build_meta(file_name, chunk_entries)) + result.append(self._build_meta(file_name, entries[pos:end])) + + # Adapt entries_per_chunk for next iteration based on actual size + if chunk_size > 0: + entries_per_chunk = max(1, int((end - pos) * suggested_file_size / chunk_size)) + pos = end except Exception: for meta in result: self.file_io.delete_quietly(f"{self.manifest_path}/{meta.file_name}") diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index c3caef525d52..8dbb767e0005 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -365,6 +365,36 @@ def test_commit_manifest_exceeds_target_size(self): f"Expected multiple manifest files but got {len(metas)} " f"with total {sum(m.file_size for m in metas)} bytes") + def test_rolling_write_with_skewed_entries(self): + target_size = 16 * 1024 + manager = ManifestFileManager(self.table) + small = self._create_manifest_entry("small.parquet") + big = ManifestEntry( + kind=0, partition=_EMPTY_ROW, bucket=0, total_buckets=1, + file=DataFileMeta( + file_name="big.parquet", file_size=1024, row_count=100, + min_key=_EMPTY_ROW, max_key=_EMPTY_ROW, + key_stats=_EMPTY_STATS, value_stats=_EMPTY_STATS, + min_sequence_number=1, max_sequence_number=100, + schema_id=0, level=0, + extra_files=[f"extra-{i}.idx" for i in range(50)], + creation_time=Timestamp.from_epoch_millis(0), + delete_row_count=0, embedded_index=b'\x00' * 2000, + file_source=None, value_stats_cols=None, + external_path=None, first_row_id=None, write_cols=None, + ), + ) + entries = [big if i % 5 == 0 else small for i in range(300)] + metas = manager.rolling_write(entries, target_size, "manifest-skew-0") + + max_allowed = target_size * 2 + oversized = [m for m in metas if m.file_size > max_allowed] + self.assertEqual(len(oversized), 0, + f"Skewed entries: {len(oversized)} file(s) exceed 2x target: " + f"{[(m.file_name, m.file_size) for m in oversized]}") + total_entries = sum(m.num_added_files + m.num_deleted_files for m in metas) + self.assertEqual(total_entries, 300) + class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" From a650faaec2185ee6b394f89020d9299201abd8ac Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 14:32:42 +0800 Subject: [PATCH 04/26] [python] Replace fragile name parsing with explicit name_prefix parameter rolling_write now takes name_prefix directly and generates file names as {prefix}-0, {prefix}-1, etc. Callers no longer append -0 suffix. --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 9 ++++----- paimon-python/pypaimon/manifest/manifest_file_merger.py | 2 +- .../pypaimon/tests/manifest/manifest_manager_test.py | 2 +- paimon-python/pypaimon/write/file_store_commit.py | 4 ++-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index adc209c53093..4ea89dc78333 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -214,7 +214,7 @@ def write(self, file_name, entries: List[ManifestEntry]): def rolling_write(self, entries: List[ManifestEntry], suggested_file_size: int, - base_name: str) -> List[ManifestFileMeta]: + name_prefix: str) -> List[ManifestFileMeta]: if not entries: return [] @@ -224,10 +224,9 @@ def rolling_write(self, entries: List[ManifestEntry], total_size = buf.tell() if total_size <= suggested_file_size: - self._flush(base_name, buf.getvalue()) - return [self._build_meta(base_name, entries)] - - name_prefix = base_name.rsplit('-', 1)[0] if base_name[-1].isdigit() and '-' in base_name else base_name + file_name = f"{name_prefix}-0" + self._flush(file_name, buf.getvalue()) + return [self._build_meta(file_name, entries)] entries_per_chunk = max(1, int(len(entries) * suggested_file_size / total_size)) pos = 0 result = [] diff --git a/paimon-python/pypaimon/manifest/manifest_file_merger.py b/paimon-python/pypaimon/manifest/manifest_file_merger.py index adad641b4551..821b12aef5e9 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_merger.py +++ b/paimon-python/pypaimon/manifest/manifest_file_merger.py @@ -85,7 +85,7 @@ def _merge_candidates(self, candidates: List[ManifestFileMeta], if not merged_entries: return - manifest_file = "manifest-{}-0".format(str(uuid.uuid4())) + manifest_file = "manifest-{}".format(str(uuid.uuid4())) merged_metas = self.manifest_file_manager.rolling_write( merged_entries, self.suggested_meta_size, manifest_file) result.extend(merged_metas) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 8dbb767e0005..1b85b4bed955 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -385,7 +385,7 @@ def test_rolling_write_with_skewed_entries(self): ), ) entries = [big if i % 5 == 0 else small for i in range(300)] - metas = manager.rolling_write(entries, target_size, "manifest-skew-0") + metas = manager.rolling_write(entries, target_size, "manifest-skew") max_allowed = target_size * 2 oversized = [m for m in metas if m.file_size > max_allowed] diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 715fabd77a3d..84107af32948 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -374,7 +374,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str delta_manifest_list = f"manifest-list-{unique_id}-1" # process new_manifest - new_manifest_file = f"manifest-{str(uuid.uuid4())}-0" + new_manifest_file = f"manifest-{str(uuid.uuid4())}" new_index_manifest = None # process snapshot new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1 @@ -429,7 +429,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Write changelog manifest if changelog entries exist if changelog_entries: - changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog-0" + changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog" changelog_manifest_file_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" From d2060bdb17faccf885eafd6c6df3f8ba2017ebff Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 14:37:22 +0800 Subject: [PATCH 05/26] [python] Fix flake8 E128 continuation line indent in manifest tests --- .../pypaimon/tests/manifest/manifest_manager_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 1b85b4bed955..da05c345c580 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -361,7 +361,8 @@ def test_commit_manifest_exceeds_target_size(self): f"{len(oversized)} manifest file(s) exceed 2x target ({max_allowed} bytes): " f"{[(m.file_name, m.file_size) for m in oversized]}. " f"Java uses RollingFileWriter to split; Python writes one file.") - self.assertGreater(len(metas), 1, + self.assertGreater( + len(metas), 1, f"Expected multiple manifest files but got {len(metas)} " f"with total {sum(m.file_size for m in metas)} bytes") @@ -389,7 +390,8 @@ def test_rolling_write_with_skewed_entries(self): max_allowed = target_size * 2 oversized = [m for m in metas if m.file_size > max_allowed] - self.assertEqual(len(oversized), 0, + self.assertEqual( + len(oversized), 0, f"Skewed entries: {len(oversized)} file(s) exceed 2x target: " f"{[(m.file_name, m.file_size) for m in oversized]}") total_entries = sum(m.num_added_files + m.num_deleted_files for m in metas) From bd1ec85f38c076597ca25ed88fe6efb07db34e32 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 14:38:20 +0800 Subject: [PATCH 06/26] [python] Update file_store_commit_test for renamed _write_manifest_files --- paimon-python/pypaimon/tests/file_store_commit_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py index e3ff8c547f37..98c8867fe646 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_test.py @@ -419,7 +419,7 @@ def test_append_commit_inherits_index_manifest( snapshot_commit.commit.return_value = True file_store_commit.snapshot_commit = snapshot_commit - file_store_commit._write_manifest_file = Mock(return_value=Mock()) + file_store_commit._write_manifest_files = Mock(return_value=[Mock()]) file_store_commit._generate_partition_statistics = Mock(return_value=[]) file_store_commit.manifest_list_manager.read_all.return_value = [] @@ -492,7 +492,7 @@ def make_file(name): file=make_file("f2.parquet")), ] - result = file_store_commit._write_manifest_file(entries, "manifest-test") + result = file_store_commit._write_manifest_files(entries, "manifest-test") self.assertIsNotNone(result) @staticmethod From bafc3930a59dba719d93a3d28f226fa843a5ac10 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 14:46:15 +0800 Subject: [PATCH 07/26] [python] Use sample-based size estimation in rolling_write Replace full Avro serialization for size estimation with a 64-entry sample. Only serialize all entries when the sample suggests they fit in a single file. For multi-file rolling, the per-chunk adaptive logic handles estimation drift. --- .../manifest/manifest_file_manager.py | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 4ea89dc78333..faa8f95679ad 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -212,6 +212,8 @@ def write(self, file_name, entries: List[ManifestEntry]): fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) self._flush(file_name, buf.getvalue()) + _ROLLING_SAMPLE_SIZE = 64 + def rolling_write(self, entries: List[ManifestEntry], suggested_file_size: int, name_prefix: str) -> List[ManifestFileMeta]: @@ -219,15 +221,23 @@ def rolling_write(self, entries: List[ManifestEntry], return [] avro_records = self._to_avro_records(entries) - buf = BytesIO() - fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, avro_records) - total_size = buf.tell() - - if total_size <= suggested_file_size: - file_name = f"{name_prefix}-0" - self._flush(file_name, buf.getvalue()) - return [self._build_meta(file_name, entries)] - entries_per_chunk = max(1, int(len(entries) * suggested_file_size / total_size)) + + sample_n = min(self._ROLLING_SAMPLE_SIZE, len(entries)) + sample_buf = BytesIO() + fastavro.writer(sample_buf, MANIFEST_ENTRY_SCHEMA, avro_records[:sample_n]) + avg_entry_size = max(1, sample_buf.tell() / sample_n) + + if avg_entry_size * len(entries) <= suggested_file_size: + # Likely fits in one file — serialize all and verify + buf = BytesIO() + fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, avro_records) + if buf.tell() <= suggested_file_size: + file_name = f"{name_prefix}-0" + self._flush(file_name, buf.getvalue()) + return [self._build_meta(file_name, entries)] + avg_entry_size = buf.tell() / len(entries) + + entries_per_chunk = max(1, int(suggested_file_size / avg_entry_size)) pos = 0 result = [] try: From 5b8ab2f81dc74ffb696a9de3bc91685bb262e5cc Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 14:47:24 +0800 Subject: [PATCH 08/26] [python] Track rolling manifest files in new_manifest_files_for_abort Record delta, changelog, and merge manifest file metas into new_manifest_files_for_abort so _cleanup_preparation_failure can delete them if a later step (e.g. manifest list write) fails. Previously the variable was declared but only populated by the merger, missing the delta/changelog files from rolling_write. --- paimon-python/pypaimon/write/file_store_commit.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 84107af32948..72ddf19627b8 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -425,6 +425,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str new_manifest_files_for_abort = [] try: new_manifest_file_metas = self._write_manifest_files(commit_entries, new_manifest_file) + new_manifest_files_for_abort.extend(new_manifest_file_metas) self.manifest_list_manager.write(delta_manifest_list, new_manifest_file_metas) # Write changelog manifest if changelog entries exist @@ -432,6 +433,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog" changelog_manifest_file_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) + new_manifest_files_for_abort.extend(changelog_manifest_file_metas) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" self.manifest_list_manager.write( changelog_manifest_list_name, changelog_manifest_file_metas) @@ -451,8 +453,9 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str total_record_count += previous_record_count else: existing_manifest_files = [] - merged_manifest_files, new_manifest_files_for_abort = self.manifest_file_merger.merge( + merged_manifest_files, merge_new_files = self.manifest_file_merger.merge( existing_manifest_files) + new_manifest_files_for_abort.extend(merge_new_files) self.manifest_list_manager.write(base_manifest_list, merged_manifest_files) delta_record_count = 0 From f9202a60199517331df6a9f212240cb94a3b76a0 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 14:52:45 +0800 Subject: [PATCH 09/26] [python] Use streaming rolling write aligned with Java RollingFileWriter Replace sample-based estimation with fastavro.Writer streaming API: write entries one by one, flush every 100 records to check buf size, roll to a new file when size >= target. This matches Java's RollingFileWriterImpl behavior (check every N records, roll on threshold). --- .../manifest/manifest_file_manager.py | 142 ++++++++---------- .../tests/manifest/manifest_manager_test.py | 4 +- 2 files changed, 66 insertions(+), 80 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index faa8f95679ad..f750fc27826a 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -212,7 +212,7 @@ def write(self, file_name, entries: List[ManifestEntry]): fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) self._flush(file_name, buf.getvalue()) - _ROLLING_SAMPLE_SIZE = 64 + _CHECK_ROLLING_RECORD_CNT = 100 def rolling_write(self, entries: List[ManifestEntry], suggested_file_size: int, @@ -220,95 +220,79 @@ def rolling_write(self, entries: List[ManifestEntry], if not entries: return [] - avro_records = self._to_avro_records(entries) + from fastavro.write import Writer - sample_n = min(self._ROLLING_SAMPLE_SIZE, len(entries)) - sample_buf = BytesIO() - fastavro.writer(sample_buf, MANIFEST_ENTRY_SCHEMA, avro_records[:sample_n]) - avg_entry_size = max(1, sample_buf.tell() / sample_n) - - if avg_entry_size * len(entries) <= suggested_file_size: - # Likely fits in one file — serialize all and verify - buf = BytesIO() - fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, avro_records) - if buf.tell() <= suggested_file_size: - file_name = f"{name_prefix}-0" - self._flush(file_name, buf.getvalue()) - return [self._build_meta(file_name, entries)] - avg_entry_size = buf.tell() / len(entries) - - entries_per_chunk = max(1, int(suggested_file_size / avg_entry_size)) - pos = 0 result = [] + chunk_start = 0 + buf = BytesIO() + writer = Writer(buf, MANIFEST_ENTRY_SCHEMA) try: - while pos < len(entries): - end = min(pos + entries_per_chunk, len(entries)) - chunk_buf = BytesIO() - fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, avro_records[pos:end]) - chunk_size = chunk_buf.tell() - - # Adaptive: if chunk overshoots, shrink and re-serialize - if chunk_size > suggested_file_size and end - pos > 1: - end = pos + max(1, int((end - pos) * suggested_file_size / chunk_size)) - chunk_buf = BytesIO() - fastavro.writer(chunk_buf, MANIFEST_ENTRY_SCHEMA, avro_records[pos:end]) - chunk_size = chunk_buf.tell() - + for i, entry in enumerate(entries): + writer.write(self._to_avro_record(entry)) + if (i + 1) % self._CHECK_ROLLING_RECORD_CNT == 0: + writer.flush() + if buf.tell() >= suggested_file_size: + writer.dump() + file_name = f"{name_prefix}-{len(result)}" + self._flush(file_name, buf.getvalue()) + result.append(self._build_meta(file_name, entries[chunk_start:i + 1])) + chunk_start = i + 1 + buf = BytesIO() + writer = Writer(buf, MANIFEST_ENTRY_SCHEMA) + + if chunk_start < len(entries): + writer.dump() file_name = f"{name_prefix}-{len(result)}" - self._flush(file_name, chunk_buf.getvalue()) - result.append(self._build_meta(file_name, entries[pos:end])) - - # Adapt entries_per_chunk for next iteration based on actual size - if chunk_size > 0: - entries_per_chunk = max(1, int((end - pos) * suggested_file_size / chunk_size)) - pos = end + self._flush(file_name, buf.getvalue()) + result.append(self._build_meta(file_name, entries[chunk_start:])) except Exception: for meta in result: self.file_io.delete_quietly(f"{self.manifest_path}/{meta.file_name}") raise return result + @staticmethod + def _to_avro_record(entry: ManifestEntry) -> dict: + return { + "_VERSION": 2, + "_KIND": entry.kind, + "_PARTITION": GenericRowSerializer.to_bytes(entry.partition), + "_BUCKET": entry.bucket, + "_TOTAL_BUCKETS": entry.total_buckets, + "_FILE": { + "_FILE_NAME": entry.file.file_name, + "_FILE_SIZE": entry.file.file_size, + "_ROW_COUNT": entry.file.row_count, + "_MIN_KEY": GenericRowSerializer.to_bytes(entry.file.min_key), + "_MAX_KEY": GenericRowSerializer.to_bytes(entry.file.max_key), + "_KEY_STATS": { + "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.min_values), + "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.max_values), + "_NULL_COUNTS": entry.file.key_stats.null_counts, + }, + "_VALUE_STATS": { + "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.min_values), + "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.max_values), + "_NULL_COUNTS": entry.file.value_stats.null_counts, + }, + "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number, + "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number, + "_SCHEMA_ID": entry.file.schema_id, + "_LEVEL": entry.file.level, + "_EXTRA_FILES": entry.file.extra_files, + "_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None, + "_DELETE_ROW_COUNT": entry.file.delete_row_count, + "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, + "_FILE_SOURCE": entry.file.file_source, + "_VALUE_STATS_COLS": entry.file.value_stats_cols, + "_EXTERNAL_PATH": entry.file.external_path, + "_FIRST_ROW_ID": entry.file.first_row_id, + "_WRITE_COLS": entry.file.write_cols, + } + } + def _to_avro_records(self, entries: List[ManifestEntry]) -> List[dict]: - records = [] - for entry in entries: - records.append({ - "_VERSION": 2, - "_KIND": entry.kind, - "_PARTITION": GenericRowSerializer.to_bytes(entry.partition), - "_BUCKET": entry.bucket, - "_TOTAL_BUCKETS": entry.total_buckets, - "_FILE": { - "_FILE_NAME": entry.file.file_name, - "_FILE_SIZE": entry.file.file_size, - "_ROW_COUNT": entry.file.row_count, - "_MIN_KEY": GenericRowSerializer.to_bytes(entry.file.min_key), - "_MAX_KEY": GenericRowSerializer.to_bytes(entry.file.max_key), - "_KEY_STATS": { - "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.min_values), - "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.max_values), - "_NULL_COUNTS": entry.file.key_stats.null_counts, - }, - "_VALUE_STATS": { - "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.min_values), - "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.max_values), - "_NULL_COUNTS": entry.file.value_stats.null_counts, - }, - "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number, - "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number, - "_SCHEMA_ID": entry.file.schema_id, - "_LEVEL": entry.file.level, - "_EXTRA_FILES": entry.file.extra_files, - "_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None, - "_DELETE_ROW_COUNT": entry.file.delete_row_count, - "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, - "_FILE_SOURCE": entry.file.file_source, - "_VALUE_STATS_COLS": entry.file.value_stats_cols, - "_EXTERNAL_PATH": entry.file.external_path, - "_FIRST_ROW_ID": entry.file.first_row_id, - "_WRITE_COLS": entry.file.write_cols, - } - }) - return records + return [self._to_avro_record(e) for e in entries] def _flush(self, file_name: str, avro_bytes: bytes): manifest_path = f"{self.manifest_path}/{file_name}" diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index da05c345c580..336df852cae2 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -388,7 +388,9 @@ def test_rolling_write_with_skewed_entries(self): entries = [big if i % 5 == 0 else small for i in range(300)] metas = manager.rolling_write(entries, target_size, "manifest-skew") - max_allowed = target_size * 2 + # Streaming rolling checks size every N records after flush, so files + # may exceed target by up to one batch worth of large entries. + max_allowed = target_size * 5 oversized = [m for m in metas if m.file_size > max_allowed] self.assertEqual( len(oversized), 0, From 75075a96ee88763f0e093265fd6b83d26aa9a5dd Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 15:05:14 +0800 Subject: [PATCH 10/26] [python] Use sync_interval for per-record size tracking in rolling write Replace fixed 100-record flush interval with fastavro sync_interval so buf.tell() updates after each record. Check size after every write instead of batched flushes, giving tighter control near the target. Use flush() instead of dump() to avoid writing empty trailing blocks. --- .../manifest/manifest_file_manager.py | 23 ++++++++----------- .../tests/manifest/manifest_manager_test.py | 4 +--- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index f750fc27826a..4bcbc9c30f96 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -212,8 +212,6 @@ def write(self, file_name, entries: List[ManifestEntry]): fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) self._flush(file_name, buf.getvalue()) - _CHECK_ROLLING_RECORD_CNT = 100 - def rolling_write(self, entries: List[ManifestEntry], suggested_file_size: int, name_prefix: str) -> List[ManifestFileMeta]: @@ -222,26 +220,25 @@ def rolling_write(self, entries: List[ManifestEntry], from fastavro.write import Writer + sync_interval = min(16000, suggested_file_size) result = [] chunk_start = 0 buf = BytesIO() - writer = Writer(buf, MANIFEST_ENTRY_SCHEMA) + writer = Writer(buf, MANIFEST_ENTRY_SCHEMA, sync_interval=sync_interval) try: for i, entry in enumerate(entries): writer.write(self._to_avro_record(entry)) - if (i + 1) % self._CHECK_ROLLING_RECORD_CNT == 0: + if buf.tell() >= suggested_file_size: writer.flush() - if buf.tell() >= suggested_file_size: - writer.dump() - file_name = f"{name_prefix}-{len(result)}" - self._flush(file_name, buf.getvalue()) - result.append(self._build_meta(file_name, entries[chunk_start:i + 1])) - chunk_start = i + 1 - buf = BytesIO() - writer = Writer(buf, MANIFEST_ENTRY_SCHEMA) + file_name = f"{name_prefix}-{len(result)}" + self._flush(file_name, buf.getvalue()) + result.append(self._build_meta(file_name, entries[chunk_start:i + 1])) + chunk_start = i + 1 + buf = BytesIO() + writer = Writer(buf, MANIFEST_ENTRY_SCHEMA, sync_interval=sync_interval) if chunk_start < len(entries): - writer.dump() + writer.flush() file_name = f"{name_prefix}-{len(result)}" self._flush(file_name, buf.getvalue()) result.append(self._build_meta(file_name, entries[chunk_start:])) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 336df852cae2..da05c345c580 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -388,9 +388,7 @@ def test_rolling_write_with_skewed_entries(self): entries = [big if i % 5 == 0 else small for i in range(300)] metas = manager.rolling_write(entries, target_size, "manifest-skew") - # Streaming rolling checks size every N records after flush, so files - # may exceed target by up to one batch worth of large entries. - max_allowed = target_size * 5 + max_allowed = target_size * 2 oversized = [m for m in metas if m.file_size > max_allowed] self.assertEqual( len(oversized), 0, From 43f1f86ffce2d0ea0bcda8b3d5e617760b4c8416 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 15:17:31 +0800 Subject: [PATCH 11/26] [python] Extract sync_interval magic number to _AVRO_SYNC_INTERVAL constant --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 4bcbc9c30f96..a44fef451564 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -37,6 +37,8 @@ class ManifestFileManager: """Writer for manifest files in Avro format using unified FileIO.""" + _AVRO_SYNC_INTERVAL = 16000 + def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable @@ -220,7 +222,7 @@ def rolling_write(self, entries: List[ManifestEntry], from fastavro.write import Writer - sync_interval = min(16000, suggested_file_size) + sync_interval = min(self._AVRO_SYNC_INTERVAL, suggested_file_size) result = [] chunk_start = 0 buf = BytesIO() From a757ce0282d74145224f96adc2e4f4591d0c78ad Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 15:44:35 +0800 Subject: [PATCH 12/26] [python] Allow merger to split oversized single-file manifest groups Previously _merge_candidates skipped single-file groups unconditionally. Now it only skips when the file is within the target size. An oversized single manifest is re-read and re-written through rolling_write, splitting it into multiple target-sized files. --- .../pypaimon/manifest/manifest_file_merger.py | 2 +- .../tests/manifest/manifest_manager_test.py | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_merger.py b/paimon-python/pypaimon/manifest/manifest_file_merger.py index 821b12aef5e9..f6df123ba85d 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_merger.py +++ b/paimon-python/pypaimon/manifest/manifest_file_merger.py @@ -68,7 +68,7 @@ def _try_minor_compaction(self, manifest_files: List[ManifestFileMeta], def _merge_candidates(self, candidates: List[ManifestFileMeta], result: List[ManifestFileMeta], new_files: List[ManifestFileMeta]): - if len(candidates) == 1: + if len(candidates) == 1 and candidates[0].file_size <= self.suggested_meta_size: result.append(candidates[0]) return diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index da05c345c580..29baad358b6b 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -397,6 +397,25 @@ def test_rolling_write_with_skewed_entries(self): total_entries = sum(m.num_added_files + m.num_deleted_files for m in metas) self.assertEqual(total_entries, 300) + def test_merge_splits_oversized_manifest(self): + from pypaimon.manifest.manifest_file_merger import ManifestFileMerger + target_size = 16 * 1024 + manager = ManifestFileManager(self.table) + entries = [self._create_manifest_entry(f"data-{i}.parquet") for i in range(500)] + oversized_meta = manager.write_with_meta("manifest-oversized-0", entries) + self.assertGreater(oversized_meta.file_size, target_size) + + merger = ManifestFileMerger(manager, target_size, 30) + result, new_files = merger.merge([oversized_meta]) + + self.assertGreater(len(result), 1, + f"Expected merge to split oversized manifest into " + f"multiple files, got {len(result)}") + self.assertTrue( + all(m.file_name != oversized_meta.file_name for m in result)) + total = sum(m.num_added_files + m.num_deleted_files for m in result) + self.assertEqual(total, 500) + class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" From e2b0b558827f9530d3f2d150c03b0943ce5a14fb Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 15:46:06 +0800 Subject: [PATCH 13/26] [python] Revert merger single-file split to match Java behavior Java's mergeCandidates unconditionally skips single-file groups. Revert the Python-only divergence to keep consistency. --- .../pypaimon/manifest/manifest_file_merger.py | 2 +- .../tests/manifest/manifest_manager_test.py | 18 ------------------ 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_merger.py b/paimon-python/pypaimon/manifest/manifest_file_merger.py index f6df123ba85d..821b12aef5e9 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_merger.py +++ b/paimon-python/pypaimon/manifest/manifest_file_merger.py @@ -68,7 +68,7 @@ def _try_minor_compaction(self, manifest_files: List[ManifestFileMeta], def _merge_candidates(self, candidates: List[ManifestFileMeta], result: List[ManifestFileMeta], new_files: List[ManifestFileMeta]): - if len(candidates) == 1 and candidates[0].file_size <= self.suggested_meta_size: + if len(candidates) == 1: result.append(candidates[0]) return diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 29baad358b6b..27b9af0b17ca 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -397,24 +397,6 @@ def test_rolling_write_with_skewed_entries(self): total_entries = sum(m.num_added_files + m.num_deleted_files for m in metas) self.assertEqual(total_entries, 300) - def test_merge_splits_oversized_manifest(self): - from pypaimon.manifest.manifest_file_merger import ManifestFileMerger - target_size = 16 * 1024 - manager = ManifestFileManager(self.table) - entries = [self._create_manifest_entry(f"data-{i}.parquet") for i in range(500)] - oversized_meta = manager.write_with_meta("manifest-oversized-0", entries) - self.assertGreater(oversized_meta.file_size, target_size) - - merger = ManifestFileMerger(manager, target_size, 30) - result, new_files = merger.merge([oversized_meta]) - - self.assertGreater(len(result), 1, - f"Expected merge to split oversized manifest into " - f"multiple files, got {len(result)}") - self.assertTrue( - all(m.file_name != oversized_meta.file_name for m in result)) - total = sum(m.num_added_files + m.num_deleted_files for m in result) - self.assertEqual(total, 500) class ManifestListManagerTest(_ManifestManagerSetup): From 4edecb80182ddb34aa29c904cb974df6bf5578db Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 15:50:00 +0800 Subject: [PATCH 14/26] [python] Clean up manifest files via new_manifest_files_for_abort on preparation failure When manifest list write fails after rolling_write succeeds, the except block now deletes the already-written manifest files before calling _cleanup_preparation_failure, preventing orphan files. --- paimon-python/pypaimon/write/file_store_commit.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 72ddf19627b8..67e8c0d0ea53 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -498,6 +498,9 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: + for meta in new_manifest_files_for_abort: + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list, new_index_manifest, changelog_manifest_list_name, new_manifest_files_for_abort) From 1e608f66bfa67c19621e30f69be0e437e4fc1f05 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:05:31 +0800 Subject: [PATCH 15/26] [python] Pass known file size to _build_meta to avoid remote HEAD requests rolling_write already has the byte buffer, so pass len(avro_bytes) directly instead of calling file_io.get_file_size() which issues a remote HEAD request per file. --- .../manifest/manifest_file_manager.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index a44fef451564..2c7009192fd1 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -232,18 +232,22 @@ def rolling_write(self, entries: List[ManifestEntry], writer.write(self._to_avro_record(entry)) if buf.tell() >= suggested_file_size: writer.flush() + avro_bytes = buf.getvalue() file_name = f"{name_prefix}-{len(result)}" - self._flush(file_name, buf.getvalue()) - result.append(self._build_meta(file_name, entries[chunk_start:i + 1])) + self._flush(file_name, avro_bytes) + result.append(self._build_meta( + file_name, entries[chunk_start:i + 1], len(avro_bytes))) chunk_start = i + 1 buf = BytesIO() writer = Writer(buf, MANIFEST_ENTRY_SCHEMA, sync_interval=sync_interval) if chunk_start < len(entries): writer.flush() + avro_bytes = buf.getvalue() file_name = f"{name_prefix}-{len(result)}" - self._flush(file_name, buf.getvalue()) - result.append(self._build_meta(file_name, entries[chunk_start:])) + self._flush(file_name, avro_bytes) + result.append(self._build_meta( + file_name, entries[chunk_start:], len(avro_bytes))) except Exception: for meta in result: self.file_io.delete_quietly(f"{self.manifest_path}/{meta.file_name}") @@ -302,7 +306,8 @@ def _flush(self, file_name: str, avro_bytes: bytes): self.file_io.delete_quietly(manifest_path) raise RuntimeError(f"Failed to write manifest file: {e}") from e - def _build_meta(self, file_name: str, entries: List[ManifestEntry]) -> ManifestFileMeta: + def _build_meta(self, file_name: str, entries: List[ManifestEntry], + file_size: int = None) -> ManifestFileMeta: added_file_count = 0 deleted_file_count = 0 schema_id = None @@ -337,10 +342,12 @@ def _build_meta(self, file_name: str, entries: List[ManifestEntry]) -> ManifestF if max_row_id is None or file_range.to > max_row_id: max_row_id = file_range.to - manifest_file_path = f"{self.manifest_path}/{file_name}" + if file_size is None: + manifest_file_path = f"{self.manifest_path}/{file_name}" + file_size = self.table.file_io.get_file_size(manifest_file_path) return ManifestFileMeta( file_name=file_name, - file_size=self.table.file_io.get_file_size(manifest_file_path), + file_size=file_size, num_added_files=added_file_count, num_deleted_files=deleted_file_count, partition_stats=SimpleStats( From 10cd1f1cd924eb930ce06ff1d132d2948207b785 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:09:22 +0800 Subject: [PATCH 16/26] [python] Track written files separately for rollback in rolling_write Use written_files list instead of result to track files that need cleanup on failure. Covers the gap where _flush succeeds but _build_meta fails, leaving the current chunk's file as an orphan. --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 2c7009192fd1..d291abbba5c8 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -224,6 +224,7 @@ def rolling_write(self, entries: List[ManifestEntry], sync_interval = min(self._AVRO_SYNC_INTERVAL, suggested_file_size) result = [] + written_files = [] chunk_start = 0 buf = BytesIO() writer = Writer(buf, MANIFEST_ENTRY_SCHEMA, sync_interval=sync_interval) @@ -235,6 +236,7 @@ def rolling_write(self, entries: List[ManifestEntry], avro_bytes = buf.getvalue() file_name = f"{name_prefix}-{len(result)}" self._flush(file_name, avro_bytes) + written_files.append(file_name) result.append(self._build_meta( file_name, entries[chunk_start:i + 1], len(avro_bytes))) chunk_start = i + 1 @@ -246,11 +248,12 @@ def rolling_write(self, entries: List[ManifestEntry], avro_bytes = buf.getvalue() file_name = f"{name_prefix}-{len(result)}" self._flush(file_name, avro_bytes) + written_files.append(file_name) result.append(self._build_meta( file_name, entries[chunk_start:], len(avro_bytes))) except Exception: - for meta in result: - self.file_io.delete_quietly(f"{self.manifest_path}/{meta.file_name}") + for fname in written_files: + self.file_io.delete_quietly(f"{self.manifest_path}/{fname}") raise return result From 2d5cce99d6d7851fc9a5dbfccd417e55f10614ff Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:20:24 +0800 Subject: [PATCH 17/26] [python] Add read-back assertion for rolling manifest files Verify each rolling manifest file is readable and entry counts match the metadata, ensuring the streaming Writer produces valid Avro. --- .../pypaimon/tests/manifest/manifest_manager_test.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 27b9af0b17ca..1777a02db664 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -366,6 +366,14 @@ def test_commit_manifest_exceeds_target_size(self): f"Expected multiple manifest files but got {len(metas)} " f"with total {sum(m.file_size for m in metas)} bytes") + mfm = ManifestFileManager(table) + all_entries = [] + for meta in metas: + entries = mfm.read(meta.file_name) + self.assertEqual(len(entries), meta.num_added_files + meta.num_deleted_files) + all_entries.extend(entries) + self.assertEqual(len(all_entries), 200) + def test_rolling_write_with_skewed_entries(self): target_size = 16 * 1024 manager = ManifestFileManager(self.table) From 8f36b3bf6867fbeeccda72e02435e5c5b61d3323 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:41:33 +0800 Subject: [PATCH 18/26] [python] Remove extra blank line between test classes --- paimon-python/pypaimon/tests/manifest/manifest_manager_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 1777a02db664..adde81c34e84 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -406,7 +406,6 @@ def test_rolling_write_with_skewed_entries(self): self.assertEqual(total_entries, 300) - class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" From 0eb8094c7e2f78b637d44b7b515c04adfca6e310 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:43:50 +0800 Subject: [PATCH 19/26] [python] Remove redundant manifest file cleanup in except block --- paimon-python/pypaimon/write/file_store_commit.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 67e8c0d0ea53..72ddf19627b8 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -498,9 +498,6 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: - for meta in new_manifest_files_for_abort: - self.table.file_io.delete_quietly( - f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list, new_index_manifest, changelog_manifest_list_name, new_manifest_files_for_abort) From be798358993739b9494fff526d6d204facb9b04a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:47:15 +0800 Subject: [PATCH 20/26] [python] Align cleanup with Java CommitCleaner two-step structure Replace _cleanup_preparation_failure with two methods mirroring Java: - _clean_up_reuse_tmp_manifests: read delta/changelog manifest lists to find and delete their manifest files, then delete the lists - _clean_up_no_reuse_tmp_manifests: delete base manifest list, then only delete manifests in merge_after not present in merge_before --- .../pypaimon/write/file_store_commit.py | 91 +++++++++---------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 72ddf19627b8..e873e3832b47 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -422,10 +422,10 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_manifest_list_name = None changelog_manifest_list_size = None changelog_record_count = None - new_manifest_files_for_abort = [] + merge_before_manifests = [] + merge_after_manifests = [] try: new_manifest_file_metas = self._write_manifest_files(commit_entries, new_manifest_file) - new_manifest_files_for_abort.extend(new_manifest_file_metas) self.manifest_list_manager.write(delta_manifest_list, new_manifest_file_metas) # Write changelog manifest if changelog entries exist @@ -433,7 +433,6 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog" changelog_manifest_file_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) - new_manifest_files_for_abort.extend(changelog_manifest_file_metas) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" self.manifest_list_manager.write( changelog_manifest_list_name, changelog_manifest_file_metas) @@ -453,9 +452,10 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str total_record_count += previous_record_count else: existing_manifest_files = [] - merged_manifest_files, merge_new_files = self.manifest_file_merger.merge( + merge_before_manifests = existing_manifest_files + merged_manifest_files, _ = self.manifest_file_merger.merge( existing_manifest_files) - new_manifest_files_for_abort.extend(merge_new_files) + merge_after_manifests = merged_manifest_files self.manifest_list_manager.write(base_manifest_list, merged_manifest_files) delta_record_count = 0 @@ -498,9 +498,10 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: - self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list, - new_index_manifest, changelog_manifest_list_name, - new_manifest_files_for_abort) + self._clean_up_reuse_tmp_manifests( + delta_manifest_list, changelog_manifest_list_name, new_index_manifest) + self._clean_up_no_reuse_tmp_manifests( + base_manifest_list, merge_before_manifests, merge_after_manifests) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) raise RuntimeError(f"Failed to prepare snapshot: {e}") @@ -645,51 +646,45 @@ def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> Li )) return changelog_entries - def _cleanup_preparation_failure(self, + def _clean_up_reuse_tmp_manifests(self, delta_manifest_list: Optional[str], - base_manifest_list: Optional[str], - index_manifest: Optional[str] = None, - changelog_manifest_list: Optional[str] = None, - base_manifest_files_to_delete: Optional[List[ManifestFileMeta]] = None): - try: - manifest_path = self.manifest_list_manager.manifest_path - - if index_manifest: - self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") + changelog_manifest_list: Optional[str], + index_manifest: Optional[str] = None): + """Clean up delta/changelog manifests and index manifest. - if delta_manifest_list: - try: - manifest_files = self.manifest_list_manager.read(delta_manifest_list) - for manifest_meta in manifest_files: - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}" - self.table.file_io.delete_quietly(manifest_file_path) - except Exception: - pass - delta_path = f"{manifest_path}/{delta_manifest_list}" - self.table.file_io.delete_quietly(delta_path) - - if base_manifest_list: - if base_manifest_files_to_delete: - for manifest_meta in base_manifest_files_to_delete: - manifest_file_path = ( - f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}") - self.table.file_io.delete_quietly(manifest_file_path) - base_path = f"{manifest_path}/{base_manifest_list}" - self.table.file_io.delete_quietly(base_path) - - if changelog_manifest_list: + Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. + """ + manifest_path = self.manifest_list_manager.manifest_path + for ml_name in (delta_manifest_list, changelog_manifest_list): + if ml_name: try: - changelog_manifests = self.manifest_list_manager.read(changelog_manifest_list) - for manifest_meta in changelog_manifests: - manifest_file_path = ( - f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}") - self.table.file_io.delete_quietly(manifest_file_path) + for meta in self.manifest_list_manager.read(ml_name): + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") except Exception: pass - changelog_path = f"{manifest_path}/{changelog_manifest_list}" - self.table.file_io.delete_quietly(changelog_path) - except Exception as e: - logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) + self.table.file_io.delete_quietly(f"{manifest_path}/{ml_name}") + if index_manifest: + self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") + + def _clean_up_no_reuse_tmp_manifests(self, + base_manifest_list: Optional[str], + merge_before: List[ManifestFileMeta], + merge_after: List[ManifestFileMeta]): + """Clean up base manifest list and only the newly created merge manifests. + + Mirrors Java CommitCleaner.cleanUpNoReuseTmpManifests: only deletes + manifests in merge_after that are not in merge_before (i.e. newly + created by the merger, not pre-existing). + """ + manifest_path = self.manifest_list_manager.manifest_path + if base_manifest_list: + self.table.file_io.delete_quietly(f"{manifest_path}/{base_manifest_list}") + old_names = {m.file_name for m in merge_before} + for meta in merge_after: + if meta.file_name not in old_names: + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") def abort(self, commit_messages: List[CommitMessage]): """Abort commit and delete files. Uses external_path if available to ensure proper scheme handling.""" From 247413fd0d42214b85961777766cc63a367f317a Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 16:57:37 +0800 Subject: [PATCH 21/26] [python] Add known metas fallback to _clean_up_reuse_tmp_manifests When manifest list write fails after rolling_write succeeds, reading the manifest list back for cleanup also fails. Fall back to the known metas already in hand to delete orphan manifest files. --- .../pypaimon/write/file_store_commit.py | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index e873e3832b47..0d863e05ed5b 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -422,20 +422,22 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_manifest_list_name = None changelog_manifest_list_size = None changelog_record_count = None + delta_known_metas = [] + changelog_known_metas = [] merge_before_manifests = [] merge_after_manifests = [] try: - new_manifest_file_metas = self._write_manifest_files(commit_entries, new_manifest_file) - self.manifest_list_manager.write(delta_manifest_list, new_manifest_file_metas) + delta_known_metas = self._write_manifest_files(commit_entries, new_manifest_file) + self.manifest_list_manager.write(delta_manifest_list, delta_known_metas) # Write changelog manifest if changelog entries exist if changelog_entries: changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog" - changelog_manifest_file_metas = self._write_manifest_files( + changelog_known_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" self.manifest_list_manager.write( - changelog_manifest_list_name, changelog_manifest_file_metas) + changelog_manifest_list_name, changelog_known_metas) manifest_path = self.manifest_list_manager.manifest_path changelog_manifest_list_size = self.table.file_io.get_file_size( f"{manifest_path}/{changelog_manifest_list_name}") @@ -499,7 +501,8 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str statistics = self._generate_partition_statistics(commit_entries) except Exception as e: self._clean_up_reuse_tmp_manifests( - delta_manifest_list, changelog_manifest_list_name, new_index_manifest) + delta_manifest_list, changelog_manifest_list_name, new_index_manifest, + delta_known_metas, changelog_known_metas) self._clean_up_no_reuse_tmp_manifests( base_manifest_list, merge_before_manifests, merge_after_manifests) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) @@ -649,20 +652,25 @@ def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> Li def _clean_up_reuse_tmp_manifests(self, delta_manifest_list: Optional[str], changelog_manifest_list: Optional[str], - index_manifest: Optional[str] = None): + index_manifest: Optional[str] = None, + delta_known_metas: Optional[List[ManifestFileMeta]] = None, + changelog_known_metas: Optional[List[ManifestFileMeta]] = None): """Clean up delta/changelog manifests and index manifest. - Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. + Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. Falls back to + known metas when manifest list is unreadable (e.g. write failed). """ manifest_path = self.manifest_list_manager.manifest_path - for ml_name in (delta_manifest_list, changelog_manifest_list): + for ml_name, known in ((delta_manifest_list, delta_known_metas), + (changelog_manifest_list, changelog_known_metas)): if ml_name: try: - for meta in self.manifest_list_manager.read(ml_name): - self.table.file_io.delete_quietly( - f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") + metas = self.manifest_list_manager.read(ml_name) except Exception: - pass + metas = known or [] + for meta in metas: + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") self.table.file_io.delete_quietly(f"{manifest_path}/{ml_name}") if index_manifest: self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") From c5c4ccf2d79c6a2d151fb795e8f550656a30001d Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 17:02:39 +0800 Subject: [PATCH 22/26] [python] Guard cleanup with try/except, simplify merge new_files, remove dead code - Wrap cleanup calls in try/except so cleanup errors don't mask the original preparation failure - Pass merge new_files directly instead of recomputing set difference - Remove unused write_with_meta method --- .../manifest/manifest_file_manager.py | 3 -- .../pypaimon/write/file_store_commit.py | 33 +++++++++---------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index d291abbba5c8..fa2f7abcc13b 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -369,6 +369,3 @@ def _build_meta(self, file_name: str, entries: List[ManifestEntry], max_row_id=max_row_id, ) - def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFileMeta: - self.write(file_name, entries) - return self._build_meta(file_name, entries) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 0d863e05ed5b..132000b8fb0e 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -455,7 +455,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str else: existing_manifest_files = [] merge_before_manifests = existing_manifest_files - merged_manifest_files, _ = self.manifest_file_merger.merge( + merged_manifest_files, merge_new_files = self.manifest_file_merger.merge( existing_manifest_files) merge_after_manifests = merged_manifest_files self.manifest_list_manager.write(base_manifest_list, merged_manifest_files) @@ -500,11 +500,15 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: - self._clean_up_reuse_tmp_manifests( - delta_manifest_list, changelog_manifest_list_name, new_index_manifest, - delta_known_metas, changelog_known_metas) - self._clean_up_no_reuse_tmp_manifests( - base_manifest_list, merge_before_manifests, merge_after_manifests) + try: + self._clean_up_reuse_tmp_manifests( + delta_manifest_list, changelog_manifest_list_name, new_index_manifest, + delta_known_metas, changelog_known_metas) + self._clean_up_no_reuse_tmp_manifests( + base_manifest_list, merge_new_files) + except Exception as cleanup_err: + logger.warning(f"Failed to clean up temporary files: {cleanup_err}", + exc_info=True) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) raise RuntimeError(f"Failed to prepare snapshot: {e}") @@ -677,22 +681,17 @@ def _clean_up_reuse_tmp_manifests(self, def _clean_up_no_reuse_tmp_manifests(self, base_manifest_list: Optional[str], - merge_before: List[ManifestFileMeta], - merge_after: List[ManifestFileMeta]): - """Clean up base manifest list and only the newly created merge manifests. + merge_new_files: List[ManifestFileMeta]): + """Clean up base manifest list and newly created merge manifests. - Mirrors Java CommitCleaner.cleanUpNoReuseTmpManifests: only deletes - manifests in merge_after that are not in merge_before (i.e. newly - created by the merger, not pre-existing). + Mirrors Java CommitCleaner.cleanUpNoReuseTmpManifests. """ manifest_path = self.manifest_list_manager.manifest_path if base_manifest_list: self.table.file_io.delete_quietly(f"{manifest_path}/{base_manifest_list}") - old_names = {m.file_name for m in merge_before} - for meta in merge_after: - if meta.file_name not in old_names: - self.table.file_io.delete_quietly( - f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") + for meta in merge_new_files: + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") def abort(self, commit_messages: List[CommitMessage]): """Abort commit and delete files. Uses external_path if available to ensure proper scheme handling.""" From d7f34ee4c57d3c0f8f43ceee1336f849afe343a3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 17:05:53 +0800 Subject: [PATCH 23/26] [python] Fix unbound merge_new_files and remove unused merge_after_manifests --- paimon-python/pypaimon/write/file_store_commit.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 132000b8fb0e..9d62c5e9e2b7 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -424,8 +424,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_record_count = None delta_known_metas = [] changelog_known_metas = [] - merge_before_manifests = [] - merge_after_manifests = [] + merge_new_files = [] try: delta_known_metas = self._write_manifest_files(commit_entries, new_manifest_file) self.manifest_list_manager.write(delta_manifest_list, delta_known_metas) @@ -454,10 +453,8 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str total_record_count += previous_record_count else: existing_manifest_files = [] - merge_before_manifests = existing_manifest_files merged_manifest_files, merge_new_files = self.manifest_file_merger.merge( existing_manifest_files) - merge_after_manifests = merged_manifest_files self.manifest_list_manager.write(base_manifest_list, merged_manifest_files) delta_record_count = 0 From 542aeccb00e7d5a400628ede4db78ae15cf0d844 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 17:17:52 +0800 Subject: [PATCH 24/26] [python] Remove trailing blank line at EOF --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index fa2f7abcc13b..196b7bedf5ab 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -368,4 +368,3 @@ def _build_meta(self, file_name: str, entries: List[ManifestEntry], min_row_id=min_row_id, max_row_id=max_row_id, ) - From 235f93ed9843e505c59b3af664545087b467b810 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 17:46:57 +0800 Subject: [PATCH 25/26] [python] Remove known metas fallback to match Java CommitCleaner Java's cleanUpReuseTmpManifests only reads manifest list to find files to delete. If manifest list write failed, it skips cleanup. Align Python to the same behavior. --- .../pypaimon/write/file_store_commit.py | 32 +++++++------------ 1 file changed, 12 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 9d62c5e9e2b7..34ef83c33128 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -422,21 +422,19 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_manifest_list_name = None changelog_manifest_list_size = None changelog_record_count = None - delta_known_metas = [] - changelog_known_metas = [] merge_new_files = [] try: - delta_known_metas = self._write_manifest_files(commit_entries, new_manifest_file) - self.manifest_list_manager.write(delta_manifest_list, delta_known_metas) + new_manifest_file_metas = self._write_manifest_files(commit_entries, new_manifest_file) + self.manifest_list_manager.write(delta_manifest_list, new_manifest_file_metas) # Write changelog manifest if changelog entries exist if changelog_entries: changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog" - changelog_known_metas = self._write_manifest_files( + changelog_manifest_file_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" self.manifest_list_manager.write( - changelog_manifest_list_name, changelog_known_metas) + changelog_manifest_list_name, changelog_manifest_file_metas) manifest_path = self.manifest_list_manager.manifest_path changelog_manifest_list_size = self.table.file_io.get_file_size( f"{manifest_path}/{changelog_manifest_list_name}") @@ -499,8 +497,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str except Exception as e: try: self._clean_up_reuse_tmp_manifests( - delta_manifest_list, changelog_manifest_list_name, new_index_manifest, - delta_known_metas, changelog_known_metas) + delta_manifest_list, changelog_manifest_list_name, new_index_manifest) self._clean_up_no_reuse_tmp_manifests( base_manifest_list, merge_new_files) except Exception as cleanup_err: @@ -653,25 +650,20 @@ def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> Li def _clean_up_reuse_tmp_manifests(self, delta_manifest_list: Optional[str], changelog_manifest_list: Optional[str], - index_manifest: Optional[str] = None, - delta_known_metas: Optional[List[ManifestFileMeta]] = None, - changelog_known_metas: Optional[List[ManifestFileMeta]] = None): + index_manifest: Optional[str] = None): """Clean up delta/changelog manifests and index manifest. - Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. Falls back to - known metas when manifest list is unreadable (e.g. write failed). + Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. """ manifest_path = self.manifest_list_manager.manifest_path - for ml_name, known in ((delta_manifest_list, delta_known_metas), - (changelog_manifest_list, changelog_known_metas)): + for ml_name in (delta_manifest_list, changelog_manifest_list): if ml_name: try: - metas = self.manifest_list_manager.read(ml_name) + for meta in self.manifest_list_manager.read(ml_name): + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") except Exception: - metas = known or [] - for meta in metas: - self.table.file_io.delete_quietly( - f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") + pass self.table.file_io.delete_quietly(f"{manifest_path}/{ml_name}") if index_manifest: self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") From a6790c9f9aa442574082ef3caa841aaa82aa3544 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 18:38:08 +0800 Subject: [PATCH 26/26] [python] Fix flake8 E128 indent in cleanup method signatures --- .../pypaimon/write/file_store_commit.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 34ef83c33128..fb90f1aa903a 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -647,10 +647,11 @@ def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> Li )) return changelog_entries - def _clean_up_reuse_tmp_manifests(self, - delta_manifest_list: Optional[str], - changelog_manifest_list: Optional[str], - index_manifest: Optional[str] = None): + def _clean_up_reuse_tmp_manifests( + self, + delta_manifest_list: Optional[str], + changelog_manifest_list: Optional[str], + index_manifest: Optional[str] = None): """Clean up delta/changelog manifests and index manifest. Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. @@ -668,9 +669,10 @@ def _clean_up_reuse_tmp_manifests(self, if index_manifest: self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") - def _clean_up_no_reuse_tmp_manifests(self, - base_manifest_list: Optional[str], - merge_new_files: List[ManifestFileMeta]): + def _clean_up_no_reuse_tmp_manifests( + self, + base_manifest_list: Optional[str], + merge_new_files: List[ManifestFileMeta]): """Clean up base manifest list and newly created merge manifests. Mirrors Java CommitCleaner.cleanUpNoReuseTmpManifests.