From 6ff1ac0628b60c70048fdb1e5b989b46244399a7 Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Thu, 28 May 2026 20:43:54 +0800 Subject: [PATCH 1/2] [lake] Fix DvTableReadableSnapshotRetriever scanning wrong snapshot due to ineffective options getBucketsWithoutL0AndWithL0() passed SCAN_SNAPSHOT_ID and BATCH_SCAN_MODE via table options to store().newScan(), but these options are only consumed by table-level scans (DataTableBatchScan), not by store-level scans (AbstractFileStoreScan). This means the scan always hit the latest snapshot instead of the specified one. Fix by using the direct .withSnapshot(snapshot) API on FileStoreScan, consistent with getBucketsWithFlushedL0() in the same file. Co-Authored-By: Claude Opus 4.6 --- .../utils/DvTableReadableSnapshotRetriever.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 54c3c7f948..3412829fa7 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -32,7 +32,6 @@ import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.types.Tuple2; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -495,16 +494,10 @@ private LakeSnapshot getOrFetchLakeSnapshot(long snapshotId, Map bucketsWithoutL0 = new HashSet<>(); Set bucketsWithL0 = new HashSet<>(); - // Scan the snapshot to get all splits including level0 - Map scanOptions = new HashMap<>(); - scanOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())); - // hacky: set 
batch scan mode to compact to make sure we can get l0 level files - scanOptions.put( - CoreOptions.BATCH_SCAN_MODE.key(), CoreOptions.BatchScanMode.COMPACT.getValue()); - + // Scan the snapshot to get all data files including L0 level files Map>> manifestsByBucket = FileStoreScan.Plan.groupByPartFiles( - fileStoreTable.copy(scanOptions).store().newScan().plan().files()); + fileStoreTable.store().newScan().withSnapshot(snapshot).plan().files()); for (Map.Entry>> manifestsByBucketEntry : manifestsByBucket.entrySet()) { From a7d7fdcb56399c5b5d07e5c4ce5fb4ea88269eae Mon Sep 17 00:00:00 2001 From: luoyuxia Date: Wed, 24 Jun 2026 15:27:02 +0800 Subject: [PATCH 2/2] [lake] Fix premature snapshot deletion for DV readable offsets For Paimon DV tables, getReadableSnapshotAndOffsets returns an earliestSnapshotIdToKeep telling Fluss which lake snapshots may be deleted. When the latest compacted snapshot had no L0 in any bucket, the code shortcut earliestSnapshotIdToKeep to that compacted snapshot's previous APPEND, assuming nothing earlier was needed. This was unsound: a bucket can be clean (no L0) in the current compacted snapshot yet still be anchored to an older snapshot - it was flushed earlier and has not been flushed since, so its base anchor (the previous APPEND of the latest snapshot that exactly holds its most-recently flushed L0) can be older than the compacted snapshot's previous APPEND. Once such a bucket later receives new L0, recomputation traces back to that older anchor; if it was already deleted, the retrieval returns null and readable offsets can no longer advance. Fix by also computing the base anchor for buckets without L0 and lowering earliestSnapshotIdToKeep to the minimum anchor across all buckets. This is best-effort: if a bucket's flush history has expired and its anchor cannot be determined, keep all previous snapshots rather than risk deleting a needed one; it never blocks the readable-offset advance. 
The shared anchor computation is extracted into findBaseAnchorAppendSnapshot, reused by both the with-L0 offset traversal and the new no-L0 retention pass. The partitioned test expectation that encoded the old, over-aggressive value is updated accordingly. Co-Authored-By: Claude Opus 4.8 --- .../DvTableReadableSnapshotRetriever.java | 209 +++++++++++++----- .../DvTableReadableSnapshotRetrieverTest.java | 9 +- 2 files changed, 159 insertions(+), 59 deletions(-) diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 3412829fa7..30ecfeda3f 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -255,19 +255,19 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI return null; } - // We keep snapshots because for a compacted snapshot, if a bucket has L0, we find the - // snapshot that exactly holds those L0, then use that snapshot's previous APPEND's tiered - // offset as the readable offset (that offset is safe to read). When the current compacted - // snapshot has no L0 in any bucket, we do not traverse; for any later compact we would - // traverse to (going backwards in time), if some bucket has L0, the snapshot that exactly - // holds that L0 must be after the current compacted snapshot on the timeline. So that - // snapshot's previous APPEND cannot be earlier than the current compacted snapshot's - // previous APPEND. Therefore the minimum snapshot we need to keep is the current compact's - // previous APPEND; set earliestSnapshotIdToKeep to it so it is not deleted. Earlier - // snapshots may be safely deleted. 
- if (bucketsWithL0.isEmpty()) { - earliestSnapshotIdToKeep = compactedSnapshotPreviousAppendSnapshot.id(); - } + // The earliest snapshot we must keep is bounded by EVERY bucket's base anchor, i.e. the + // previous APPEND of the latest snapshot that exactly holds the L0 files of the bucket's + // most recent flush. A bucket's base anchor only moves forward when the bucket is flushed + // again; until then, a future recomputation (triggered once the bucket receives new L0) + // will trace back to that same anchor, so the anchor snapshot must stay retained. + // + // Buckets with L0 in the current compacted snapshot have their anchor found by the + // traversal below (which also derives their readable offset). Buckets without L0 take their + // readable offset from the latest tiered snapshot and are NOT traversed there, so their + // anchors are computed separately afterwards. We must not assume "no L0 in any bucket" + // means earlier snapshots are deletable: a bucket can be clean in the current compacted + // snapshot yet still be anchored to an older snapshot (it was flushed earlier and has not + // been flushed since). 
// for all buckets with l0, we need to find the latest compacted snapshot which flushed // the buckets, the per-bucket offset should be updated to the corresponding compacted @@ -304,52 +304,26 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI continue; } if (!readableOffsets.containsKey(tb)) { - Snapshot sourceSnapshot = - findLatestSnapshotExactlyHoldingL0Files( - fileStoreTable, currentSnapshot); - // it happens if there is a compacted snapshot flush l0 files for a bucket, - // but the snapshot from which the compacted snapshot compact is expired - // it should happen rarely, we can't determine the readable offsets for this - // bucket, currently, we just return null to stop readable offset advance - // if it happen, compaction should work unexpected, warn it and reminds to - // increase snapshot retention - if (sourceSnapshot == null) { - LOG.warn( - "Cannot find snapshot holding L0 files flushed by compacted snapshot {} for bucket {}, " - + "the snapshot may have been expired. Consider increasing snapshot retention.", - currentSnapshot.id(), - tb); - return null; - } - - // we already find that for this bucket, which snapshot do the latest flush, - // the offset for the previous one append snapshot should be the readable - // offset - Snapshot previousAppendSnapshot = - sourceSnapshot.commitKind() == Snapshot.CommitKind.APPEND - ? sourceSnapshot - : findPreviousSnapshot( - sourceSnapshot.id(), Snapshot.CommitKind.APPEND); - - // Can't find previous APPEND snapshot, likely due to snapshot expiration. - // This happens when the snapshot holding flushed L0 files is a COMPACT - // snapshot, - // and all APPEND snapshots before it have been expired. + // The previous APPEND of the latest snapshot that still exactly holds the L0 + // files flushed by currentSnapshot is this bucket's base anchor; its tiered + // offset is the bucket's readable offset. 
+ Snapshot previousAppendSnapshot = findBaseAnchorAppendSnapshot(currentSnapshot); + + // Can't determine the base anchor, likely due to snapshot expiration: either + // the snapshot holding the flushed L0 files, or all earlier APPEND snapshots, + // have been expired. We can't determine this bucket's readable offset, so stop + // advancing and return null. // - // TODO: Optimization - Store compacted snapshot offsets in Fluss - // Currently, we rely on Paimon to find the previous APPEND snapshot to get its - // offset. If Fluss stores offsets for all snapshots (including COMPACT - // snapshots), - // we could: - // 1. Use the sourceSnapshot's offset directly if it's stored in Fluss - // 2. Find any previous snapshot (COMPACT or APPEND) and use its offset - // 3. This would make the system more resilient to snapshot expiration + // TODO: Optimization - Store compacted snapshot offsets in Fluss so we don't + // need to find the previous APPEND snapshot to get its offset, making this + // resilient to snapshot expiration. if (previousAppendSnapshot == null) { LOG.warn( - "Cannot find previous APPEND snapshot before snapshot {} for bucket {}. " - + "This may be due to snapshot expiration. Consider increasing paimon snapshot retention.", - sourceSnapshot.id(), - tb); + "Cannot determine base anchor (previous APPEND) for bucket {} flushed by " + + "compacted snapshot {}. Snapshot history may have expired; " + + "consider increasing paimon snapshot retention.", + tb, + currentSnapshot.id()); return null; } @@ -403,6 +377,23 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI return null; } + // Tighten earliestSnapshotIdToKeep using the base anchors of buckets without L0 as well. + // Their readable offsets come from the latest tiered snapshot, but their base was + // established by an earlier flush; once such a bucket receives new L0, a later + // recomputation traces back to that flush's anchor, so the anchor snapshot must be kept. 
+ // + // This is best-effort: if a bucket's flush history has expired and its anchor cannot be + // determined, we conservatively keep all previous snapshots (KEEP_ALL_PREVIOUS) rather than + // risk deleting one that is still needed. We never fail the whole readable-offset advance + // here, since these buckets' offsets are already resolved from the latest tiered snapshot. + earliestSnapshotIdToKeep = + tightenEarliestSnapshotIdToKeepForBucketsWithoutL0( + bucketsWithoutL0, + flussTableBucketMapper, + latestCompactedSnapshot.id(), + earliestSnapshotId, + earliestSnapshotIdToKeep); + // we use the previous append snapshot tiered offset of the compacted snapshot as the // compacted snapshot tiered offsets LakeSnapshot tieredLakeSnapshot = @@ -425,6 +416,112 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI earliestSnapshotIdToKeep); } + /** + * Lowers {@code earliestSnapshotIdToKeep} to also cover the base anchors of buckets that have + * no L0 files in the latest compacted snapshot. + * + *

<p>A bucket without L0 has all of its data in base files, and its readable offset is taken + * from the latest tiered snapshot. However, that base was established by the bucket's most + * recent flush, and a later recomputation (once the bucket receives new L0) will trace back to + * that flush's anchor snapshot (the previous APPEND of the latest snapshot that exactly holds + * the flushed L0). That anchor snapshot must therefore stay retained until the bucket is + * flushed again. + * + *

This is best-effort: if a bucket's flush history has expired and its anchor cannot be + * determined, this conservatively returns {@link LakeCommitResult#KEEP_ALL_PREVIOUS} so that no + * snapshot is deleted, rather than risk deleting one that is still needed. + * + * @param bucketsWithoutL0 buckets with no L0 in the latest compacted snapshot + * @param flussTableBucketMapper mapper from Paimon partition-bucket to Fluss table bucket + * @param latestCompactedSnapshotId the latest compacted snapshot id (traversal start) + * @param earliestSnapshotId the earliest snapshot id still present in Paimon (traversal end) + * @param earliestSnapshotIdToKeep the current value computed from buckets with L0 + * @return the tightened earliest snapshot id to keep + */ + private long tightenEarliestSnapshotIdToKeepForBucketsWithoutL0( + Set bucketsWithoutL0, + FlussTableBucketMapper flussTableBucketMapper, + long latestCompactedSnapshotId, + long earliestSnapshotId, + long earliestSnapshotIdToKeep) + throws IOException { + if (bucketsWithoutL0.isEmpty()) { + return earliestSnapshotIdToKeep; + } + + // Only track buckets that map to a Fluss bucket; unmappable ones (e.g. a partition not in + // Fluss) never need recomputation and must not pin retention. 
+ Set bucketsToAnchor = new HashSet<>(); + for (PaimonPartitionBucket bucket : bucketsWithoutL0) { + if (flussTableBucketMapper.toTableBucket(bucket) != null) { + bucketsToAnchor.add(bucket); + } + } + if (bucketsToAnchor.isEmpty()) { + return earliestSnapshotIdToKeep; + } + boolean allAnchorsResolved = true; + + for (long currentSnapshotId = latestCompactedSnapshotId; + currentSnapshotId >= earliestSnapshotId && !bucketsToAnchor.isEmpty(); + currentSnapshotId--) { + Snapshot currentSnapshot = snapshotManager.tryGetSnapshot(currentSnapshotId); + if (currentSnapshot == null + || currentSnapshot.commitKind() != Snapshot.CommitKind.COMPACT) { + continue; + } + // The first flush encountered going backwards is the bucket's most recent flush. + for (PaimonPartitionBucket partitionBucket : getBucketsWithFlushedL0(currentSnapshot)) { + if (!bucketsToAnchor.remove(partitionBucket)) { + continue; + } + Snapshot previousAppendSnapshot = findBaseAnchorAppendSnapshot(currentSnapshot); + if (previousAppendSnapshot == null) { + // can't determine this bucket's base anchor; don't tighten retention + allAnchorsResolved = false; + continue; + } + if (earliestSnapshotIdToKeep <= 0 + || previousAppendSnapshot.id() < earliestSnapshotIdToKeep) { + earliestSnapshotIdToKeep = previousAppendSnapshot.id(); + } + } + } + + if (!bucketsToAnchor.isEmpty()) { + // some bucket's most recent flush was not found within the retained snapshots + allAnchorsResolved = false; + } + + return allAnchorsResolved ? earliestSnapshotIdToKeep : LakeCommitResult.KEEP_ALL_PREVIOUS; + } + + /** + * Finds the base anchor APPEND snapshot for the bucket(s) whose L0 files were flushed by the + * given compacted snapshot. + * + *

The anchor is the previous APPEND of the latest snapshot that still exactly holds those + * flushed L0 files (see {@link PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}). + * Its tiered offset is the bucket's readable offset, and it must stay retained until the bucket + * is flushed again. + * + * @param flushingCompactedSnapshot the COMPACT snapshot that flushed the bucket's L0 files + * @return the base anchor APPEND snapshot, or {@code null} if it cannot be determined (e.g. the + * holding snapshot or all earlier APPEND snapshots have been expired) + */ + @Nullable + private Snapshot findBaseAnchorAppendSnapshot(Snapshot flushingCompactedSnapshot) + throws IOException { + Snapshot sourceSnapshot = + findLatestSnapshotExactlyHoldingL0Files(fileStoreTable, flushingCompactedSnapshot); + if (sourceSnapshot == null) { + return null; + } + return sourceSnapshot.commitKind() == Snapshot.CommitKind.APPEND + ? sourceSnapshot + : findPreviousSnapshot(sourceSnapshot.id(), Snapshot.CommitKind.APPEND); + } + /** * Checks that the given lake snapshot belongs to the current table (same table id). 
Throws when * the table may have been dropped and re-created with a different id; the tiering committer diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java index 0ce3d5b8ef..ea4fdce282 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java @@ -732,9 +732,12 @@ void testGetReadableSnapshotAndOffsetsForPartitionedTable() throws Exception { assertThat(readableSnapshotAndOffsets.getReadableOffsets()) .isEqualTo(expectedReadableOffsets); - // After compact 13, all buckets have no L0 in the compacted snapshot, we only need - // to keep the previous append snapshot 12 - assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot12); + // All buckets have no L0 in the compacted snapshot, but each bucket's base is anchored to + // the previous APPEND of its most recent flush: partition0/bucket0 -> snapshot6 (flushed at + // s7, never flushed since), partition1/bucket0 -> snapshot9, partition0/bucket1 -> + // snapshot12. The earliest snapshot we must keep is the minimum of these anchors + // (snapshot6), since a future recomputation for partition0/bucket0 would trace back to it. + assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot6); } private long latestSnapshot(FileStoreTable fileStoreTable) {