Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions paimon-python/pypaimon/tests/overwrite_changes_cache_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
recompute the files to delete. Under concurrent writers this made each retry as
expensive as the first attempt. OverwriteChangesProvider caches the existing
files of the target partitions and, on retry, reuses them when the snapshots in
between are all APPEND and have not touched the target partitions (verified by a
cheap DELTA probe), instead of a full re-scan.
between can be applied from target-partition DELTA manifests, instead of a full
re-scan.

The test deterministically forces ``K`` conflicts, each advancing the latest
snapshot with an append to an unrelated partition, and asserts the full scan
Expand Down Expand Up @@ -87,7 +87,7 @@ def test_overwrite_scan_runs_once_not_once_per_retry(self):
# --- count provider full scans and delta probes (class-level spies) ---
counts = {'full_scan': 0, 'probe': 0}
orig_full = OverwriteChangesProvider._full_scan
orig_probe = OverwriteChangesProvider._delta_touches_target
orig_probe = OverwriteChangesProvider._read_delta_entries

def spy_full(self, *a, **k):
counts['full_scan'] += 1
Expand All @@ -112,13 +112,13 @@ def patched_cas(snapshot, statistics):

fsc.snapshot_commit.commit = patched_cas
OverwriteChangesProvider._full_scan = spy_full
OverwriteChangesProvider._delta_touches_target = spy_probe
OverwriteChangesProvider._read_delta_entries = spy_probe
try:
c.commit(messages)
c.close()
finally:
OverwriteChangesProvider._full_scan = orig_full
OverwriteChangesProvider._delta_touches_target = orig_probe
OverwriteChangesProvider._read_delta_entries = orig_probe

# Harness sanity: we really did force K conflicts and then converged.
self.assertEqual(cas['fails'], K, "expected exactly K forced conflicts")
Expand Down Expand Up @@ -146,8 +146,9 @@ def patched_cas(snapshot, statistics):
self.assertEqual(sorted(actual[actual['f0'] == 2]['f1'].tolist()), ['c'])
self.assertEqual(len(actual[actual['f0'] == 99]), K)

def test_cache_rebuilt_when_concurrent_append_hits_target_partition(self):
# Concurrent appends hit the overwrite target; probe sees it touched -> rebuild.
def test_cache_applies_delta_when_concurrent_append_hits_target_partition(self):
# Concurrent appends hit the overwrite target; retry advances cached
# target-partition state from APPEND deltas instead of rebuilding.
K = 3

wb = self.table.new_batch_write_builder().overwrite({'f0': 1})
Expand All @@ -160,7 +161,7 @@ def test_cache_rebuilt_when_concurrent_append_hits_target_partition(self):

counts = {'full_scan': 0, 'probe': 0}
orig_full = OverwriteChangesProvider._full_scan
orig_probe = OverwriteChangesProvider._delta_touches_target
orig_probe = OverwriteChangesProvider._read_delta_entries

def spy_full(self, *a, **k):
counts['full_scan'] += 1
Expand All @@ -182,20 +183,20 @@ def patched_cas(snapshot, statistics):

fsc.snapshot_commit.commit = patched_cas
OverwriteChangesProvider._full_scan = spy_full
OverwriteChangesProvider._delta_touches_target = spy_probe
OverwriteChangesProvider._read_delta_entries = spy_probe
try:
c.commit(messages)
c.close()
finally:
OverwriteChangesProvider._full_scan = orig_full
OverwriteChangesProvider._delta_touches_target = orig_probe
OverwriteChangesProvider._read_delta_entries = orig_probe

self.assertEqual(cas['fails'], K, "expected exactly K forced conflicts")

# Target touched each retry => cache rebuilds; full scan runs every attempt.
self.assertEqual(counts['full_scan'], K + 1,
f"full scan ran {counts['full_scan']}x; cache must rebuild "
f"when the target partition is touched")
# Target APPEND deltas can be applied to cached state; no full rebuild.
self.assertEqual(counts['full_scan'], 1,
f"full scan ran {counts['full_scan']}x; APPEND deltas "
f"should advance cached target state")
self.assertEqual(counts['probe'], K,
f"delta probe ran {counts['probe']}x; once per retry (= K)")

Expand Down Expand Up @@ -248,23 +249,31 @@ def patched_cas(snapshot, statistics):
self.assertEqual(cas['fails'], K, "expected exactly K forced conflicts")
return captured['provider']

def test_cache_rebuilt_on_non_append_snapshot(self):
# A non-APPEND (OVERWRITE) snapshot between retries forces a rebuild even
# though it only touches an unrelated partition.
def test_cache_applies_delta_when_concurrent_overwrite_hits_target_partition(self):
# Concurrent overwrites produce DELETE+ADD deltas. They can be applied
# to the cached target state just like APPEND deltas.
K = 2
wb = self.table.new_batch_write_builder().overwrite({'f0': 1})
w = wb.new_write()
c = wb.new_commit()
w.write_pandas(pd.DataFrame({'f0': [1], 'f1': ['new']}))
provider = self._run_with_conflicts(
c, w.prepare_commit(), K,
lambda i: self._overwrite_partition(99, f'z{i}'))
lambda i: self._overwrite_partition(1, f'z{i}'))

self.assertEqual(provider.full_scan_count, K + 1) # rebuilt every retry
self.assertEqual(provider.delta_probe_count, K) # probed, bailed at kind check
self.assertEqual(provider.full_scan_count, 1)
self.assertEqual(provider.delta_probe_count, K)
self.assertEqual(provider.delta_apply_count, K)

read_builder = self.table.new_read_builder()
actual = read_builder.new_read().to_pandas(
read_builder.new_scan().plan().splits())
self.assertEqual(sorted(actual[actual['f0'] == 1]['f1'].tolist()), ['new'])
self.assertEqual(sorted(actual[actual['f0'] == 2]['f1'].tolist()), ['c'])

def test_whole_table_overwrite_always_full_scans(self):
# Whole-table overwrite (no partition filter) can never reuse the cache.
def test_whole_table_overwrite_advances_by_append_delta(self):
# Whole-table overwrite (no partition filter) can still advance through
# APPEND deltas because every appended file belongs to the target state.
K = 2
wb = self.table.new_batch_write_builder().overwrite()
w = wb.new_write()
Expand All @@ -274,8 +283,9 @@ def test_whole_table_overwrite_always_full_scans(self):
c, w.prepare_commit(), K,
lambda i: self._append(pd.DataFrame({'f0': [99], 'f1': [f'x{i}']})))

self.assertEqual(provider.full_scan_count, K + 1) # null filter -> always full scan
self.assertEqual(provider.delta_probe_count, 0) # never enters the probe loop
self.assertEqual(provider.full_scan_count, 1)
self.assertEqual(provider.delta_probe_count, K)
self.assertEqual(provider.delta_apply_count, K)

read_builder = self.table.new_read_builder()
actual = read_builder.new_read().to_pandas(
Expand Down
73 changes: 40 additions & 33 deletions paimon-python/pypaimon/write/commit/overwrite_changes_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from typing import List, Optional

from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.schema.file_entry import FileEntry
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.snapshot.snapshot import Snapshot
Expand All @@ -28,11 +30,9 @@ class OverwriteChangesProvider:
caching the existing files of the target partitions across commit retries
to avoid repeated full scans.

On retry, if the latest snapshot advanced, the cache is reused only when the
snapshots in between are all APPEND and have not touched the target
partitions; otherwise it is rebuilt by a full scan. A whole-table overwrite
(``partition_filter is None``) always rebuilds. Mirrors Java
``OverwriteChangesProvider`` (#7894).
On retry, if the latest snapshot advanced, the cached state is updated by
applying the target-partition delta manifests. Missing snapshots or
unreadable deltas fall back to a full scan.
"""

def __init__(self, table, manifest_list_manager, snapshot_manager,
Expand All @@ -46,9 +46,10 @@ def __init__(self, table, manifest_list_manager, snapshot_manager,
self._cached_snapshot: Optional[Snapshot] = None
self._cached_entries: List[ManifestEntry] = []

# Counters for tests / observability (mirrors Java @VisibleForTesting).
# Counters for tests / observability.
self.full_scan_count = 0
self.delta_probe_count = 0
self.delta_apply_count = 0

def provide(self, latest_snapshot: Optional[Snapshot]) -> List[ManifestEntry]:
if latest_snapshot is None:
Expand All @@ -63,7 +64,7 @@ def provide(self, latest_snapshot: Optional[Snapshot]) -> List[ManifestEntry]:
f"Cached snapshot id {self._cached_snapshot.id} is greater than "
f"latest snapshot id {latest_snapshot.id}")
elif self._cached_snapshot.id < latest_snapshot.id:
if not self._can_use_cache(latest_snapshot):
if not self._advance_cache(latest_snapshot):
self._cached_entries = self._full_scan(latest_snapshot)
self._cached_snapshot = latest_snapshot
# cached_snapshot.id == latest_snapshot.id -> reuse cache as-is
Expand All @@ -76,39 +77,45 @@ def _full_scan(self, latest_snapshot: Snapshot) -> List[ManifestEntry]:
partition_predicate=self.partition_filter)
.read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot)))

def _can_use_cache(self, latest_snapshot: Snapshot) -> bool:
if self.partition_filter is None:
# Whole-table overwrite: any concurrent commit touches the target,
# so skip the delta probe and force a full scan.
return False
for snapshot_id in range(self._cached_snapshot.id + 1, latest_snapshot.id + 1):
self.delta_probe_count += 1
try:
def _advance_cache(self, latest_snapshot: Snapshot) -> bool:
pending_entries = []
applied_count = 0
manifest_file_manager = ManifestFileManager(self.table)
try:
for snapshot_id in range(self._cached_snapshot.id + 1, latest_snapshot.id + 1):
self.delta_probe_count += 1
snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id)
if snapshot is None:
return False
if snapshot.commit_kind != "APPEND":
# Only APPEND snapshots produce a reliable DELTA manifest for
# probing; other kinds may rewrite/reorganize manifests.
return False
if self._delta_touches_target(snapshot):
return False
except Exception:
# e.g. the snapshot is being expired; a full scan is always safe.
return False
entries = self._read_delta_entries(snapshot, manifest_file_manager)
if entries:
pending_entries.extend(entries)
applied_count += 1
if pending_entries:
self._cached_entries = list(
FileEntry.merge_entries(self._cached_entries + pending_entries))
self.delta_apply_count += applied_count
except Exception:
# e.g. the snapshot is being expired; a full scan is always safe.
return False
return True

def _delta_touches_target(self, snapshot: Snapshot) -> bool:
def _read_delta_entries(
self, snapshot: Snapshot,
manifest_file_manager: ManifestFileManager) -> List[ManifestEntry]:
delta_manifests = self.manifest_list_manager.read_delta(snapshot)
if not delta_manifests:
return False
# Only APPEND snapshots are probed (see _can_use_cache), so the delta has
# no standalone DELETEs; FileScanner's partition predicate prunes at the
# manifest-file level before reading entries.
entries = (FileScanner(self.table, lambda: ([], None),
partition_predicate=self.partition_filter)
.read_manifest_entries(delta_manifests))
return len(entries) > 0
return []
# Read raw delta entries so DELETE entries from OVERWRITE / COMPACT
# snapshots are applied instead of being discarded by FileScanner.
entries = []
for manifest_file in delta_manifests:
for entry in manifest_file_manager.read(manifest_file.file_name):
if (self.partition_filter is not None
and not self.partition_filter.test(entry.partition)):
continue
entries.append(entry)
return entries

def _build_result(self, existing_entries: List[ManifestEntry]) -> List[ManifestEntry]:
entries = []
Expand Down
Loading