From 365c340311232850467db9b01bed5870c4d807e4 Mon Sep 17 00:00:00 2001 From: yugeeklab Date: Thu, 11 Jun 2026 15:03:03 +0900 Subject: [PATCH] [core] Validate lookup hits against the current deletion vector in LookupLevels Lookup files are cached per data file name and freeze the deletion state of their build time. Data files are immutable but their deletion vectors are not, so a cached lookup file can keep serving a row that has since been marked deleted - and with deletion vectors enabled, delete records are dropped from compaction output at any non-zero output level (MergeTreeCompactManager), so no tombstone remains in the levels to shadow the stale hit. Such a stale hit corrupts every consumer of the lookup. In particular, the lookup changelog producer uses it as the changelog BEFORE image: a re-insert with content identical to the pre-delete row (modulo changelog-producer.row-deduplicate-ignore-fields) is judged "no change" and produces no changelog, although a DELETE changelog was already emitted by an earlier compaction. Downstream CDC consumers end up permanently diverged: the table holds a live row while the changelog stream says it was deleted. Validate the hit's position against the current deletion vector before returning it. A deleted hit means the newest version of the key in the searched levels is gone; deeper levels only hold older versions, so the key is reported as absent rather than continuing the search. Co-Authored-By: Claude Fable 5 --- .../apache/paimon/mergetree/LookupLevels.java | 39 +++++++++++- .../MergeTreeCompactManagerFactory.java | 13 +++- .../paimon/table/query/LocalTableQuery.java | 4 +- .../paimon/mergetree/ContainsLevelsTest.java | 3 +- .../paimon/mergetree/LookupLevelsTest.java | 62 ++++++++++++++++++- 5 files changed, 113 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index f8dce8f2e853..cc13cfdf135c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -22,11 +22,14 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.lookup.LookupStoreFactory; import org.apache.paimon.lookup.LookupStoreWriter; +import org.apache.paimon.mergetree.lookup.FilePosition; import org.apache.paimon.mergetree.lookup.LookupSerializerFactory; import org.apache.paimon.mergetree.lookup.PersistProcessor; +import org.apache.paimon.mergetree.lookup.PositionedKeyValue; import org.apache.paimon.mergetree.lookup.RemoteFileDownloader; import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; @@ -69,6 +72,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { private final LookupStoreFactory lookupStoreFactory; private final Function bfGenerator; private final Cache lookupFileCache; + private final DeletionVector.Factory dvFactory; private final Set ownCachedFiles; private final Map, PersistProcessor> schemaIdAndSerVersionToProcessors; @@ -86,7 +90,8 @@ public LookupLevels( Function localFileFactory, LookupStoreFactory lookupStoreFactory, Function bfGenerator, - Cache lookupFileCache) { + Cache lookupFileCache, + DeletionVector.Factory dvFactory) { this.schemaFunction = schemaFunction; this.currentSchemaId = currentSchemaId; this.levels = levels; @@ -99,6 +104,7 @@ public LookupLevels( this.lookupStoreFactory = lookupStoreFactory; this.bfGenerator = bfGenerator; this.lookupFileCache = lookupFileCache; + this.dvFactory = dvFactory; this.ownCachedFiles = new HashSet<>(); this.schemaIdAndSerVersionToProcessors = new ConcurrentHashMap<>(); levels.addDropFileCallback(this); @@ -129,7 +135,36 @@ public void notifyDropFile(String file) { @Nullable public T lookup(InternalRow key, int startLevel) throws IOException { - return LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0); + T result = LookupUtils.lookup(levels, key, startLevel, this::lookup, this::lookupLevel0); + if (result != null && isDeletedByDeletionVector(result)) { + // The hit may be served by a cached lookup file built before the latest + // deletion vector update: data files are immutable, but their deletion + // vectors are not, and lookup files freeze the deletion state of their + // build time. A hit whose position is marked deleted means the newest + // version of this key is gone; deeper levels only hold older versions, + // so the key must be reported as absent instead of continuing the search. + return null; + } + return result; + } + + private boolean isDeletedByDeletionVector(T result) throws IOException { + String fileName; + long rowPosition; + if (result instanceof PositionedKeyValue) { + PositionedKeyValue positioned = (PositionedKeyValue) result; + fileName = positioned.fileName(); + rowPosition = positioned.rowPosition(); + } else if (result instanceof FilePosition) { + FilePosition position = (FilePosition) result; + fileName = position.fileName(); + rowPosition = position.rowPosition(); + } else { + // No position information persisted (e.g. value-only processors), cannot + // validate the hit against the deletion vector. + return false; + } + return dvFactory.create(fileName).map(dv -> dv.isDeleted(rowPosition)).orElse(false); } @Nullable diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java index 89d19313844e..8233915ed383 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManagerFactory.java @@ -309,7 +309,12 @@ private MergeTreeCompactRewriter createRewriter( } LookupLevels lookupLevels = createLookupLevels( - partition, bucket, levels, processorFactory, lookupReaderFactory); + partition, + bucket, + levels, + processorFactory, + lookupReaderFactory, + dvFactory); RemoteLookupFileManager remoteLookupFileManager = null; if (options.lookupRemoteFileEnabled()) { remoteLookupFileManager = @@ -351,7 +356,8 @@ private LookupLevels createLookupLevels( int bucket, Levels levels, PersistProcessor.Factory processorFactory, - FileReaderFactory readerFactory) { + FileReaderFactory readerFactory, + DeletionVector.Factory dvFactory) { if (ioManager == null) { throw new RuntimeException( "Can not use lookup, there is no temp disk directory to use."); @@ -384,7 +390,8 @@ private LookupLevels createLookupLevels( .getPathFile(), lookupStoreFactory, bfGenerator(options), - lookupFileCache); + lookupFileCache, + dvFactory); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index f2e83d073fc0..f7cd5a9a1567 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -168,7 +168,9 @@ private void newLookupLevels(BinaryRow partition, int bucket, List .getPathFile(), lookupStoreFactory, bfGenerator(options), - lookupFileCache); + lookupFileCache, + // TODO pass DeletionVector factory (see reader factory above) + DeletionVector.emptyFactory()); // Optimization - download lookup files if already persisted to object store // We download these files if three conditions are met diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 7bc95449d5a6..2c575f0b45e4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -210,7 +210,8 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max 4096, new CompressOptions("none", 1)), rowCount -> BloomFilter.builder(rowCount, 0.01), - LookupFile.createCache(Duration.ofHours(1), maxDiskSize)); + LookupFile.createCache(Duration.ofHours(1), maxDiskSize), + DeletionVector.emptyFactory()); } private KeyValue kv(int key, int value) { diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 6aac8e1fe7bb..1c9cb7e47838 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.serializer.RowCompactedSerializer; +import org.apache.paimon.deletionvectors.BitmapDeletionVector; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.format.FlushingFileFormat; import org.apache.paimon.fs.FileIOFinder; @@ -37,7 +38,9 @@ import org.apache.paimon.lookup.sort.SortLookupStoreFactory; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.lookup.DefaultLookupSerializerFactory; +import org.apache.paimon.mergetree.lookup.PersistValueAndPosProcessor; import org.apache.paimon.mergetree.lookup.PersistValueProcessor; +import org.apache.paimon.mergetree.lookup.PositionedKeyValue; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.KeyValueFieldsExtractor; @@ -66,6 +69,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.function.Function; @@ -270,6 +274,61 @@ public void testLookupLevel0() throws Exception { assertThat(kv.value().getInt(1)).isEqualTo(11); } + @Test + public void testLookupRespectsDeletionVectorUpdates() throws IOException { + Levels levels = new Levels(comparator, Arrays.asList(newFile(1, kv(1, 11))), 3); + Map deletionVectors = new HashMap<>(); + LookupLevels lookupLevels = + createPositionedLookupLevels( + levels, fileName -> Optional.ofNullable(deletionVectors.get(fileName))); + + // first lookup hits the live row and warms the lookup file cache + PositionedKeyValue hit = lookupLevels.lookup(row(1), 1); + assertThat(hit).isNotNull(); + assertThat(hit.keyValue().value().getInt(1)).isEqualTo(11); + + // a deletion of an unrelated position must not affect the live row + BitmapDeletionVector unrelated = new BitmapDeletionVector(); + unrelated.delete(hit.rowPosition() + 1); + deletionVectors.put(hit.fileName(), unrelated); + hit = lookupLevels.lookup(row(1), 1); + assertThat(hit).isNotNull(); + assertThat(hit.keyValue().value().getInt(1)).isEqualTo(11); + + // mark the returned position deleted; data files are immutable so the cached + // lookup file is not rebuilt and would keep serving the stale row + BitmapDeletionVector deletionVector = new BitmapDeletionVector(); + deletionVector.delete(hit.rowPosition()); + deletionVectors.put(hit.fileName(), deletionVector); + + // the hit must now be validated against the current deletion vector + assertThat(lookupLevels.lookup(row(1), 1)).isNull(); + + lookupLevels.close(); + } + + private LookupLevels createPositionedLookupLevels( + Levels levels, DeletionVector.Factory dvFactory) { + return new LookupLevels<>( + schemaId -> rowType, + 0L, + levels, + comparator, + keyType, + PersistValueAndPosProcessor.factory(rowType), + new DefaultLookupSerializerFactory(), + file -> createReaderFactory().createRecordReader(file), + file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), + new SortLookupStoreFactory( + new RowCompactedSerializer(keyType).createSliceComparator(), + new CacheManager(MemorySize.ofMebiBytes(1)), + 4096, + new CompressOptions("none", 1)), + rowCount -> BloomFilter.builder(rowCount, 0.05), + LookupFile.createCache(Duration.ofHours(1), MemorySize.ofMebiBytes(10)), + dvFactory); + } + private LookupLevels createLookupLevels(Levels levels, MemorySize maxDiskSize) { return new LookupLevels<>( schemaId -> rowType, @@ -287,7 +346,8 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD 4096, new CompressOptions("none", 1)), rowCount -> BloomFilter.builder(rowCount, 0.05), - LookupFile.createCache(Duration.ofHours(1), maxDiskSize)); + LookupFile.createCache(Duration.ofHours(1), maxDiskSize), + DeletionVector.emptyFactory()); } private KeyValue kv(int key, int value) {