Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,13 @@ class CoreOptions:
.with_description("The parallelism for scanning manifest files.")
)

MANIFEST_COMPRESSION: ConfigOption[str] = (
ConfigOptions.key("manifest.compression")
.string_type()
.default_value("zstd")
.with_description("Default file compression for manifest.")
)

MANIFEST_TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
ConfigOptions.key("manifest.target-file-size")
.memory_type()
Expand Down Expand Up @@ -860,6 +867,9 @@ def dynamic_bucket_max_buckets(self, default=None):
def scan_manifest_parallelism(self, default=None):
return self.options.get(CoreOptions.SCAN_MANIFEST_PARALLELISM, default)

def manifest_compression(self, default=None):
return self.options.get(CoreOptions.MANIFEST_COMPRESSION, default)

def manifest_target_size(self, default=None):
if default is not None and not isinstance(default, MemorySize):
default = MemorySize.of_bytes(default) if isinstance(default, int) else MemorySize.parse(default)
Expand Down
17 changes: 17 additions & 0 deletions paimon-python/pypaimon/manifest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,20 @@
from pypaimon.manifest import fastavro_py36_compat # noqa: F401
except ImportError:
pass


_SUPPORTED_MANIFEST_COMPRESSIONS = {
'null', 'deflate', 'snappy', 'zstd', 'bzip2', 'xz',
}


def avro_codec(compression):
"""Map Paimon manifest.compression config value to fastavro codec name."""
lower = compression.lower()
if lower not in _SUPPORTED_MANIFEST_COMPRESSIONS:
raise ValueError(
f"Unsupported manifest compression '{compression}'. "
f"Supported: {sorted(_SUPPORTED_MANIFEST_COMPRESSIONS)}")
if lower == 'zstd':
return 'zstandard'
return lower
6 changes: 5 additions & 1 deletion paimon-python/pypaimon/manifest/index_manifest_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,14 @@ class IndexManifestFile:

def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.manifest import avro_codec

self.table: FileStoreTable = table
manifest_path = table.table_path.rstrip('/')
self.manifest_path = f"{manifest_path}/manifest"
self.file_io = table.file_io
self.partition_keys_fields = self.table.partition_keys_fields
self._codec = avro_codec(table.options.manifest_compression())

def read(self, index_manifest_name: str) -> List[IndexManifestEntry]:
index_manifest_path = f"{self.manifest_path}/{index_manifest_name}"
Expand Down Expand Up @@ -247,7 +249,9 @@ def write(self, entries: List[IndexManifestEntry]) -> str:
records = [self._to_avro_record(e) for e in entries]
try:
buffer = BytesIO()
fastavro.writer(buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records)
fastavro.writer(
buffer, INDEX_MANIFEST_ENTRY_SCHEMA, records,
codec=self._codec)
with self.file_io.new_output_stream(path) as output_stream:
output_stream.write(buffer.getvalue())
except Exception as e:
Expand Down
6 changes: 5 additions & 1 deletion paimon-python/pypaimon/manifest/manifest_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def __init__(self, table):
self.partition_keys_fields = self.table.partition_keys_fields
self.primary_keys_fields = self.table.primary_keys_fields
self.trimmed_primary_keys_fields = self.table.trimmed_primary_keys_fields
from pypaimon.manifest import avro_codec
self._codec = avro_codec(table.options.manifest_compression())

def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest_entry_filter=None,
drop_stats=True, max_workers=8,
Expand Down Expand Up @@ -252,7 +254,9 @@ def write(self, file_name, entries: List[ManifestEntry]):
manifest_path = f"{self.manifest_path}/{file_name}"
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
fastavro.writer(
buffer, MANIFEST_ENTRY_SCHEMA, avro_records,
codec=self._codec)
avro_bytes = buffer.getvalue()
with self.file_io.new_output_stream(manifest_path) as output_stream:
output_stream.write(avro_bytes)
Expand Down
6 changes: 5 additions & 1 deletion paimon-python/pypaimon/manifest/manifest_list_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class ManifestListManager:

def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.manifest import avro_codec

self.table: FileStoreTable = table
manifest_path = table.table_path.rstrip('/')
self.manifest_path = f"{manifest_path}/manifest"
self.file_io = self.table.file_io
self._codec = avro_codec(table.options.manifest_compression())

def read_all(self, snapshot: Optional[Snapshot]) -> List[ManifestFileMeta]:
"""Read base + delta manifest lists for full file state."""
Expand Down Expand Up @@ -124,7 +126,9 @@ def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]):
list_path = f"{self.manifest_path}/{file_name}"
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
fastavro.writer(
buffer, MANIFEST_FILE_META_SCHEMA, avro_records,
codec=self._codec)
avro_bytes = buffer.getvalue()
with self.file_io.new_output_stream(list_path) as output_stream:
output_stream.write(avro_bytes)
Expand Down
58 changes: 58 additions & 0 deletions paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,64 @@ def test_read_write_cols_with_system_field(self):
self.assertEqual(read_stats.max_values.get_field(0), 10)
self.assertEqual(read_stats.null_counts, [2])

def test_manifest_compression(self):
import fastavro
pa_schema = pa.schema([('pk', pa.int32()), ('val', pa.string())])
schema = Schema.from_pyarrow_schema(
pa_schema, primary_keys=['pk'], options={'bucket': '1'})
self.catalog.create_table('default.codec_test', schema, False)
table = self.catalog.get_table('default.codec_test')

wb = table.new_batch_write_builder()
w = wb.new_write()
w.write_arrow(pa.Table.from_pylist(
[{'pk': i, 'val': f'v{i}'} for i in range(10)], schema=pa_schema))
wb.new_commit().commit(w.prepare_commit())
w.close()

snap = table.snapshot_manager().get_latest_snapshot()
mlm = ManifestListManager(table)
metas = mlm.read_all(snap)
self.assertGreater(len(metas), 0)

manifest_path = f"{mlm.manifest_path}/{metas[0].file_name}"
with table.file_io.new_input_stream(manifest_path) as f:
codec = fastavro.reader(f).metadata.get('avro.codec', b'null')
if isinstance(codec, bytes):
codec = codec.decode()
self.assertEqual(
codec, 'zstandard',
f"Manifest file uses codec '{codec}' but Java defaults to zstd.")

list_path = f"{mlm.manifest_path}/{snap.base_manifest_list}"
with table.file_io.new_input_stream(list_path) as f:
list_codec = fastavro.reader(f).metadata.get('avro.codec', b'null')
if isinstance(list_codec, bytes):
list_codec = list_codec.decode()
self.assertEqual(
list_codec, 'zstandard',
f"Manifest list uses codec '{list_codec}' but Java defaults to zstd.")

from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.index_manifest_entry import IndexManifestEntry
from pypaimon.index.index_file_meta import IndexFileMeta
imf = IndexManifestFile(table)
idx_entry = IndexManifestEntry(
kind=0, partition=GenericRow([], []), bucket=0,
index_file=IndexFileMeta(
index_type='BTREE', file_name='idx-0',
file_size=100, row_count=10),
)
idx_file = imf.write([idx_entry])
idx_path = f"{mlm.manifest_path}/{idx_file}"
with table.file_io.new_input_stream(idx_path) as f:
idx_codec = fastavro.reader(f).metadata.get('avro.codec', b'null')
if isinstance(idx_codec, bytes):
idx_codec = idx_codec.decode()
self.assertEqual(
idx_codec, 'zstandard',
f"Index manifest uses codec '{idx_codec}' but Java defaults to zstd.")


class ManifestListManagerTest(_ManifestManagerSetup):
"""Tests for ManifestListManager."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ def test_read_legacy_manifest_list(self):
table.table_path = table_path
table.file_io = file_io
table.partition_keys_fields = []
table.options.manifest_compression.return_value = 'zstd'
manager = ManifestListManager(table)
metas = manager.read(manifest_list_name)

Expand Down
Loading