From 39dabb076b37f3320937c09477cbdb4362cbfc26 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sun, 28 Jun 2026 00:00:03 +0800 Subject: [PATCH] [python] Apply overwrite delta changes on commit retry --- .../tests/overwrite_changes_cache_test.py | 58 +++++++++------ .../commit/overwrite_changes_provider.py | 73 ++++++++++--------- 2 files changed, 74 insertions(+), 57 deletions(-) diff --git a/paimon-python/pypaimon/tests/overwrite_changes_cache_test.py b/paimon-python/pypaimon/tests/overwrite_changes_cache_test.py index c7fb16e628b0..2bbef578e627 100644 --- a/paimon-python/pypaimon/tests/overwrite_changes_cache_test.py +++ b/paimon-python/pypaimon/tests/overwrite_changes_cache_test.py @@ -21,8 +21,8 @@ recompute the files to delete. Under concurrent writers this made each retry as expensive as the first attempt. OverwriteChangesProvider caches the existing files of the target partitions and, on retry, reuses them when the snapshots in -between are all APPEND and have not touched the target partitions (verified by a -cheap DELTA probe), instead of a full re-scan. +between can be applied from target-partition DELTA manifests, instead of a full +re-scan. The test deterministically forces ``K`` conflicts, each advancing the latest snapshot with an append to an unrelated partition, and asserts the full scan @@ -87,7 +87,7 @@ def test_overwrite_scan_runs_once_not_once_per_retry(self): # --- count provider full scans and delta probes (class-level spies) --- counts = {'full_scan': 0, 'probe': 0} orig_full = OverwriteChangesProvider._full_scan - orig_probe = OverwriteChangesProvider._delta_touches_target + orig_probe = OverwriteChangesProvider._read_delta_entries def spy_full(self, *a, **k): counts['full_scan'] += 1 @@ -112,13 +112,13 @@ def patched_cas(snapshot, statistics): fsc.snapshot_commit.commit = patched_cas OverwriteChangesProvider._full_scan = spy_full - OverwriteChangesProvider._delta_touches_target = spy_probe + OverwriteChangesProvider._read_delta_entries = spy_probe try: c.commit(messages) c.close() finally: OverwriteChangesProvider._full_scan = orig_full - OverwriteChangesProvider._delta_touches_target = orig_probe + OverwriteChangesProvider._read_delta_entries = orig_probe # Harness sanity: we really did force K conflicts and then converged. self.assertEqual(cas['fails'], K, "expected exactly K forced conflicts") @@ -146,8 +146,9 @@ def patched_cas(snapshot, statistics): self.assertEqual(sorted(actual[actual['f0'] == 2]['f1'].tolist()), ['c']) self.assertEqual(len(actual[actual['f0'] == 99]), K) - def test_cache_rebuilt_when_concurrent_append_hits_target_partition(self): - # Concurrent appends hit the overwrite target; probe sees it touched -> rebuild. + def test_cache_applies_delta_when_concurrent_append_hits_target_partition(self): + # Concurrent appends hit the overwrite target; retry advances cached + # target-partition state from APPEND deltas instead of rebuilding. K = 3 wb = self.table.new_batch_write_builder().overwrite({'f0': 1}) @@ -160,7 +161,7 @@ def test_cache_rebuilt_when_concurrent_append_hits_target_partition(self): counts = {'full_scan': 0, 'probe': 0} orig_full = OverwriteChangesProvider._full_scan - orig_probe = OverwriteChangesProvider._delta_touches_target + orig_probe = OverwriteChangesProvider._read_delta_entries def spy_full(self, *a, **k): counts['full_scan'] += 1 @@ -182,20 +183,20 @@ def patched_cas(snapshot, statistics): fsc.snapshot_commit.commit = patched_cas OverwriteChangesProvider._full_scan = spy_full - OverwriteChangesProvider._delta_touches_target = spy_probe + OverwriteChangesProvider._read_delta_entries = spy_probe try: c.commit(messages) c.close() finally: OverwriteChangesProvider._full_scan = orig_full - OverwriteChangesProvider._delta_touches_target = orig_probe + OverwriteChangesProvider._read_delta_entries = orig_probe self.assertEqual(cas['fails'], K, "expected exactly K forced conflicts") - # Target touched each retry => cache rebuilds; full scan runs every attempt. - self.assertEqual(counts['full_scan'], K + 1, - f"full scan ran {counts['full_scan']}x; cache must rebuild " - f"when the target partition is touched") + # Target APPEND deltas can be applied to cached state; no full rebuild. + self.assertEqual(counts['full_scan'], 1, + f"full scan ran {counts['full_scan']}x; APPEND deltas " + f"should advance cached target state") self.assertEqual(counts['probe'], K, f"delta probe ran {counts['probe']}x; once per retry (= K)") @@ -248,9 +249,9 @@ def patched_cas(snapshot, statistics): self.assertEqual(cas['fails'], K, "expected exactly K forced conflicts") return captured['provider'] - def test_cache_rebuilt_on_non_append_snapshot(self): - # A non-APPEND (OVERWRITE) snapshot between retries forces a rebuild even - # though it only touches an unrelated partition. + def test_cache_applies_delta_when_concurrent_overwrite_hits_target_partition(self): + # Concurrent overwrites produce DELETE+ADD deltas. They can be applied + # to the cached target state just like APPEND deltas. K = 2 wb = self.table.new_batch_write_builder().overwrite({'f0': 1}) w = wb.new_write() @@ -258,13 +259,21 @@ def test_cache_rebuilt_on_non_append_snapshot(self): w.write_pandas(pd.DataFrame({'f0': [1], 'f1': ['new']})) provider = self._run_with_conflicts( c, w.prepare_commit(), K, - lambda i: self._overwrite_partition(99, f'z{i}')) + lambda i: self._overwrite_partition(1, f'z{i}')) - self.assertEqual(provider.full_scan_count, K + 1) # rebuilt every retry - self.assertEqual(provider.delta_probe_count, K) # probed, bailed at kind check + self.assertEqual(provider.full_scan_count, 1) + self.assertEqual(provider.delta_probe_count, K) + self.assertEqual(provider.delta_apply_count, K) + + read_builder = self.table.new_read_builder() + actual = read_builder.new_read().to_pandas( + read_builder.new_scan().plan().splits()) + self.assertEqual(sorted(actual[actual['f0'] == 1]['f1'].tolist()), ['new']) + self.assertEqual(sorted(actual[actual['f0'] == 2]['f1'].tolist()), ['c']) - def test_whole_table_overwrite_always_full_scans(self): - # Whole-table overwrite (no partition filter) can never reuse the cache. + def test_whole_table_overwrite_advances_by_append_delta(self): + # Whole-table overwrite (no partition filter) can still advance through + # APPEND deltas because every appended file belongs to the target state. K = 2 wb = self.table.new_batch_write_builder().overwrite() w = wb.new_write() @@ -274,8 +283,9 @@ def test_whole_table_overwrite_always_full_scans(self): c, w.prepare_commit(), K, lambda i: self._append(pd.DataFrame({'f0': [99], 'f1': [f'x{i}']}))) - self.assertEqual(provider.full_scan_count, K + 1) # null filter -> always full scan - self.assertEqual(provider.delta_probe_count, 0) # never enters the probe loop + self.assertEqual(provider.full_scan_count, 1) + self.assertEqual(provider.delta_probe_count, K) + self.assertEqual(provider.delta_apply_count, K) read_builder = self.table.new_read_builder() actual = read_builder.new_read().to_pandas( diff --git a/paimon-python/pypaimon/write/commit/overwrite_changes_provider.py b/paimon-python/pypaimon/write/commit/overwrite_changes_provider.py index a76d5936f17b..9a20dc008bd6 100644 --- a/paimon-python/pypaimon/write/commit/overwrite_changes_provider.py +++ b/paimon-python/pypaimon/write/commit/overwrite_changes_provider.py @@ -17,6 +17,8 @@ from typing import List, Optional +from pypaimon.manifest.manifest_file_manager import ManifestFileManager +from pypaimon.manifest.schema.file_entry import FileEntry from pypaimon.manifest.schema.manifest_entry import ManifestEntry from pypaimon.read.scanner.file_scanner import FileScanner from pypaimon.snapshot.snapshot import Snapshot @@ -28,11 +30,9 @@ class OverwriteChangesProvider: caching the existing files of the target partitions across commit retries to avoid repeated full scans. - On retry, if the latest snapshot advanced, the cache is reused only when the - snapshots in between are all APPEND and have not touched the target - partitions; otherwise it is rebuilt by a full scan. A whole-table overwrite - (``partition_filter is None``) always rebuilds. Mirrors Java - ``OverwriteChangesProvider`` (#7894). + On retry, if the latest snapshot advanced, the cached state is updated by + applying the target-partition delta manifests. Missing snapshots or + unreadable deltas fall back to a full scan. """ def __init__(self, table, manifest_list_manager, snapshot_manager, @@ -46,9 +46,10 @@ def __init__(self, table, manifest_list_manager, snapshot_manager, self._cached_snapshot: Optional[Snapshot] = None self._cached_entries: List[ManifestEntry] = [] - # Counters for tests / observability (mirrors Java @VisibleForTesting). + # Counters for tests / observability. self.full_scan_count = 0 self.delta_probe_count = 0 + self.delta_apply_count = 0 def provide(self, latest_snapshot: Optional[Snapshot]) -> List[ManifestEntry]: if latest_snapshot is None: @@ -63,7 +64,7 @@ def provide(self, latest_snapshot: Optional[Snapshot]) -> List[ManifestEntry]: f"Cached snapshot id {self._cached_snapshot.id} is greater than " f"latest snapshot id {latest_snapshot.id}") elif self._cached_snapshot.id < latest_snapshot.id: - if not self._can_use_cache(latest_snapshot): + if not self._advance_cache(latest_snapshot): self._cached_entries = self._full_scan(latest_snapshot) self._cached_snapshot = latest_snapshot # cached_snapshot.id == latest_snapshot.id -> reuse cache as-is @@ -76,39 +77,45 @@ def _full_scan(self, latest_snapshot: Snapshot) -> List[ManifestEntry]: partition_predicate=self.partition_filter) .read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot))) - def _can_use_cache(self, latest_snapshot: Snapshot) -> bool: - if self.partition_filter is None: - # Whole-table overwrite: any concurrent commit touches the target, - # so skip the delta probe and force a full scan. - return False - for snapshot_id in range(self._cached_snapshot.id + 1, latest_snapshot.id + 1): - self.delta_probe_count += 1 - try: + def _advance_cache(self, latest_snapshot: Snapshot) -> bool: + pending_entries = [] + applied_count = 0 + manifest_file_manager = ManifestFileManager(self.table) + try: + for snapshot_id in range(self._cached_snapshot.id + 1, latest_snapshot.id + 1): + self.delta_probe_count += 1 snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id) if snapshot is None: return False - if snapshot.commit_kind != "APPEND": - # Only APPEND snapshots produce a reliable DELTA manifest for - # probing; other kinds may rewrite/reorganize manifests. - return False - if self._delta_touches_target(snapshot): - return False - except Exception: - # e.g. the snapshot is being expired; a full scan is always safe. - return False + entries = self._read_delta_entries(snapshot, manifest_file_manager) + if entries: + pending_entries.extend(entries) + applied_count += 1 + if pending_entries: + self._cached_entries = list( + FileEntry.merge_entries(self._cached_entries + pending_entries)) + self.delta_apply_count += applied_count + except Exception: + # e.g. the snapshot is being expired; a full scan is always safe. + return False return True - def _delta_touches_target(self, snapshot: Snapshot) -> bool: + def _read_delta_entries( + self, snapshot: Snapshot, + manifest_file_manager: ManifestFileManager) -> List[ManifestEntry]: delta_manifests = self.manifest_list_manager.read_delta(snapshot) if not delta_manifests: - return False - # Only APPEND snapshots are probed (see _can_use_cache), so the delta has - # no standalone DELETEs; FileScanner's partition predicate prunes at the - # manifest-file level before reading entries. - entries = (FileScanner(self.table, lambda: ([], None), - partition_predicate=self.partition_filter) - .read_manifest_entries(delta_manifests)) - return len(entries) > 0 + return [] + # Read raw delta entries so DELETE entries from OVERWRITE / COMPACT + # snapshots are applied instead of being discarded by FileScanner. + entries = [] + for manifest_file in delta_manifests: + for entry in manifest_file_manager.read(manifest_file.file_name): + if (self.partition_filter is not None + and not self.partition_filter.test(entry.partition)): + continue + entries.append(entry) + return entries def _build_result(self, existing_entries: List[ManifestEntry]) -> List[ManifestEntry]: entries = []