diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index af710f94d6ba..196b7bedf5ab 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -37,6 +37,8 @@ class ManifestFileManager: """Writer for manifest files in Avro format using unified FileIO.""" + _AVRO_SYNC_INTERVAL = 16000 + def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable @@ -208,60 +210,107 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List: return fields def write(self, file_name, entries: List[ManifestEntry]): - avro_records = [] - for entry in entries: - avro_record = { - "_VERSION": 2, - "_KIND": entry.kind, - "_PARTITION": GenericRowSerializer.to_bytes(entry.partition), - "_BUCKET": entry.bucket, - "_TOTAL_BUCKETS": entry.total_buckets, - "_FILE": { - "_FILE_NAME": entry.file.file_name, - "_FILE_SIZE": entry.file.file_size, - "_ROW_COUNT": entry.file.row_count, - "_MIN_KEY": GenericRowSerializer.to_bytes(entry.file.min_key), - "_MAX_KEY": GenericRowSerializer.to_bytes(entry.file.max_key), - "_KEY_STATS": { - "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.min_values), - "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.max_values), - "_NULL_COUNTS": entry.file.key_stats.null_counts, - }, - "_VALUE_STATS": { - "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.min_values), - "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.max_values), - "_NULL_COUNTS": entry.file.value_stats.null_counts, - }, - "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number, - "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number, - "_SCHEMA_ID": entry.file.schema_id, - "_LEVEL": entry.file.level, - "_EXTRA_FILES": entry.file.extra_files, - "_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None, - "_DELETE_ROW_COUNT": entry.file.delete_row_count, - "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, - "_FILE_SOURCE": entry.file.file_source, - "_VALUE_STATS_COLS": entry.file.value_stats_cols, - "_EXTERNAL_PATH": entry.file.external_path, - "_FIRST_ROW_ID": entry.file.first_row_id, - "_WRITE_COLS": entry.file.write_cols, - } + buf = BytesIO() + fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries)) + self._flush(file_name, buf.getvalue()) + + def rolling_write(self, entries: List[ManifestEntry], + suggested_file_size: int, + name_prefix: str) -> List[ManifestFileMeta]: + if not entries: + return [] + + from fastavro.write import Writer + + sync_interval = min(self._AVRO_SYNC_INTERVAL, suggested_file_size) + result = [] + written_files = [] + chunk_start = 0 + buf = BytesIO() + writer = Writer(buf, MANIFEST_ENTRY_SCHEMA, sync_interval=sync_interval) + try: + for i, entry in enumerate(entries): + writer.write(self._to_avro_record(entry)) + if buf.tell() >= suggested_file_size: + writer.flush() + avro_bytes = buf.getvalue() + file_name = f"{name_prefix}-{len(result)}" + self._flush(file_name, avro_bytes) + written_files.append(file_name) + result.append(self._build_meta( + file_name, entries[chunk_start:i + 1], len(avro_bytes))) + chunk_start = i + 1 + buf = BytesIO() + writer = Writer(buf, MANIFEST_ENTRY_SCHEMA, sync_interval=sync_interval) + + if chunk_start < len(entries): + writer.flush() + avro_bytes = buf.getvalue() + file_name = f"{name_prefix}-{len(result)}" + self._flush(file_name, avro_bytes) + written_files.append(file_name) + result.append(self._build_meta( + file_name, entries[chunk_start:], len(avro_bytes))) + except Exception: + for fname in written_files: + self.file_io.delete_quietly(f"{self.manifest_path}/{fname}") + raise + return result + + @staticmethod + def _to_avro_record(entry: ManifestEntry) -> dict: + return { + "_VERSION": 2, + "_KIND": entry.kind, + "_PARTITION": GenericRowSerializer.to_bytes(entry.partition), + "_BUCKET": entry.bucket, + "_TOTAL_BUCKETS": entry.total_buckets, + "_FILE": { + "_FILE_NAME": entry.file.file_name, + "_FILE_SIZE": entry.file.file_size, + "_ROW_COUNT": entry.file.row_count, + "_MIN_KEY": GenericRowSerializer.to_bytes(entry.file.min_key), + "_MAX_KEY": GenericRowSerializer.to_bytes(entry.file.max_key), + "_KEY_STATS": { + "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.min_values), + "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.max_values), + "_NULL_COUNTS": entry.file.key_stats.null_counts, + }, + "_VALUE_STATS": { + "_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.min_values), + "_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.max_values), + "_NULL_COUNTS": entry.file.value_stats.null_counts, + }, + "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number, + "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number, + "_SCHEMA_ID": entry.file.schema_id, + "_LEVEL": entry.file.level, + "_EXTRA_FILES": entry.file.extra_files, + "_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None, + "_DELETE_ROW_COUNT": entry.file.delete_row_count, + "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, + "_FILE_SOURCE": entry.file.file_source, + "_VALUE_STATS_COLS": entry.file.value_stats_cols, + "_EXTERNAL_PATH": entry.file.external_path, + "_FIRST_ROW_ID": entry.file.first_row_id, + "_WRITE_COLS": entry.file.write_cols, } - avro_records.append(avro_record) + } + + def _to_avro_records(self, entries: List[ManifestEntry]) -> List[dict]: + return [self._to_avro_record(e) for e in entries] + def _flush(self, file_name: str, avro_bytes: bytes): manifest_path = f"{self.manifest_path}/{file_name}" try: - buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records) - avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(manifest_path) as output_stream: output_stream.write(avro_bytes) except Exception as e: self.file_io.delete_quietly(manifest_path) raise RuntimeError(f"Failed to write manifest file: {e}") from e - def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFileMeta: - self.write(file_name, entries) + def _build_meta(self, file_name: str, entries: List[ManifestEntry], + file_size: int = None) -> ManifestFileMeta: added_file_count = 0 deleted_file_count = 0 schema_id = None @@ -296,10 +345,12 @@ def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFi if max_row_id is None or file_range.to > max_row_id: max_row_id = file_range.to - manifest_file_path = f"{self.manifest_path}/{file_name}" + if file_size is None: + manifest_file_path = f"{self.manifest_path}/{file_name}" + file_size = self.table.file_io.get_file_size(manifest_file_path) return ManifestFileMeta( file_name=file_name, - file_size=self.table.file_io.get_file_size(manifest_file_path), + file_size=file_size, num_added_files=added_file_count, num_deleted_files=deleted_file_count, partition_stats=SimpleStats( diff --git a/paimon-python/pypaimon/manifest/manifest_file_merger.py b/paimon-python/pypaimon/manifest/manifest_file_merger.py index 47cc66b72e37..821b12aef5e9 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_merger.py +++ b/paimon-python/pypaimon/manifest/manifest_file_merger.py @@ -85,13 +85,11 @@ def _merge_candidates(self, candidates: List[ManifestFileMeta], if not merged_entries: return - manifest_file = "manifest-{}-0".format(str(uuid.uuid4())) - merged_meta = self.manifest_file_manager.write_with_meta( - manifest_file, - merged_entries, - ) - result.append(merged_meta) - new_files.append(merged_meta) + manifest_file = "manifest-{}".format(str(uuid.uuid4())) + merged_metas = self.manifest_file_manager.rolling_write( + merged_entries, self.suggested_meta_size, manifest_file) + result.extend(merged_metas) + new_files.extend(merged_metas) def _delete_manifests(self, manifests: List[ManifestFileMeta]): for manifest in manifests: diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py index e3ff8c547f37..98c8867fe646 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_test.py @@ -419,7 +419,7 @@ def test_append_commit_inherits_index_manifest( snapshot_commit.commit.return_value = True file_store_commit.snapshot_commit = snapshot_commit - file_store_commit._write_manifest_file = Mock(return_value=Mock()) + file_store_commit._write_manifest_files = Mock(return_value=[Mock()]) file_store_commit._generate_partition_statistics = Mock(return_value=[]) file_store_commit.manifest_list_manager.read_all.return_value = [] @@ -492,7 +492,7 @@ def make_file(name): file=make_file("f2.parquet")), ] - result = file_store_commit._write_manifest_file(entries, "manifest-test") + result = file_store_commit._write_manifest_files(entries, "manifest-test") self.assertIsNotNone(result) @staticmethod diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 2c6423af3f90..adde81c34e84 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -325,6 +325,86 @@ def test_read_write_cols_with_system_field(self): self.assertEqual(read_stats.max_values.get_field(0), 10) self.assertEqual(read_stats.null_counts, [2]) + def test_commit_manifest_exceeds_target_size(self): + target_size = 16 * 1024 + pa_schema = pa.schema([ + ('pt', pa.string()), + ('pk', pa.int32()), + ('val', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + primary_keys=['pk'], + partition_keys=['pt'], + options={ + 'bucket': '1', + 'manifest.target-file-size': '16 kb', + }, + ) + self.catalog.create_table('default.rolling_bug', schema, False) + table = self.catalog.get_table('default.rolling_bug') + + wb = table.new_batch_write_builder() + w = wb.new_write() + for pt in range(200): + rows = [{'pt': f'p{pt}', 'pk': i, 'val': f'v{i}'} for i in range(5)] + w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + + snap = table.snapshot_manager().get_latest_snapshot() + metas = ManifestListManager(table).read_all(snap) + max_allowed = target_size * 2 + oversized = [m for m in metas if m.file_size > max_allowed] + self.assertEqual( + len(oversized), 0, + f"{len(oversized)} manifest file(s) exceed 2x target ({max_allowed} bytes): " + f"{[(m.file_name, m.file_size) for m in oversized]}. " + f"Java uses RollingFileWriter to split; Python writes one file.") + self.assertGreater( + len(metas), 1, + f"Expected multiple manifest files but got {len(metas)} " + f"with total {sum(m.file_size for m in metas)} bytes") + + mfm = ManifestFileManager(table) + all_entries = [] + for meta in metas: + entries = mfm.read(meta.file_name) + self.assertEqual(len(entries), meta.num_added_files + meta.num_deleted_files) + all_entries.extend(entries) + self.assertEqual(len(all_entries), 200) + + def test_rolling_write_with_skewed_entries(self): + target_size = 16 * 1024 + manager = ManifestFileManager(self.table) + small = self._create_manifest_entry("small.parquet") + big = ManifestEntry( + kind=0, partition=_EMPTY_ROW, bucket=0, total_buckets=1, + file=DataFileMeta( + file_name="big.parquet", file_size=1024, row_count=100, + min_key=_EMPTY_ROW, max_key=_EMPTY_ROW, + key_stats=_EMPTY_STATS, value_stats=_EMPTY_STATS, + min_sequence_number=1, max_sequence_number=100, + schema_id=0, level=0, + extra_files=[f"extra-{i}.idx" for i in range(50)], + creation_time=Timestamp.from_epoch_millis(0), + delete_row_count=0, embedded_index=b'\x00' * 2000, + file_source=None, value_stats_cols=None, + external_path=None, first_row_id=None, write_cols=None, + ), + ) + entries = [big if i % 5 == 0 else small for i in range(300)] + metas = manager.rolling_write(entries, target_size, "manifest-skew") + + max_allowed = target_size * 2 + oversized = [m for m in metas if m.file_size > max_allowed] + self.assertEqual( + len(oversized), 0, + f"Skewed entries: {len(oversized)} file(s) exceed 2x target: " + f"{[(m.file_name, m.file_size) for m in oversized]}") + total_entries = sum(m.num_added_files + m.num_deleted_files for m in metas) + self.assertEqual(total_entries, 300) + class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 304792b4124e..fb90f1aa903a 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -374,7 +374,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str delta_manifest_list = f"manifest-list-{unique_id}-1" # process new_manifest - new_manifest_file = f"manifest-{str(uuid.uuid4())}-0" + new_manifest_file = f"manifest-{str(uuid.uuid4())}" new_index_manifest = None # process snapshot new_snapshot_id = latest_snapshot.id + 1 if latest_snapshot else 1 @@ -422,19 +422,19 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str changelog_manifest_list_name = None changelog_manifest_list_size = None changelog_record_count = None - new_manifest_files_for_abort = [] + merge_new_files = [] try: - new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file) - self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) + new_manifest_file_metas = self._write_manifest_files(commit_entries, new_manifest_file) + self.manifest_list_manager.write(delta_manifest_list, new_manifest_file_metas) # Write changelog manifest if changelog entries exist if changelog_entries: - changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog-0" - changelog_manifest_file_meta = self._write_manifest_file( + changelog_manifest_file = f"manifest-{str(uuid.uuid4())}-changelog" + changelog_manifest_file_metas = self._write_manifest_files( changelog_entries, changelog_manifest_file) changelog_manifest_list_name = f"manifest-list-{unique_id}-changelog" self.manifest_list_manager.write( - changelog_manifest_list_name, [changelog_manifest_file_meta]) + changelog_manifest_list_name, changelog_manifest_file_metas) manifest_path = self.manifest_list_manager.manifest_path changelog_manifest_list_size = self.table.file_io.get_file_size( f"{manifest_path}/{changelog_manifest_list_name}") @@ -451,7 +451,7 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str total_record_count += previous_record_count else: existing_manifest_files = [] - merged_manifest_files, new_manifest_files_for_abort = self.manifest_file_merger.merge( + merged_manifest_files, merge_new_files = self.manifest_file_merger.merge( existing_manifest_files) self.manifest_list_manager.write(base_manifest_list, merged_manifest_files) @@ -495,9 +495,14 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Generate partition statistics for the commit statistics = self._generate_partition_statistics(commit_entries) except Exception as e: - self._cleanup_preparation_failure(delta_manifest_list, base_manifest_list, - new_index_manifest, changelog_manifest_list_name, - new_manifest_files_for_abort) + try: + self._clean_up_reuse_tmp_manifests( + delta_manifest_list, changelog_manifest_list_name, new_index_manifest) + self._clean_up_no_reuse_tmp_manifests( + base_manifest_list, merge_new_files) + except Exception as cleanup_err: + logger.warning(f"Failed to clean up temporary files: {cleanup_err}", + exc_info=True) logger.warning(f"Exception occurs when preparing snapshot: {e}", exc_info=True) raise RuntimeError(f"Failed to prepare snapshot: {e}") @@ -543,8 +548,9 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str return SuccessResult() - def _write_manifest_file(self, commit_entries, new_manifest_file): - return self.manifest_file_manager.write_with_meta(new_manifest_file, commit_entries) + def _write_manifest_files(self, commit_entries, base_name): + return self.manifest_file_manager.rolling_write( + commit_entries, self.manifest_target_size, base_name) def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, commit_kind) -> bool: if retry_result is not None and latest_snapshot is not None: @@ -641,51 +647,42 @@ def _collect_changelog_entries(self, commit_messages: List[CommitMessage]) -> Li )) return changelog_entries - def _cleanup_preparation_failure(self, - delta_manifest_list: Optional[str], - base_manifest_list: Optional[str], - index_manifest: Optional[str] = None, - changelog_manifest_list: Optional[str] = None, - base_manifest_files_to_delete: Optional[List[ManifestFileMeta]] = None): - try: - manifest_path = self.manifest_list_manager.manifest_path - - if index_manifest: - self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") + def _clean_up_reuse_tmp_manifests( + self, + delta_manifest_list: Optional[str], + changelog_manifest_list: Optional[str], + index_manifest: Optional[str] = None): + """Clean up delta/changelog manifests and index manifest. - if delta_manifest_list: - try: - manifest_files = self.manifest_list_manager.read(delta_manifest_list) - for manifest_meta in manifest_files: - manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}" - self.table.file_io.delete_quietly(manifest_file_path) - except Exception: - pass - delta_path = f"{manifest_path}/{delta_manifest_list}" - self.table.file_io.delete_quietly(delta_path) - - if base_manifest_list: - if base_manifest_files_to_delete: - for manifest_meta in base_manifest_files_to_delete: - manifest_file_path = ( - f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}") - self.table.file_io.delete_quietly(manifest_file_path) - base_path = f"{manifest_path}/{base_manifest_list}" - self.table.file_io.delete_quietly(base_path) - - if changelog_manifest_list: + Mirrors Java CommitCleaner.cleanUpReuseTmpManifests. + """ + manifest_path = self.manifest_list_manager.manifest_path + for ml_name in (delta_manifest_list, changelog_manifest_list): + if ml_name: try: - changelog_manifests = self.manifest_list_manager.read(changelog_manifest_list) - for manifest_meta in changelog_manifests: - manifest_file_path = ( - f"{self.manifest_file_manager.manifest_path}/{manifest_meta.file_name}") - self.table.file_io.delete_quietly(manifest_file_path) + for meta in self.manifest_list_manager.read(ml_name): + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") except Exception: pass - changelog_path = f"{manifest_path}/{changelog_manifest_list}" - self.table.file_io.delete_quietly(changelog_path) - except Exception as e: - logger.warning(f"Failed to clean up temporary files during preparation failure: {e}", exc_info=True) + self.table.file_io.delete_quietly(f"{manifest_path}/{ml_name}") + if index_manifest: + self.table.file_io.delete_quietly(f"{manifest_path}/{index_manifest}") + + def _clean_up_no_reuse_tmp_manifests( + self, + base_manifest_list: Optional[str], + merge_new_files: List[ManifestFileMeta]): + """Clean up base manifest list and newly created merge manifests. + + Mirrors Java CommitCleaner.cleanUpNoReuseTmpManifests. + """ + manifest_path = self.manifest_list_manager.manifest_path + if base_manifest_list: + self.table.file_io.delete_quietly(f"{manifest_path}/{base_manifest_list}") + for meta in merge_new_files: + self.table.file_io.delete_quietly( + f"{self.manifest_file_manager.manifest_path}/{meta.file_name}") def abort(self, commit_messages: List[CommitMessage]): """Abort commit and delete files. Uses external_path if available to ensure proper scheme handling."""