From 9e70dc624ba959e16142edca1d564a1c79566383 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 21:11:29 +0800 Subject: [PATCH 01/11] [python] Use zstd compression for manifest files by default Manifest files and manifest list files were written without compression (null codec), resulting in larger files on storage. Add manifest.compression option (default zstd) and pass the codec to fastavro when writing manifest and manifest list files. --- .../pypaimon/common/options/core_options.py | 10 +++++ .../manifest/manifest_file_manager.py | 13 ++++++- .../manifest/manifest_list_manager.py | 5 ++- .../tests/manifest/manifest_manager_test.py | 38 +++++++++++++++++++ 4 files changed, 64 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index abd2bac1903e..745ad5bb854c 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -186,6 +186,13 @@ class CoreOptions: .with_description("The parallelism for scanning manifest files.") ) + MANIFEST_COMPRESSION: ConfigOption[str] = ( + ConfigOptions.key("manifest.compression") + .string_type() + .default_value("zstd") + .with_description("Default file compression for manifest.") + ) + MANIFEST_TARGET_FILE_SIZE: ConfigOption[MemorySize] = ( ConfigOptions.key("manifest.target-file-size") .memory_type() @@ -860,6 +867,9 @@ def dynamic_bucket_max_buckets(self, default=None): def scan_manifest_parallelism(self, default=None): return self.options.get(CoreOptions.SCAN_MANIFEST_PARALLELISM, default) + def manifest_compression(self, default=None): + return self.options.get(CoreOptions.MANIFEST_COMPRESSION, default) + def manifest_target_size(self, default=None): if default is not None and not isinstance(default, MemorySize): default = MemorySize.of_bytes(default) if isinstance(default, int) else MemorySize.parse(default) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index af710f94d6ba..4005509f7cca 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -34,9 +34,18 @@ from pypaimon.table.row.binary_row import BinaryRow +def _avro_codec(compression: str) -> str: + """Map Paimon compression config to fastavro codec name.""" + if compression.lower() in ('zstd', 'zstandard'): + return 'zstandard' + return compression + + class ManifestFileManager: """Writer for manifest files in Avro format using unified FileIO.""" + _AVRO_SYNC_INTERVAL = 16000 + def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable @@ -47,6 +56,7 @@ def __init__(self, table): self.partition_keys_fields = self.table.partition_keys_fields self.primary_keys_fields = self.table.primary_keys_fields self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields + self._codec = _avro_codec(table.options.manifest_compression()) def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None, drop_stats=True, max_workers=8, @@ -252,7 +262,8 @@ def write(self, file_name, entries: List[ManifestEntry]): manifest_path = f"{self.manifest_path}/{file_name}" try: buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records) + fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records, + codec=self._codec) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(manifest_path) as output_stream: output_stream.write(avro_bytes) diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index 273dfbe363f2..05e695999854 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -32,11 +32,13 @@ class ManifestListManager: def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable + from pypaimon.manifest.manifest_file_manager import _avro_codec self.table: FileStoreTable = table manifest_path = table.table_path.rstrip('/') self.manifest_path = f"{manifest_path}/manifest" self.file_io = self.table.file_io + self._codec = _avro_codec(table.options.manifest_compression()) def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]: """Read base + delta manifest lists for full file state.""" @@ -124,7 +126,8 @@ def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]): list_path = f"{self.manifest_path}/{file_name}" try: buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records) + fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records, + codec=self._codec) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(list_path) as output_stream: output_stream.write(avro_bytes) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index 2c6423af3f90..e0e1881959ae 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -325,6 +325,44 @@ def test_read_write_cols_with_system_field(self): self.assertEqual(read_stats.max_values.get_field(0), 10) self.assertEqual(read_stats.null_counts, [2]) + def test_manifest_compression(self): + import fastavro + pa_schema = pa.schema([('pk', pa.int32()), ('val', pa.string())]) + schema = Schema.from_pyarrow_schema( + pa_schema, primary_keys=['pk'], options={'bucket': '1'}) + self.catalog.create_table('default.codec_test', schema, False) + table = self.catalog.get_table('default.codec_test') + + wb = table.new_batch_write_builder() + w = wb.new_write() + w.write_arrow(pa.Table.from_pylist( + [{'pk': i, 'val': f'v{i}'} for i in range(10)], schema=pa_schema)) + wb.new_commit().commit(w.prepare_commit()) + w.close() + + snap = table.snapshot_manager().get_latest_snapshot() + mlm = ManifestListManager(table) + metas = mlm.read_all(snap) + self.assertGreater(len(metas), 0) + + manifest_path = f"{mlm.manifest_path}/{metas[0].file_name}" + with table.file_io.new_input_stream(manifest_path) as f: + codec = fastavro.reader(f).metadata.get('avro.codec', b'null') + if isinstance(codec, bytes): + codec = codec.decode() + self.assertEqual( + codec, 'zstandard', + f"Manifest file uses codec '{codec}' but Java defaults to zstd.") + + list_path = f"{mlm.manifest_path}/{snap.base_manifest_list}" + with table.file_io.new_input_stream(list_path) as f: + list_codec = fastavro.reader(f).metadata.get('avro.codec', b'null') + if isinstance(list_codec, bytes): + list_codec = list_codec.decode() + self.assertEqual( + list_codec, 'zstandard', + f"Manifest list uses codec '{list_codec}' but Java defaults to zstd.") + class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" From 92b07eb976f793a0f5724099fab7c678a919cfb8 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 21:13:40 +0800 Subject: [PATCH 02/11] [python] Use zstd compression for manifest files by default Manifest files and manifest list files were written without compression (null codec), resulting in larger files on storage. Add manifest.compression option (default zstd) and pass the codec to fastavro when writing manifest and manifest list files. From 926834518b77b54df81356b04d8dabc9f1d956cb Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 21:20:29 +0800 Subject: [PATCH 03/11] [python] Remove _AVRO_SYNC_INTERVAL from compression PR --- paimon-python/pypaimon/manifest/manifest_file_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 4005509f7cca..d343bcd835c7 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -44,8 +44,6 @@ def _avro_codec(compression: str) -> str: class ManifestFileManager: """Writer for manifest files in Avro format using unified FileIO.""" - _AVRO_SYNC_INTERVAL = 16000 - def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable From 040d9280a96a5fac5ceecea2fbaa0bb0e1559092 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 21:34:09 +0800 Subject: [PATCH 04/11] [python] Apply zstd compression to index manifest files --- .../pypaimon/manifest/index_manifest_file.py | 5 ++++- .../tests/manifest/manifest_manager_test.py | 20 +++++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/index_manifest_file.py b/paimon-python/pypaimon/manifest/index_manifest_file.py index a7d82d4b12ba..326352373e14 100644 --- a/paimon-python/pypaimon/manifest/index_manifest_file.py +++ b/paimon-python/pypaimon/manifest/index_manifest_file.py @@ -85,12 +85,14 @@ class IndexManifestFile: def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable + from pypaimon.manifest.manifest_file_manager import _avro_codec self.table: FileStoreTable = table manifest_path = table.table_path.rstrip('/') self.manifest_path = f"{manifest_path}/manifest" self.file_io = table.file_io self.partition_keys_fields = self.table.partition_keys_fields + self._codec = _avro_codec(table.options.manifest_compression()) def read(self, index_manifest_name: str) -> List[IndexManifestEntry]: index_manifest_path = f"{self.manifest_path}/{index_manifest_name}" @@ -247,7 +249,8 @@ def write(self, entries: List[IndexManifestEntry]) -> str: records = [self._to_avro_record(e) for e in entries] try: buffer = BytesIO() - fastavro.writer(buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records) + fastavro.writer(buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records, + codec=self._codec) with self.file_io.new_output_stream(path) as output_stream: output_stream.write(buffer.getvalue()) except Exception as e: diff --git a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py index e0e1881959ae..8a64263f29f0 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_manager_test.py @@ -363,6 +363,26 @@ def test_manifest_compression(self): list_codec, 'zstandard', f"Manifest list uses codec '{list_codec}' but Java defaults to zstd.") + from pypaimon.manifest.index_manifest_file import IndexManifestFile + from pypaimon.manifest.index_manifest_entry import IndexManifestEntry + from pypaimon.index.index_file_meta import IndexFileMeta + imf = IndexManifestFile(table) + idx_entry = IndexManifestEntry( + kind=0, partition=GenericRow([], []), bucket=0, + index_file=IndexFileMeta( + index_type='BTREE', file_name='idx-0', + file_size=100, row_count=10), + ) + idx_file = imf.write([idx_entry]) + idx_path = f"{mlm.manifest_path}/{idx_file}" + with table.file_io.new_input_stream(idx_path) as f: + idx_codec = fastavro.reader(f).metadata.get('avro.codec', b'null') + if isinstance(idx_codec, bytes): + idx_codec = idx_codec.decode() + self.assertEqual( + idx_codec, 'zstandard', + f"Index manifest uses codec '{idx_codec}' but Java defaults to zstd.") + class ManifestListManagerTest(_ManifestManagerSetup): """Tests for ManifestListManager.""" From 99b2c3e9a1199b5dc25806214175d4f6b00721f8 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 21:47:19 +0800 Subject: [PATCH 05/11] [python] Validate manifest compression codec against supported values --- .../pypaimon/manifest/manifest_file_manager.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index d343bcd835c7..019fa20e969a 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -34,11 +34,19 @@ from pypaimon.table.row.binary_row import BinaryRow +_SUPPORTED_CODECS = {'null', 'deflate', 'snappy', 'zstandard', 'bzip2', 'xz', 'lz4'} + + def _avro_codec(compression: str) -> str: """Map Paimon compression config to fastavro codec name.""" if compression.lower() in ('zstd', 'zstandard'): return 'zstandard' - return compression + codec = compression.lower() + if codec not in _SUPPORTED_CODECS: + raise ValueError( + f"Unsupported manifest compression '{compression}'. " + f"Supported: {sorted(_SUPPORTED_CODECS)}") + return codec class ManifestFileManager: From 9646d91c08cf80c27defe1f8c37152fd1928cdca Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 21:54:16 +0800 Subject: [PATCH 06/11] [python] Move avro_codec to manifest package to decouple from manifest_file_manager --- paimon-python/pypaimon/manifest/__init__.py | 17 +++++++++++++++++ .../pypaimon/manifest/index_manifest_file.py | 4 ++-- .../pypaimon/manifest/manifest_file_manager.py | 18 ++---------------- .../pypaimon/manifest/manifest_list_manager.py | 4 ++-- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py index 8cb828ad1d65..dbc9d0e27da1 100644 --- a/paimon-python/pypaimon/manifest/__init__.py +++ b/paimon-python/pypaimon/manifest/__init__.py @@ -23,3 +23,20 @@ from pypaimon.manifest import fastavro_py36_compat # noqa: F401 except ImportError: pass + + +_SUPPORTED_AVRO_CODECS = { + 'null', 'deflate', 'snappy', 'zstandard', 'bzip2', 'xz', 'lz4', +} + + +def avro_codec(compression): + """Map Paimon compression config value to fastavro codec name.""" + if compression.lower() in ('zstd', 'zstandard'): + return 'zstandard' + codec = compression.lower() + if codec not in _SUPPORTED_AVRO_CODECS: + raise ValueError( + f"Unsupported manifest compression '{compression}'. " + f"Supported: {sorted(_SUPPORTED_AVRO_CODECS)}") + return codec diff --git a/paimon-python/pypaimon/manifest/index_manifest_file.py b/paimon-python/pypaimon/manifest/index_manifest_file.py index 326352373e14..df045906e2ad 100644 --- a/paimon-python/pypaimon/manifest/index_manifest_file.py +++ b/paimon-python/pypaimon/manifest/index_manifest_file.py @@ -85,14 +85,14 @@ class IndexManifestFile: def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable - from pypaimon.manifest.manifest_file_manager import _avro_codec + from pypaimon.manifest import avro_codec self.table: FileStoreTable = table manifest_path = table.table_path.rstrip('/') self.manifest_path = f"{manifest_path}/manifest" self.file_io = table.file_io self.partition_keys_fields = self.table.partition_keys_fields - self._codec = _avro_codec(table.options.manifest_compression()) + self._codec = avro_codec(table.options.manifest_compression()) def read(self, index_manifest_name: str) -> List[IndexManifestEntry]: index_manifest_path = f"{self.manifest_path}/{index_manifest_name}" diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 019fa20e969a..cfcb7044a765 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -34,21 +34,6 @@ from pypaimon.table.row.binary_row import BinaryRow -_SUPPORTED_CODECS = {'null', 'deflate', 'snappy', 'zstandard', 'bzip2', 'xz', 'lz4'} - - -def _avro_codec(compression: str) -> str: - """Map Paimon compression config to fastavro codec name.""" - if compression.lower() in ('zstd', 'zstandard'): - return 'zstandard' - codec = compression.lower() - if codec not in _SUPPORTED_CODECS: - raise ValueError( - f"Unsupported manifest compression '{compression}'. " - f"Supported: {sorted(_SUPPORTED_CODECS)}") - return codec - - class ManifestFileManager: """Writer for manifest files in Avro format using unified FileIO.""" @@ -62,7 +47,8 @@ def __init__(self, table): self.partition_keys_fields = self.table.partition_keys_fields self.primary_keys_fields = self.table.primary_keys_fields self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields - self._codec = _avro_codec(table.options.manifest_compression()) + from pypaimon.manifest import avro_codec + self._codec = avro_codec(table.options.manifest_compression()) def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None, drop_stats=True, max_workers=8, diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index 05e695999854..d07d09c6550a 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -32,13 +32,13 @@ class ManifestListManager: def __init__(self, table): from pypaimon.table.file_store_table import FileStoreTable - from pypaimon.manifest.manifest_file_manager import _avro_codec + from pypaimon.manifest import avro_codec self.table: FileStoreTable = table manifest_path = table.table_path.rstrip('/') self.manifest_path = f"{manifest_path}/manifest" self.file_io = self.table.file_io - self._codec = _avro_codec(table.options.manifest_compression()) + self._codec = avro_codec(table.options.manifest_compression()) def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]: """Read base + delta manifest lists for full file state.""" From f80fb087428336621253b0871920a85532d4fc27 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 22:00:58 +0800 Subject: [PATCH 07/11] [python] Remove lz4 from supported manifest codecs for Java Avro compatibility --- paimon-python/pypaimon/manifest/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py index dbc9d0e27da1..f581e411eeb7 100644 --- a/paimon-python/pypaimon/manifest/__init__.py +++ b/paimon-python/pypaimon/manifest/__init__.py @@ -26,7 +26,7 @@ _SUPPORTED_AVRO_CODECS = { - 'null', 'deflate', 'snappy', 'zstandard', 'bzip2', 'xz', 'lz4', + 'null', 'deflate', 'snappy', 'zstandard', 'bzip2', 'xz', } From ce757a03bae23d04d1ff1aee8dca81d05cf4ee1b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 22:20:54 +0800 Subject: [PATCH 08/11] [python] Include zstd alias in supported codecs error message --- paimon-python/pypaimon/manifest/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py index f581e411eeb7..84088daad716 100644 --- a/paimon-python/pypaimon/manifest/__init__.py +++ b/paimon-python/pypaimon/manifest/__init__.py @@ -38,5 +38,5 @@ def avro_codec(compression): if codec not in _SUPPORTED_AVRO_CODECS: raise ValueError( f"Unsupported manifest compression '{compression}'. " - f"Supported: {sorted(_SUPPORTED_AVRO_CODECS)}") + f"Supported: {sorted(_SUPPORTED_AVRO_CODECS | {'zstd'})}") return codec From d1f05c9df3c9ef027066f48bc6485d1d1622f559 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 22:22:31 +0800 Subject: [PATCH 09/11] [python] Accept only zstd (not zstandard) to match Java config convention --- paimon-python/pypaimon/manifest/__init__.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py index 84088daad716..fd842e10a528 100644 --- a/paimon-python/pypaimon/manifest/__init__.py +++ b/paimon-python/pypaimon/manifest/__init__.py @@ -25,18 +25,18 @@ pass -_SUPPORTED_AVRO_CODECS = { - 'null', 'deflate', 'snappy', 'zstandard', 'bzip2', 'xz', +_SUPPORTED_MANIFEST_COMPRESSIONS = { + 'null', 'deflate', 'snappy', 'zstd', 'bzip2', 'xz', } def avro_codec(compression): - """Map Paimon compression config value to fastavro codec name.""" - if compression.lower() in ('zstd', 'zstandard'): - return 'zstandard' - codec = compression.lower() - if codec not in _SUPPORTED_AVRO_CODECS: + """Map Paimon manifest.compression config value to fastavro codec name.""" + lower = compression.lower() + if lower not in _SUPPORTED_MANIFEST_COMPRESSIONS: raise ValueError( f"Unsupported manifest compression '{compression}'. " - f"Supported: {sorted(_SUPPORTED_AVRO_CODECS | {'zstd'})}") - return codec + f"Supported: {sorted(_SUPPORTED_MANIFEST_COMPRESSIONS)}") + if lower == 'zstd': + return 'zstandard' + return lower From 7960bfd5bdeb066c155e596d8180c4c0c4c98f40 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 22:37:38 +0800 Subject: [PATCH 10/11] [python] Fix Mock table in manifest_schema_test for manifest_compression --- paimon-python/pypaimon/tests/manifest/manifest_schema_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-python/pypaimon/tests/manifest/manifest_schema_test.py b/paimon-python/pypaimon/tests/manifest/manifest_schema_test.py index b806ad3fa91f..f483d84cccff 100644 --- a/paimon-python/pypaimon/tests/manifest/manifest_schema_test.py +++ b/paimon-python/pypaimon/tests/manifest/manifest_schema_test.py @@ -221,6 +221,7 @@ def test_read_legacy_manifest_list(self): table.table_path = table_path table.file_io = file_io table.partition_keys_fields = [] + table.options.manifest_compression.return_value = 'zstd' manager = ManifestListManager(table) metas = manager.read(manifest_list_name) From c10edd437518b9b58a6c3977d2061f33d3e92178 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 28 Jun 2026 23:18:30 +0800 Subject: [PATCH 11/11] [python] Fix flake8 E128 indent in fastavro.writer codec lines --- paimon-python/pypaimon/manifest/index_manifest_file.py | 5 +++-- paimon-python/pypaimon/manifest/manifest_file_manager.py | 5 +++-- paimon-python/pypaimon/manifest/manifest_list_manager.py | 5 +++-- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/paimon-python/pypaimon/manifest/index_manifest_file.py b/paimon-python/pypaimon/manifest/index_manifest_file.py index df045906e2ad..62ca9ea16dc6 100644 --- a/paimon-python/pypaimon/manifest/index_manifest_file.py +++ b/paimon-python/pypaimon/manifest/index_manifest_file.py @@ -249,8 +249,9 @@ def write(self, entries: List[IndexManifestEntry]) -> str: records = [self._to_avro_record(e) for e in entries] try: buffer = BytesIO() - fastavro.writer(buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records, - codec=self._codec) + fastavro.writer( + buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records, + codec=self._codec) with self.file_io.new_output_stream(path) as output_stream: output_stream.write(buffer.getvalue()) except Exception as e: diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index cfcb7044a765..9840b918e4a7 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -254,8 +254,9 @@ def write(self, file_name, entries: List[ManifestEntry]): manifest_path = f"{self.manifest_path}/{file_name}" try: buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records, - codec=self._codec) + fastavro.writer( + buffer, MANIFEST_ENTRY_SCHEMA, avro_records, + codec=self._codec) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(manifest_path) as output_stream: output_stream.write(avro_bytes) diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py index d07d09c6550a..3a0e606ef5c4 100644 --- a/paimon-python/pypaimon/manifest/manifest_list_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py @@ -126,8 +126,9 @@ def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]): list_path = f"{self.manifest_path}/{file_name}" try: buffer = BytesIO() - fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records, - codec=self._codec) + fastavro.writer( + buffer, MANIFEST_FILE_META_SCHEMA, avro_records, + codec=self._codec) avro_bytes = buffer.getvalue() with self.file_io.new_output_stream(list_path) as output_stream: output_stream.write(avro_bytes)