diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java index 54c3c7f948..30ecfeda3f 100644 --- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java +++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java @@ -32,7 +32,6 @@ import org.apache.fluss.utils.ExceptionUtils; import org.apache.fluss.utils.types.Tuple2; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -256,19 +255,19 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI return null; } - // We keep snapshots because for a compacted snapshot, if a bucket has L0, we find the - // snapshot that exactly holds those L0, then use that snapshot's previous APPEND's tiered - // offset as the readable offset (that offset is safe to read). When the current compacted - // snapshot has no L0 in any bucket, we do not traverse; for any later compact we would - // traverse to (going backwards in time), if some bucket has L0, the snapshot that exactly - // holds that L0 must be after the current compacted snapshot on the timeline. So that - // snapshot's previous APPEND cannot be earlier than the current compacted snapshot's - // previous APPEND. Therefore the minimum snapshot we need to keep is the current compact's - // previous APPEND; set earliestSnapshotIdToKeep to it so it is not deleted. Earlier - // snapshots may be safely deleted. - if (bucketsWithL0.isEmpty()) { - earliestSnapshotIdToKeep = compactedSnapshotPreviousAppendSnapshot.id(); - } + // The earliest snapshot we must keep is bounded by EVERY bucket's base anchor, i.e. 
the + // previous APPEND of the latest snapshot that exactly holds the L0 files of the bucket's + // most recent flush. A bucket's base anchor only moves forward when the bucket is flushed + // again; until then, a future recomputation (triggered once the bucket receives new L0) + // will trace back to that same anchor, so the anchor snapshot must stay retained. + // + // Buckets with L0 in the current compacted snapshot have their anchor found by the + // traversal below (which also derives their readable offset). Buckets without L0 take their + // readable offset from the latest tiered snapshot and are NOT traversed there, so their + // anchors are computed separately afterwards. We must not assume "no L0 in any bucket" + // means earlier snapshots are deletable: a bucket can be clean in the current compacted + // snapshot yet still be anchored to an older snapshot (it was flushed earlier and has not + // been flushed since). // for all buckets with l0, we need to find the latest compacted snapshot which flushed // the buckets, the per-bucket offset should be updated to the corresponding compacted @@ -305,52 +304,26 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI continue; } if (!readableOffsets.containsKey(tb)) { - Snapshot sourceSnapshot = - findLatestSnapshotExactlyHoldingL0Files( - fileStoreTable, currentSnapshot); - // it happens if there is a compacted snapshot flush l0 files for a bucket, - // but the snapshot from which the compacted snapshot compact is expired - // it should happen rarely, we can't determine the readable offsets for this - // bucket, currently, we just return null to stop readable offset advance - // if it happen, compaction should work unexpected, warn it and reminds to - // increase snapshot retention - if (sourceSnapshot == null) { - LOG.warn( - "Cannot find snapshot holding L0 files flushed by compacted snapshot {} for bucket {}, " - + "the snapshot may have been expired. 
Consider increasing snapshot retention.", - currentSnapshot.id(), - tb); - return null; - } - - // we already find that for this bucket, which snapshot do the latest flush, - // the offset for the previous one append snapshot should be the readable - // offset - Snapshot previousAppendSnapshot = - sourceSnapshot.commitKind() == Snapshot.CommitKind.APPEND - ? sourceSnapshot - : findPreviousSnapshot( - sourceSnapshot.id(), Snapshot.CommitKind.APPEND); - - // Can't find previous APPEND snapshot, likely due to snapshot expiration. - // This happens when the snapshot holding flushed L0 files is a COMPACT - // snapshot, - // and all APPEND snapshots before it have been expired. + // The previous APPEND of the latest snapshot that still exactly holds the L0 + // files flushed by currentSnapshot is this bucket's base anchor; its tiered + // offset is the bucket's readable offset. + Snapshot previousAppendSnapshot = findBaseAnchorAppendSnapshot(currentSnapshot); + + // Can't determine the base anchor, likely due to snapshot expiration: either + // the snapshot holding the flushed L0 files, or all earlier APPEND snapshots, + // have been expired. We can't determine this bucket's readable offset, so stop + // advancing and return null. // - // TODO: Optimization - Store compacted snapshot offsets in Fluss - // Currently, we rely on Paimon to find the previous APPEND snapshot to get its - // offset. If Fluss stores offsets for all snapshots (including COMPACT - // snapshots), - // we could: - // 1. Use the sourceSnapshot's offset directly if it's stored in Fluss - // 2. Find any previous snapshot (COMPACT or APPEND) and use its offset - // 3. This would make the system more resilient to snapshot expiration + // TODO: Optimization - Store compacted snapshot offsets in Fluss so we don't + // need to find the previous APPEND snapshot to get its offset, making this + // resilient to snapshot expiration. 
if (previousAppendSnapshot == null) { LOG.warn( - "Cannot find previous APPEND snapshot before snapshot {} for bucket {}. " - + "This may be due to snapshot expiration. Consider increasing paimon snapshot retention.", - sourceSnapshot.id(), - tb); + "Cannot determine base anchor (previous APPEND) for bucket {} flushed by " + + "compacted snapshot {}. Snapshot history may have expired; " + + "consider increasing paimon snapshot retention.", + tb, + currentSnapshot.id()); return null; } @@ -404,6 +377,23 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI return null; } + // Tighten earliestSnapshotIdToKeep using the base anchors of buckets without L0 as well. + // Their readable offsets come from the latest tiered snapshot, but their base was + // established by an earlier flush; once such a bucket receives new L0, a later + // recomputation traces back to that flush's anchor, so the anchor snapshot must be kept. + // + // This is best-effort: if a bucket's flush history has expired and its anchor cannot be + // determined, we conservatively keep all previous snapshots (KEEP_ALL_PREVIOUS) rather than + // risk deleting one that is still needed. We never fail the whole readable-offset advance + // here, since these buckets' offsets are already resolved from the latest tiered snapshot. + earliestSnapshotIdToKeep = + tightenEarliestSnapshotIdToKeepForBucketsWithoutL0( + bucketsWithoutL0, + flussTableBucketMapper, + latestCompactedSnapshot.id(), + earliestSnapshotId, + earliestSnapshotIdToKeep); + // we use the previous append snapshot tiered offset of the compacted snapshot as the // compacted snapshot tiered offsets LakeSnapshot tieredLakeSnapshot = @@ -426,6 +416,112 @@ public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotI earliestSnapshotIdToKeep); } + /** + * Lowers {@code earliestSnapshotIdToKeep} to also cover the base anchors of buckets that have + * no L0 files in the latest compacted snapshot. + * + *

<p>A bucket without L0 has all of its data in base files, and its readable offset is taken + * from the latest tiered snapshot. However, that base was established by the bucket's most + * recent flush, and a later recomputation (once the bucket receives new L0) will trace back to + * that flush's anchor snapshot (the previous APPEND of the latest snapshot that exactly holds + * the flushed L0). That anchor snapshot must therefore stay retained until the bucket is + * flushed again. + * + *

<p>This is best-effort: if a bucket's flush history has expired and its anchor cannot be + * determined, this conservatively returns {@link LakeCommitResult#KEEP_ALL_PREVIOUS} so that no + * snapshot is deleted, rather than risk deleting one that is still needed. + * + * @param bucketsWithoutL0 buckets with no L0 in the latest compacted snapshot + * @param flussTableBucketMapper mapper from Paimon partition-bucket to Fluss table bucket + * @param latestCompactedSnapshotId the latest compacted snapshot id (traversal start) + * @param earliestSnapshotId the earliest snapshot id still present in Paimon (traversal end) + * @param earliestSnapshotIdToKeep the current value computed from buckets with L0 + * @return the tightened earliest snapshot id to keep + */ + private long tightenEarliestSnapshotIdToKeepForBucketsWithoutL0( + Set bucketsWithoutL0, + FlussTableBucketMapper flussTableBucketMapper, + long latestCompactedSnapshotId, + long earliestSnapshotId, + long earliestSnapshotIdToKeep) + throws IOException { + if (bucketsWithoutL0.isEmpty()) { + return earliestSnapshotIdToKeep; + } + + // Only track buckets that map to a Fluss bucket; unmappable ones (e.g. a partition not in + // Fluss) never need recomputation and must not pin retention. 
+ Set bucketsToAnchor = new HashSet<>(); + for (PaimonPartitionBucket bucket : bucketsWithoutL0) { + if (flussTableBucketMapper.toTableBucket(bucket) != null) { + bucketsToAnchor.add(bucket); + } + } + if (bucketsToAnchor.isEmpty()) { + return earliestSnapshotIdToKeep; + } + boolean allAnchorsResolved = true; + + for (long currentSnapshotId = latestCompactedSnapshotId; + currentSnapshotId >= earliestSnapshotId && !bucketsToAnchor.isEmpty(); + currentSnapshotId--) { + Snapshot currentSnapshot = snapshotManager.tryGetSnapshot(currentSnapshotId); + if (currentSnapshot == null + || currentSnapshot.commitKind() != Snapshot.CommitKind.COMPACT) { + continue; + } + // The first flush encountered going backwards is the bucket's most recent flush. + for (PaimonPartitionBucket partitionBucket : getBucketsWithFlushedL0(currentSnapshot)) { + if (!bucketsToAnchor.remove(partitionBucket)) { + continue; + } + Snapshot previousAppendSnapshot = findBaseAnchorAppendSnapshot(currentSnapshot); + if (previousAppendSnapshot == null) { + // can't determine this bucket's base anchor; don't tighten retention + allAnchorsResolved = false; + continue; + } + if (earliestSnapshotIdToKeep <= 0 + || previousAppendSnapshot.id() < earliestSnapshotIdToKeep) { + earliestSnapshotIdToKeep = previousAppendSnapshot.id(); + } + } + } + + if (!bucketsToAnchor.isEmpty()) { + // some bucket's most recent flush was not found within the retained snapshots + allAnchorsResolved = false; + } + + return allAnchorsResolved ? earliestSnapshotIdToKeep : LakeCommitResult.KEEP_ALL_PREVIOUS; + } + + /** + * Finds the base anchor APPEND snapshot for the bucket(s) whose L0 files were flushed by the + * given compacted snapshot. + * + *

<p>The anchor is the previous APPEND of the latest snapshot that still exactly holds those + * flushed L0 files (see {@link PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files}). + * Its tiered offset is the bucket's readable offset, and it must stay retained until the bucket + * is flushed again. + * + * @param flushingCompactedSnapshot the COMPACT snapshot that flushed the bucket's L0 files + * @return the base anchor APPEND snapshot, or {@code null} if it cannot be determined (e.g. the + * holding snapshot or all earlier APPEND snapshots have been expired) + */ + @Nullable + private Snapshot findBaseAnchorAppendSnapshot(Snapshot flushingCompactedSnapshot) + throws IOException { + Snapshot sourceSnapshot = + findLatestSnapshotExactlyHoldingL0Files(fileStoreTable, flushingCompactedSnapshot); + if (sourceSnapshot == null) { + return null; + } + return sourceSnapshot.commitKind() == Snapshot.CommitKind.APPEND + ? sourceSnapshot + : findPreviousSnapshot(sourceSnapshot.id(), Snapshot.CommitKind.APPEND); + } + /** * Checks that the given lake snapshot belongs to the current table (same table id). 
Throws when * the table may have been dropped and re-created with a different id; the tiering committer @@ -495,16 +591,10 @@ private LakeSnapshot getOrFetchLakeSnapshot(long snapshotId, Map bucketsWithoutL0 = new HashSet<>(); Set bucketsWithL0 = new HashSet<>(); - // Scan the snapshot to get all splits including level0 - Map scanOptions = new HashMap<>(); - scanOptions.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), String.valueOf(snapshot.id())); - // hacky: set batch scan mode to compact to make sure we can get l0 level files - scanOptions.put( - CoreOptions.BATCH_SCAN_MODE.key(), CoreOptions.BatchScanMode.COMPACT.getValue()); - + // Scan the snapshot to get all data files including L0 level files Map>> manifestsByBucket = FileStoreScan.Plan.groupByPartFiles( - fileStoreTable.copy(scanOptions).store().newScan().plan().files()); + fileStoreTable.store().newScan().withSnapshot(snapshot).plan().files()); for (Map.Entry>> manifestsByBucketEntry : manifestsByBucket.entrySet()) { diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java index 0ce3d5b8ef..ea4fdce282 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetrieverTest.java @@ -732,9 +732,12 @@ void testGetReadableSnapshotAndOffsetsForPartitionedTable() throws Exception { assertThat(readableSnapshotAndOffsets.getReadableOffsets()) .isEqualTo(expectedReadableOffsets); - // After compact 13, all buckets have no L0 in the compacted snapshot, we only need - // to keep the previous append snapshot 12 - assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot12); + // All buckets have no L0 in the compacted 
snapshot, but each bucket's base is anchored to + // the previous APPEND of its most recent flush: partition0/bucket0 -> snapshot6 (flushed at + // s7, never flushed since), partition1/bucket0 -> snapshot9, partition0/bucket1 -> + // snapshot12. The earliest snapshot we must keep is the minimum of these anchors + // (snapshot6), since a future recomputation for partition0/bucket0 would trace back to it. + assertThat(readableSnapshotAndOffsets.getEarliestSnapshotIdToKeep()).isEqualTo(snapshot6); } private long latestSnapshot(FileStoreTable fileStoreTable) {