# Compression names accepted for the ``manifest.compression`` option.
_SUPPORTED_MANIFEST_COMPRESSIONS = frozenset({
    'null', 'deflate', 'snappy', 'zstd', 'bzip2', 'xz',
})

# Option spellings whose fastavro codec name differs from the option value.
_AVRO_CODEC_ALIASES = {'zstd': 'zstandard'}


def avro_codec(compression: str) -> str:
    """Map a Paimon ``manifest.compression`` value to a fastavro codec name.

    The lookup is case-insensitive and ignores surrounding whitespace.

    Args:
        compression: Configured compression name, e.g. ``'zstd'``.

    Returns:
        The codec name to hand to ``fastavro.writer(..., codec=...)``.
        ``'zstd'`` is translated to ``'zstandard'``; every other supported
        name is returned lower-cased.

    Raises:
        ValueError: If ``compression`` is not a string or is not one of the
            supported compression names.
    """
    # Non-string input (e.g. None) is reported through the same ValueError as
    # an unknown name instead of surfacing as an AttributeError on ``.lower()``.
    normalized = (
        compression.strip().lower() if isinstance(compression, str) else None
    )
    if normalized not in _SUPPORTED_MANIFEST_COMPRESSIONS:
        raise ValueError(
            f"Unsupported manifest compression '{compression}'. "
            f"Supported: {sorted(_SUPPORTED_MANIFEST_COMPRESSIONS)}")
    return _AVRO_CODEC_ALIASES.get(normalized, normalized)
avro_codec + self._codec = avro_codec(table.options.manifest_compression()) def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None, drop_stats=True, max_workers=8, @@ -252,7 +254,9 @@ def write(self, file_name, entries: List[ManifestEntry]): manifest_path = f"{self.manifest_path}/{file_name}" try: buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records) + fastavro.writer( + buffer, MANIFEST_ENTRY_SCHEMA, avro_records, + codec=self._codec) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(manifest_path) as output_stream: output_stream.write(avro_bytes) diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index 273dfbe363f2..3a0e606ef5c4 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -32,11 +32,13 @@ class ManifestListManager: def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable + from pypaimon.manifest import avro_codec self.table: FileStoreTable = table manifest_path = table.table_path.rstrip('/') self.manifest_path = f"{manifest_path}/manifest" self.file_io = self.table.file_io + self._codec = avro_codec(table.options.manifest_compression()) def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]: """Read base + delta manifest lists for full file state.""" @@ -124,7 +126,9 @@ def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]): list_path = f"{self.manifest_path}/{file_name}" try: buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records) + fastavro.writer( + buffer, MANIFEST_FILE_META_SCHEMA, avro_records, + codec=self._codec) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(list_path) as output_stream: output_stream.write(avro_bytes) diff --git 
def test_manifest_compression(self):
    """Check the Avro codec of files written for a table with default options.

    Writes one batch to a fresh primary-key table, then inspects the
    ``avro.codec`` header of a manifest file, the base manifest list and a
    freshly written index manifest; each is expected to be ``zstandard``.
    """
    import fastavro

    from pypaimon.index.index_file_meta import IndexFileMeta
    from pypaimon.manifest.index_manifest_entry import IndexManifestEntry
    from pypaimon.manifest.index_manifest_file import IndexManifestFile

    pa_schema = pa.schema([('pk', pa.int32()), ('val', pa.string())])
    schema = Schema.from_pyarrow_schema(
        pa_schema, primary_keys=['pk'], options={'bucket': '1'})
    self.catalog.create_table('default.codec_test', schema, False)
    table = self.catalog.get_table('default.codec_test')

    def read_codec(path):
        # Return the 'avro.codec' header entry as text; the value may come
        # back as bytes or str, and a missing entry is treated as 'null'.
        with table.file_io.new_input_stream(path) as stream:
            raw = fastavro.reader(stream).metadata.get('avro.codec', b'null')
        return raw.decode() if isinstance(raw, bytes) else raw

    # Produce one snapshot so manifest and manifest-list files exist.
    wb = table.new_batch_write_builder()
    w = wb.new_write()
    w.write_arrow(pa.Table.from_pylist(
        [{'pk': i, 'val': f'v{i}'} for i in range(10)], schema=pa_schema))
    wb.new_commit().commit(w.prepare_commit())
    w.close()

    snap = table.snapshot_manager().get_latest_snapshot()
    mlm = ManifestListManager(table)
    metas = mlm.read_all(snap)
    self.assertGreater(len(metas), 0)

    codec = read_codec(f"{mlm.manifest_path}/{metas[0].file_name}")
    self.assertEqual(
        codec, 'zstandard',
        f"Manifest file uses codec '{codec}' but Java defaults to zstd.")

    list_codec = read_codec(f"{mlm.manifest_path}/{snap.base_manifest_list}")
    self.assertEqual(
        list_codec, 'zstandard',
        f"Manifest list uses codec '{list_codec}' but Java defaults to zstd.")

    # The index manifest is written directly here rather than via the commit.
    imf = IndexManifestFile(table)
    idx_entry = IndexManifestEntry(
        kind=0, partition=GenericRow([], []), bucket=0,
        index_file=IndexFileMeta(
            index_type='BTREE', file_name='idx-0',
            file_size=100, row_count=10),
    )
    idx_file = imf.write([idx_entry])
    idx_codec = read_codec(f"{mlm.manifest_path}/{idx_file}")
    self.assertEqual(
        idx_codec, 'zstandard',
        f"Index manifest uses codec '{idx_codec}' but Java defaults to zstd.")