Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
2fcdd6b
[python] Split manifest files by target size during commit and compac…
XiaoHongbo-Hope Jun 28, 2026
9af4710
[python] Fix rolling_write: clean up on partial failure, remove doubl…
XiaoHongbo-Hope Jun 28, 2026
96e8169
[python] Use size-aware adaptive rolling instead of avg-based chunk s…
XiaoHongbo-Hope Jun 28, 2026
a650faa
[python] Replace fragile name parsing with explicit name_prefix param…
XiaoHongbo-Hope Jun 28, 2026
d2060bd
[python] Fix flake8 E128 continuation line indent in manifest tests
XiaoHongbo-Hope Jun 28, 2026
bd1ec85
[python] Update file_store_commit_test for renamed _write_manifest_files
XiaoHongbo-Hope Jun 28, 2026
bafc393
[python] Use sample-based size estimation in rolling_write
XiaoHongbo-Hope Jun 28, 2026
5b8ab2f
[python] Track rolling manifest files in new_manifest_files_for_abort
XiaoHongbo-Hope Jun 28, 2026
f9202a6
[python] Use streaming rolling write aligned with Java RollingFileWriter
XiaoHongbo-Hope Jun 28, 2026
75075a9
[python] Use sync_interval for per-record size tracking in rolling write
XiaoHongbo-Hope Jun 28, 2026
43f1f86
[python] Extract sync_interval magic number to _AVRO_SYNC_INTERVAL co…
XiaoHongbo-Hope Jun 28, 2026
a757ce0
[python] Allow merger to split oversized single-file manifest groups
XiaoHongbo-Hope Jun 28, 2026
e2b0b55
[python] Revert merger single-file split to match Java behavior
XiaoHongbo-Hope Jun 28, 2026
4edecb8
[python] Clean up manifest files via new_manifest_files_for_abort on …
XiaoHongbo-Hope Jun 28, 2026
1e608f6
[python] Pass known file size to _build_meta to avoid remote HEAD req…
XiaoHongbo-Hope Jun 28, 2026
10cd1f1
[python] Track written files separately for rollback in rolling_write
XiaoHongbo-Hope Jun 28, 2026
2d5cce9
[python] Add read-back assertion for rolling manifest files
XiaoHongbo-Hope Jun 28, 2026
8f36b3b
[python] Remove extra blank line between test classes
XiaoHongbo-Hope Jun 28, 2026
0eb8094
[python] Remove redundant manifest file cleanup in except block
XiaoHongbo-Hope Jun 28, 2026
be79835
[python] Align cleanup with Java CommitCleaner two-step structure
XiaoHongbo-Hope Jun 28, 2026
247413f
[python] Add known metas fallback to _clean_up_reuse_tmp_manifests
XiaoHongbo-Hope Jun 28, 2026
c5c4ccf
[python] Guard cleanup with try/except, simplify merge new_files, rem…
XiaoHongbo-Hope Jun 28, 2026
d7f34ee
[python] Fix unbound merge_new_files and remove unused merge_after_ma…
XiaoHongbo-Hope Jun 28, 2026
542aecc
[python] Remove trailing blank line at EOF
XiaoHongbo-Hope Jun 28, 2026
235f93e
[python] Remove known metas fallback to match Java CommitCleaner
XiaoHongbo-Hope Jun 28, 2026
a6790c9
[python] Fix flake8 E128 indent in cleanup method signatures
XiaoHongbo-Hope Jun 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 97 additions & 46 deletions paimon-python/pypaimon/manifest/manifest_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
class ManifestFileManager:
"""Writer for manifest files in Avro format using unified FileIO."""

_AVRO_SYNC_INTERVAL = 16000

def __init__(self, table):
from pypaimon.table.file_store_table import FileStoreTable

Expand Down Expand Up @@ -208,60 +210,107 @@ def _get_value_stats_fields(self, file_dict: dict, schema_fields: list) -> List:
return fields

def write(self, file_name, entries: List[ManifestEntry]):
avro_records = []
for entry in entries:
avro_record = {
"_VERSION": 2,
"_KIND": entry.kind,
"_PARTITION": GenericRowSerializer.to_bytes(entry.partition),
"_BUCKET": entry.bucket,
"_TOTAL_BUCKETS": entry.total_buckets,
"_FILE": {
"_FILE_NAME": entry.file.file_name,
"_FILE_SIZE": entry.file.file_size,
"_ROW_COUNT": entry.file.row_count,
"_MIN_KEY": GenericRowSerializer.to_bytes(entry.file.min_key),
"_MAX_KEY": GenericRowSerializer.to_bytes(entry.file.max_key),
"_KEY_STATS": {
"_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.min_values),
"_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.max_values),
"_NULL_COUNTS": entry.file.key_stats.null_counts,
},
"_VALUE_STATS": {
"_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.min_values),
"_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.max_values),
"_NULL_COUNTS": entry.file.value_stats.null_counts,
},
"_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number,
"_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number,
"_SCHEMA_ID": entry.file.schema_id,
"_LEVEL": entry.file.level,
"_EXTRA_FILES": entry.file.extra_files,
"_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None,
"_DELETE_ROW_COUNT": entry.file.delete_row_count,
"_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
"_FILE_SOURCE": entry.file.file_source,
"_VALUE_STATS_COLS": entry.file.value_stats_cols,
"_EXTERNAL_PATH": entry.file.external_path,
"_FIRST_ROW_ID": entry.file.first_row_id,
"_WRITE_COLS": entry.file.write_cols,
}
buf = BytesIO()
fastavro.writer(buf, MANIFEST_ENTRY_SCHEMA, self._to_avro_records(entries))
self._flush(file_name, buf.getvalue())

def rolling_write(self, entries: List[ManifestEntry],
                  suggested_file_size: int,
                  name_prefix: str) -> List[ManifestFileMeta]:
    """Stream entries into one or more Avro manifest files, rolling by size.

    Each entry is converted to an Avro record and appended to an in-memory
    writer. After every record the buffer position is compared with
    ``suggested_file_size``; once it is reached the buffer is persisted as
    ``{name_prefix}-{index}`` and a fresh buffer/writer pair is started for
    the remaining entries.

    Args:
        entries: manifest entries to persist, in order.
        suggested_file_size: buffer-position threshold (bytes) at which the
            current file is closed and the next one is started.
        name_prefix: file-name prefix; the zero-based index of the file
            within this call is appended after a dash.

    Returns:
        One ManifestFileMeta per file written, in write order. An empty
        list when ``entries`` is empty.

    Raises:
        Exception: any error raised while writing is re-raised after the
            files already recorded as written by this call are deleted.
    """
    if not entries:
        return []

    from fastavro.write import Writer

    # The Avro block size never exceeds the requested file size.
    block_bytes = min(self._AVRO_SYNC_INTERVAL, suggested_file_size)
    metas = []
    persisted_names = []

    def _open():
        # Start a fresh in-memory Avro container.
        sink = BytesIO()
        return sink, Writer(sink, MANIFEST_ENTRY_SCHEMA, sync_interval=block_bytes)

    def _seal(sink, avro_writer, start, stop):
        # Finish the current container, persist it and record its meta.
        avro_writer.flush()
        payload = sink.getvalue()
        name = f"{name_prefix}-{len(metas)}"
        self._flush(name, payload)
        # Remember the file before building the meta so that a failure in
        # _build_meta still leads to its removal below.
        persisted_names.append(name)
        metas.append(self._build_meta(name, entries[start:stop], len(payload)))

    first = 0
    sink, avro_writer = _open()
    try:
        for idx, entry in enumerate(entries):
            avro_writer.write(self._to_avro_record(entry))
            if sink.tell() < suggested_file_size:
                continue
            _seal(sink, avro_writer, first, idx + 1)
            first = idx + 1
            sink, avro_writer = _open()

        # Persist whatever is left after the last roll.
        if first < len(entries):
            _seal(sink, avro_writer, first, None)
    except Exception:
        for name in persisted_names:
            self.file_io.delete_quietly(f"{self.manifest_path}/{name}")
        raise
    return metas

@staticmethod
def _to_avro_record(entry: ManifestEntry) -> dict:
return {
"_VERSION": 2,
"_KIND": entry.kind,
"_PARTITION": GenericRowSerializer.to_bytes(entry.partition),
"_BUCKET": entry.bucket,
"_TOTAL_BUCKETS": entry.total_buckets,
"_FILE": {
"_FILE_NAME": entry.file.file_name,
"_FILE_SIZE": entry.file.file_size,
"_ROW_COUNT": entry.file.row_count,
"_MIN_KEY": GenericRowSerializer.to_bytes(entry.file.min_key),
"_MAX_KEY": GenericRowSerializer.to_bytes(entry.file.max_key),
"_KEY_STATS": {
"_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.min_values),
"_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.key_stats.max_values),
"_NULL_COUNTS": entry.file.key_stats.null_counts,
},
"_VALUE_STATS": {
"_MIN_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.min_values),
"_MAX_VALUES": GenericRowSerializer.to_bytes(entry.file.value_stats.max_values),
"_NULL_COUNTS": entry.file.value_stats.null_counts,
},
"_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number,
"_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number,
"_SCHEMA_ID": entry.file.schema_id,
"_LEVEL": entry.file.level,
"_EXTRA_FILES": entry.file.extra_files,
"_CREATION_TIME": entry.file.creation_time.get_millisecond() if entry.file.creation_time else None,
"_DELETE_ROW_COUNT": entry.file.delete_row_count,
"_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
"_FILE_SOURCE": entry.file.file_source,
"_VALUE_STATS_COLS": entry.file.value_stats_cols,
"_EXTERNAL_PATH": entry.file.external_path,
"_FIRST_ROW_ID": entry.file.first_row_id,
"_WRITE_COLS": entry.file.write_cols,
}
avro_records.append(avro_record)
}

def _to_avro_records(self, entries: List[ManifestEntry]) -> List[dict]:
    """Convert every manifest entry to its Avro record dict, keeping order."""
    records = []
    for entry in entries:
        records.append(self._to_avro_record(entry))
    return records

def _flush(self, file_name: str, avro_bytes: bytes):
manifest_path = f"{self.manifest_path}/{file_name}"
try:
buffer = BytesIO()
fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
avro_bytes = buffer.getvalue()
with self.file_io.new_output_stream(manifest_path) as output_stream:
output_stream.write(avro_bytes)
except Exception as e:
self.file_io.delete_quietly(manifest_path)
raise RuntimeError(f"Failed to write manifest file: {e}") from e

def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFileMeta:
self.write(file_name, entries)
def _build_meta(self, file_name: str, entries: List[ManifestEntry],
file_size: int = None) -> ManifestFileMeta:
added_file_count = 0
deleted_file_count = 0
schema_id = None
Expand Down Expand Up @@ -296,10 +345,12 @@ def write_with_meta(self, file_name, entries: List[ManifestEntry]) -> ManifestFi
if max_row_id is None or file_range.to > max_row_id:
max_row_id = file_range.to

manifest_file_path = f"{self.manifest_path}/{file_name}"
if file_size is None:
manifest_file_path = f"{self.manifest_path}/{file_name}"
file_size = self.table.file_io.get_file_size(manifest_file_path)
return ManifestFileMeta(
file_name=file_name,
file_size=self.table.file_io.get_file_size(manifest_file_path),
file_size=file_size,
num_added_files=added_file_count,
num_deleted_files=deleted_file_count,
partition_stats=SimpleStats(
Expand Down
12 changes: 5 additions & 7 deletions paimon-python/pypaimon/manifest/manifest_file_merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,11 @@ def _merge_candidates(self, candidates: List[ManifestFileMeta],
if not merged_entries:
return

manifest_file = "manifest-{}-0".format(str(uuid.uuid4()))
merged_meta = self.manifest_file_manager.write_with_meta(
manifest_file,
merged_entries,
)
result.append(merged_meta)
new_files.append(merged_meta)
manifest_file = "manifest-{}".format(str(uuid.uuid4()))
merged_metas = self.manifest_file_manager.rolling_write(
merged_entries, self.suggested_meta_size, manifest_file)
result.extend(merged_metas)
new_files.extend(merged_metas)

def _delete_manifests(self, manifests: List[ManifestFileMeta]):
for manifest in manifests:
Expand Down
4 changes: 2 additions & 2 deletions paimon-python/pypaimon/tests/file_store_commit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ def test_append_commit_inherits_index_manifest(
snapshot_commit.commit.return_value = True
file_store_commit.snapshot_commit = snapshot_commit

file_store_commit._write_manifest_file = Mock(return_value=Mock())
file_store_commit._write_manifest_files = Mock(return_value=[Mock()])
file_store_commit._generate_partition_statistics = Mock(return_value=[])
file_store_commit.manifest_list_manager.read_all.return_value = []

Expand Down Expand Up @@ -492,7 +492,7 @@ def make_file(name):
file=make_file("f2.parquet")),
]

result = file_store_commit._write_manifest_file(entries, "manifest-test")
result = file_store_commit._write_manifest_files(entries, "manifest-test")
self.assertIsNotNone(result)

@staticmethod
Expand Down
80 changes: 80 additions & 0 deletions paimon-python/pypaimon/tests/manifest/manifest_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,86 @@ def test_read_write_cols_with_system_field(self):
self.assertEqual(read_stats.max_values.get_field(0), 10)
self.assertEqual(read_stats.null_counts, [2])

def test_commit_manifest_exceeds_target_size(self):
    """A commit touching 200 partitions must produce several manifest files,
    none larger than twice the configured target size, and reading them back
    must yield exactly the committed entries."""
    target_size = 16 * 1024
    columns = [
        ('pt', pa.string()),
        ('pk', pa.int32()),
        ('val', pa.string()),
    ]
    pa_schema = pa.schema(columns)
    table_options = {
        'bucket': '1',
        'manifest.target-file-size': '16 kb',
    }
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        primary_keys=['pk'],
        partition_keys=['pt'],
        options=table_options,
    )
    self.catalog.create_table('default.rolling_bug', schema, False)
    table = self.catalog.get_table('default.rolling_bug')

    # One small batch per partition, all committed in a single commit.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    for pt in range(200):
        batch = []
        for i in range(5):
            batch.append({'pt': f'p{pt}', 'pk': i, 'val': f'v{i}'})
        table_write.write_arrow(pa.Table.from_pylist(batch, schema=pa_schema))
    write_builder.new_commit().commit(table_write.prepare_commit())
    table_write.close()

    latest_snapshot = table.snapshot_manager().get_latest_snapshot()
    metas = ManifestListManager(table).read_all(latest_snapshot)
    max_allowed = target_size * 2
    oversized = []
    for candidate in metas:
        if candidate.file_size > max_allowed:
            oversized.append(candidate)
    self.assertEqual(
        len(oversized), 0,
        f"{len(oversized)} manifest file(s) exceed 2x target ({max_allowed} bytes): "
        f"{[(m.file_name, m.file_size) for m in oversized]}. "
        f"Java uses RollingFileWriter to split; Python writes one file.")
    self.assertGreater(
        len(metas), 1,
        f"Expected multiple manifest files but got {len(metas)} "
        f"with total {sum(m.file_size for m in metas)} bytes")

    # Read every manifest back and check it against its own meta counters.
    file_manager = ManifestFileManager(table)
    total_read = 0
    for meta in metas:
        read_back = file_manager.read(meta.file_name)
        self.assertEqual(len(read_back), meta.num_added_files + meta.num_deleted_files)
        total_read += len(read_back)
    self.assertEqual(total_read, 200)

def test_rolling_write_with_skewed_entries(self):
    """rolling_write must keep every file within twice the target size and
    account for all entries when one in five entries is much larger."""
    target_size = 16 * 1024
    manager = ManifestFileManager(self.table)
    small = self._create_manifest_entry("small.parquet")
    # An entry inflated by 50 extra file names and a 2000-byte embedded index.
    big_file = DataFileMeta(
        file_name="big.parquet", file_size=1024, row_count=100,
        min_key=_EMPTY_ROW, max_key=_EMPTY_ROW,
        key_stats=_EMPTY_STATS, value_stats=_EMPTY_STATS,
        min_sequence_number=1, max_sequence_number=100,
        schema_id=0, level=0,
        extra_files=[f"extra-{i}.idx" for i in range(50)],
        creation_time=Timestamp.from_epoch_millis(0),
        delete_row_count=0, embedded_index=b'\x00' * 2000,
        file_source=None, value_stats_cols=None,
        external_path=None, first_row_id=None, write_cols=None,
    )
    big = ManifestEntry(
        kind=0, partition=_EMPTY_ROW, bucket=0, total_buckets=1,
        file=big_file,
    )
    # Every fifth position (0, 5, 10, ...) holds the big entry.
    entries = []
    for position in range(300):
        entries.append(small if position % 5 else big)
    metas = manager.rolling_write(entries, target_size, "manifest-skew")

    max_allowed = target_size * 2
    oversized = []
    for candidate in metas:
        if candidate.file_size > max_allowed:
            oversized.append(candidate)
    self.assertEqual(
        len(oversized), 0,
        f"Skewed entries: {len(oversized)} file(s) exceed 2x target: "
        f"{[(m.file_name, m.file_size) for m in oversized]}")
    total_entries = 0
    for meta in metas:
        total_entries += meta.num_added_files + meta.num_deleted_files
    self.assertEqual(total_entries, 300)


class ManifestListManagerTest(_ManifestManagerSetup):
"""Tests for ManifestListManager."""
Expand Down
Loading
Loading