diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 5f7e3741927e..cae12ae82c59 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -49,10 +49,10 @@ import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; @@ -98,7 +98,7 @@ protected KeyValueFileReaderFactory( this.ignoreLostFiles = coreOptions.scanIgnoreLostFile(); this.snapshotSequenceOrdering = coreOptions.snapshotSequenceOrdering(); this.partition = partition; - this.formatReaderMappings = new HashMap<>(); + this.formatReaderMappings = new ConcurrentHashMap<>(); this.dvFactory = dvFactory; } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java index ae59876f5310..7ffd9c31cff6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java @@ -37,10 +37,11 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes; import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString; -import static org.apache.paimon.utils.Preconditions.checkArgument; /** Lookup file for cache remote file to local. */ public class LookupFile { @@ -54,9 +55,9 @@ public class LookupFile { private final LookupStoreReader reader; private final Runnable callback; - private long requestCount; - private long hitCount; - private boolean isClosed = false; + private final AtomicLong requestCount = new AtomicLong(); + private final AtomicLong hitCount = new AtomicLong(); + private final AtomicBoolean isClosed = new AtomicBoolean(false); public LookupFile( File localFile, @@ -86,12 +87,14 @@ public String serVersion() { } @Nullable - public byte[] get(byte[] key) throws IOException { - checkArgument(!isClosed); - requestCount++; + public synchronized byte[] get(byte[] key) throws IOException { + if (isClosed.get()) { + return null; + } + requestCount.incrementAndGet(); byte[] res = reader.lookup(key); if (res != null) { - hitCount++; + hitCount.incrementAndGet(); } return res; } @@ -101,21 +104,66 @@ public int level() { } public boolean isClosed() { - return isClosed; + return isClosed.get(); } - public void close(RemovalCause cause) throws IOException { - reader.close(); - isClosed = true; - callback.run(); + public synchronized void close(RemovalCause cause) throws IOException { + if (!isClosed.compareAndSet(false, true)) { + return; + } + + Throwable throwable = null; + try { + reader.close(); + } catch (Throwable t) { + throwable = t; + } + + try { + callback.run(); + } catch (Throwable t) { + throwable = addSuppressed(throwable, t); + } + LOG.info( "Delete Lookup file {} due to {}. Access stats: requestCount={}, hitCount={}, size={}KB", localFile.getName(), cause, - requestCount, - hitCount, + requestCount.get(), + hitCount.get(), localFile.length() >> 10); - FileIOUtils.deleteFileOrDirectory(localFile); + + try { + FileIOUtils.deleteFileOrDirectory(localFile); + } catch (Throwable t) { + throwable = addSuppressed(throwable, t); + } + + throwIfNotNull(throwable); + } + + private static Throwable addSuppressed(@Nullable Throwable throwable, Throwable suppressed) { + if (throwable == null) { + return suppressed; + } + throwable.addSuppressed(suppressed); + return throwable; + } + + private static void throwIfNotNull(@Nullable Throwable throwable) throws IOException { + if (throwable == null) { + return; + } + if (throwable instanceof IOException) { + throw (IOException) throwable; + } + if (throwable instanceof RuntimeException) { + throw (RuntimeException) throwable; + } + if (throwable instanceof Error) { + throw (Error) throwable; + } + throw new IOException(throwable); } // ==================== Cache for Local File ====================== diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index f8dce8f2e853..850892a8c078 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -56,6 +56,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { public static final String REMOTE_LOOKUP_FILE_SUFFIX = ".lookup"; + private static final int LOOKUP_FILE_LOCK_STRIPES = 1024; private final Function schemaFunction; private final long currentSchemaId; @@ -70,6 +71,7 @@ public class LookupLevels implements Levels.DropFileCallback, Closeable { private final Function bfGenerator; private final Cache lookupFileCache; private final Set ownCachedFiles; + private final Object[] lookupFileLocks; private final Map, PersistProcessor> schemaIdAndSerVersionToProcessors; @Nullable private RemoteFileDownloader remoteFileDownloader; @@ -99,7 +101,11 @@ public LookupLevels( this.lookupStoreFactory = lookupStoreFactory; this.bfGenerator = bfGenerator; this.lookupFileCache = lookupFileCache; - this.ownCachedFiles = new HashSet<>(); + this.ownCachedFiles = ConcurrentHashMap.newKeySet(); + this.lookupFileLocks = new Object[LOOKUP_FILE_LOCK_STRIPES]; + for (int i = 0; i < lookupFileLocks.length; i++) { + lookupFileLocks[i] = new Object(); + } this.schemaIdAndSerVersionToProcessors = new ConcurrentHashMap<>(); levels.addDropFileCallback(this); } @@ -144,29 +150,84 @@ private T lookup(InternalRow key, SortedRun level) throws IOException { @Nullable private T lookup(InternalRow key, DataFileMeta file) throws IOException { - LookupFile lookupFile = lookupFileCache.getIfPresent(file.fileName()); + byte[] keyBytes = serializeKey(key); + LookupResult lookupResult = lookupFile(file, keyBytes); + byte[] valueBytes = lookupResult.valueBytes; + if (valueBytes == null) { + return null; + } - boolean newCreatedLookupFile = false; - if (lookupFile == null) { - lookupFile = createLookupFile(file); - newCreatedLookupFile = true; + return readFromDisk( + getOrCreateProcessor(lookupResult.schemaId, lookupResult.serVersion), + key, + lookupResult.level, + valueBytes, + file.fileName()); + } + + private LookupResult lookupFile(DataFileMeta file, byte[] keyBytes) throws IOException { + String fileName = file.fileName(); + LookupFile lookupFile = lookupFileCache.getIfPresent(fileName); + LookupResult lookupResult = lookupCachedFile(fileName, lookupFile, keyBytes); + if (lookupResult != null) { + return lookupResult; } - byte[] valueBytes; - try { - byte[] keyBytes = keySerializer.serializeToBytes(key); - valueBytes = lookupFile.get(keyBytes); - } finally { - if (newCreatedLookupFile) { + Object lock = lookupFileLock(fileName); + synchronized (lock) { + lookupFile = lookupFileCache.getIfPresent(fileName); + lookupResult = lookupCachedFile(fileName, lookupFile, keyBytes); + if (lookupResult != null) { + return lookupResult; + } + + lookupFile = createLookupFile(file); + + try { + return LookupResult.of(lookupFile, lookupFile.get(keyBytes)); + } finally { addLocalFile(file, lookupFile); } } - if (valueBytes == null) { + } + + private Object lookupFileLock(String fileName) { + return lookupFileLocks[Math.floorMod(fileName.hashCode(), lookupFileLocks.length)]; + } + + @Nullable + private LookupResult lookupCachedFile( + String fileName, @Nullable LookupFile lookupFile, byte[] keyBytes) throws IOException { + if (lookupFile == null) { return null; } - return getOrCreateProcessor(lookupFile.schemaId(), lookupFile.serVersion()) - .readFromDisk(key, lookupFile.level(), valueBytes, file.fileName()); + byte[] valueBytes = lookupFile.get(keyBytes); + if (lookupFile.isClosed()) { + lookupFileCache.asMap().remove(fileName, lookupFile); + return null; + } + return LookupResult.of(lookupFile, valueBytes); + } + + private static class LookupResult { + + private final int level; + private final long schemaId; + private final String serVersion; + private final byte[] valueBytes; + + private LookupResult(int level, long schemaId, String serVersion, byte[] valueBytes) { + this.level = level; + this.schemaId = schemaId; + this.serVersion = serVersion; + this.valueBytes = valueBytes; + } + + private static LookupResult of(LookupFile lookupFile, byte[] valueBytes) { + return new LookupResult( + lookupFile.level(), lookupFile.schemaId(), lookupFile.serVersion(), valueBytes); + } } private PersistProcessor getOrCreateProcessor(long schemaId, String serVersion) { @@ -179,31 +240,80 @@ private PersistProcessor getOrCreateProcessor(long schemaId, String serVersio }); } + private T readFromDisk( + PersistProcessor processor, + InternalRow key, + int level, + byte[] valueBytes, + String fileName) { + synchronized (processor) { + return processor.readFromDisk(key, level, valueBytes, fileName); + } + } + + private byte[] persistToDisk(PersistProcessor processor, KeyValue kv) { + synchronized (processor) { + return processor.persistToDisk(kv); + } + } + + private byte[] persistToDisk(PersistProcessor processor, KeyValue kv, long rowPosition) { + synchronized (processor) { + return processor.persistToDisk(kv, rowPosition); + } + } + + private byte[] serializeKey(InternalRow key) { + synchronized (keySerializer) { + return keySerializer.serializeToBytes(key); + } + } + public LookupFile createLookupFile(DataFileMeta file) throws IOException { File localFile = localFileFactory.apply(file.fileName()); if (!localFile.createNewFile()) { throw new IOException("Can not create new file: " + localFile); } - long schemaId = this.currentSchemaId; - String fileSerVersion = serializerFactory.version(); - Optional downloadSerVersion = tryToDownloadRemoteSst(file, localFile); - if (downloadSerVersion.isPresent()) { - // use schema id from remote file - schemaId = file.schemaId(); - fileSerVersion = downloadSerVersion.get(); - } else { - createSstFileFromDataFile(file, localFile); - } + try { + long schemaId = this.currentSchemaId; + String fileSerVersion = serializerFactory.version(); + Optional downloadSerVersion = tryToDownloadRemoteSst(file, localFile); + if (downloadSerVersion.isPresent()) { + // use schema id from remote file + schemaId = file.schemaId(); + fileSerVersion = downloadSerVersion.get(); + } else { + createSstFileFromDataFile(file, localFile); + } - ownCachedFiles.add(file.fileName()); - return new LookupFile( - localFile, - file.level(), - schemaId, - fileSerVersion, - lookupStoreFactory.createReader(localFile), - () -> ownCachedFiles.remove(file.fileName())); + LookupFile lookupFile = + new LookupFile( + localFile, + file.level(), + schemaId, + fileSerVersion, + lookupStoreFactory.createReader(localFile), + () -> ownCachedFiles.remove(file.fileName())); + ownCachedFiles.add(file.fileName()); + return lookupFile; + } catch (Throwable t) { + try { + FileIOUtils.deleteFileOrDirectory(localFile); + } catch (Throwable deleteException) { + t.addSuppressed(deleteException); + } + if (t instanceof IOException) { + throw (IOException) t; + } + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } + if (t instanceof Error) { + throw (Error) t; + } + throw new IOException(t); + } } private Optional tryToDownloadRemoteSst(DataFileMeta file, File localFile) { @@ -248,8 +358,8 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws FileRecordIterator batch; while ((batch = (FileRecordIterator) reader.readBatch()) != null) { while ((kv = batch.next()) != null) { - byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); - byte[] valueBytes = processor.persistToDisk(kv, batch.returnedPosition()); + byte[] keyBytes = serializeKey(kv.key()); + byte[] valueBytes = persistToDisk(processor, kv, batch.returnedPosition()); kvWriter.put(keyBytes, valueBytes); } batch.releaseBatch(); @@ -258,8 +368,8 @@ private void createSstFileFromDataFile(DataFileMeta file, File localFile) throws RecordReader.RecordIterator batch; while ((batch = reader.readBatch()) != null) { while ((kv = batch.next()) != null) { - byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); - byte[] valueBytes = processor.persistToDisk(kv); + byte[] keyBytes = serializeKey(kv.key()); + byte[] valueBytes = persistToDisk(processor, kv); kvWriter.put(keyBytes, valueBytes); } batch.releaseBatch(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index f2e83d073fc0..fdc57861b661 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -54,9 +54,11 @@ import java.io.IOException; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator; @@ -65,7 +67,7 @@ /** Implementation for {@link TableQuery} for caching data and file in local. */ public class LocalTableQuery implements TableQuery { - private final Map>> tableView; + private final Map> tableView; private final CoreOptions options; @@ -79,7 +81,7 @@ public class LocalTableQuery implements TableQuery { private IOManager ioManager; - @Nullable private Cache lookupFileCache; + @Nullable private volatile Cache lookupFileCache; private final RowType rowType; private final RowType partitionType; @@ -89,7 +91,7 @@ public class LocalTableQuery implements TableQuery { public LocalTableQuery(FileStoreTable table) { this.options = table.coreOptions(); - this.tableView = new HashMap<>(); + this.tableView = new ConcurrentHashMap<>(); FileStore tableStore = table.store(); if (!(tableStore instanceof KeyValueFileStore)) { throw new UnsupportedOperationException( @@ -118,28 +120,32 @@ public void refreshFiles( int bucket, List beforeFiles, List dataFiles) { - LookupLevels lookupLevels = - tableView.computeIfAbsent(partition, k -> new HashMap<>()).get(bucket); - if (lookupLevels == null) { - // Initial phase: ignore beforeFiles as they represent deletions from previous state - newLookupLevels(partition, bucket, dataFiles); - } else { - lookupLevels.getLevels().update(beforeFiles, dataFiles); + // Both tableView and its nested bucket maps are ConcurrentHashMaps; this nested + // computeIfAbsent pattern relies on each map providing atomic insertion. + BucketLookupState state = + tableView + .computeIfAbsent(partition, k -> new ConcurrentHashMap<>()) + .computeIfAbsent(bucket, k -> new BucketLookupState()); + state.lock.writeLock().lock(); + try { + if (state.lookupLevels == null) { + // Initial phase: ignore beforeFiles as they represent deletions from previous state + state.lookupLevels = createLookupLevels(partition, bucket, dataFiles); + } else { + state.lookupLevels.getLevels().update(beforeFiles, dataFiles); + } + } finally { + state.lock.writeLock().unlock(); } } - private void newLookupLevels(BinaryRow partition, int bucket, List dataFiles) { + private LookupLevels createLookupLevels( + BinaryRow partition, int bucket, List dataFiles) { Levels levels = new Levels(keyComparatorSupplier.get(), dataFiles, options.numLevels()); // TODO pass DeletionVector factory KeyValueFileReaderFactory factory = readerFactoryBuilder.build(partition, bucket, DeletionVector.emptyFactory()); Options options = this.options.toConfiguration(); - if (lookupFileCache == null) { - lookupFileCache = - LookupFile.createCache( - options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), - options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE)); - } RowType readValueType = readerFactoryBuilder.readValueType(); LookupLevels lookupLevels = @@ -168,7 +174,7 @@ private void newLookupLevels(BinaryRow partition, int bucket, List .getPathFile(), lookupStoreFactory, bfGenerator(options), - lookupFileCache); + lookupFileCache(options)); // Optimization - download lookup files if already persisted to object store // We download these files if three conditions are met @@ -192,28 +198,53 @@ private void newLookupLevels(BinaryRow partition, int bucket, List this.options.lookupRemoteLevelThreshold()); } - tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels); + return lookupLevels; + } + + private Cache lookupFileCache(Options options) { + Cache cache = lookupFileCache; + if (cache == null) { + synchronized (this) { + cache = lookupFileCache; + if (cache == null) { + cache = + LookupFile.createCache( + options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION), + options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE)); + lookupFileCache = cache; + } + } + } + return cache; } - /** TODO remove synchronized and supports multiple thread to lookup. */ @Nullable @Override - public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) - throws IOException { - Map> buckets = tableView.get(partition); + public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException { + Map buckets = tableView.get(partition); if (buckets == null || buckets.isEmpty()) { return null; } - LookupLevels lookupLevels = buckets.get(bucket); - if (lookupLevels == null) { + BucketLookupState state = buckets.get(bucket); + if (state == null) { return null; } - KeyValue kv = lookupLevels.lookup(key, startLevel); - if (kv == null || kv.valueKind().isRetract()) { - return null; - } else { - return kv.value(); + state.lock.readLock().lock(); + try { + LookupLevels lookupLevels = state.lookupLevels; + if (lookupLevels == null) { + return null; + } + + KeyValue kv = lookupLevels.lookup(key, startLevel); + if (kv == null || kv.valueKind().isRetract()) { + return null; + } else { + return kv.value(); + } + } finally { + state.lock.readLock().unlock(); } } @@ -240,11 +271,19 @@ public InternalRowSerializer createValueSerializer() { @Override public void close() throws IOException { - for (Map.Entry>> buckets : - tableView.entrySet()) { - for (Map.Entry> bucket : - buckets.getValue().entrySet()) { - bucket.getValue().close(); + // ConcurrentHashMap iteration is weakly consistent. close is expected not to race with + // refreshFiles for the same query instance; callers may rebuild this query after close. + for (Map.Entry> buckets : tableView.entrySet()) { + for (Map.Entry bucket : buckets.getValue().entrySet()) { + BucketLookupState state = bucket.getValue(); + state.lock.writeLock().lock(); + try { + if (state.lookupLevels != null) { + state.lookupLevels.close(); + } + } finally { + state.lock.writeLock().unlock(); + } } } if (lookupFileCache != null) { @@ -252,4 +291,11 @@ public void close() throws IOException { } tableView.clear(); } + + private static class BucketLookupState { + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + @Nullable private LookupLevels lookupLevels; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 0169a6d5e3d6..d2c5586fc035 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -33,7 +33,9 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fileindex.FileIndexOptions; import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FlushingFileFormat; +import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.SimpleColStats; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileIOFinder; @@ -45,6 +47,8 @@ import org.apache.paimon.operation.BlobFileContext; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.stats.StatsTestUtils; import org.apache.paimon.table.SpecialFields; @@ -71,7 +75,13 @@ import java.util.LinkedList; import java.util.List; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.paimon.TestKeyValueGenerator.DEFAULT_ROW_TYPE; @@ -103,6 +113,61 @@ public void testReadNonExistentFile() { "you can configure 'snapshot.time-retained' option with a larger value."); } + @Test + public void testConcurrentCreateRecordReaderBuildsFormatMappingOnce() throws Exception { + AtomicInteger readerFactoryCreations = new AtomicInteger(); + CountDownLatch firstCreationStarted = new CountDownLatch(1); + CountDownLatch releaseCreation = new CountDownLatch(1); + FileFormat blockingFormat = + new FlushingFileFormat("avro") { + @Override + public FormatReaderFactory createReaderFactory( + RowType dataSchemaRowType, + RowType projectedRowType, + List filters) { + readerFactoryCreations.incrementAndGet(); + firstCreationStarted.countDown(); + try { + releaseCreation.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return super.createReaderFactory( + dataSchemaRowType, projectedRowType, filters); + } + }; + KeyValueFileReaderFactory readerFactory = + createReaderFactory(tempDir.toString(), ignored -> blockingFormat, null, null); + ExecutorService executor = Executors.newFixedThreadPool(2); + + try { + Future> first = + executor.submit( + () -> + readerFactory.createRecordReader( + newFile("concurrent-1.avro", 0, 0, 1, 0))); + assertThat(firstCreationStarted.await(10, TimeUnit.SECONDS)).isTrue(); + Future> second = + executor.submit( + () -> + readerFactory.createRecordReader( + newFile("concurrent-2.avro", 0, 0, 1, 0))); + + Thread.sleep(500); + assertThat(readerFactoryCreations.get()).isEqualTo(1); + + releaseCreation.countDown(); + assertThatThrownBy(() -> first.get(10, TimeUnit.SECONDS)) + .hasCauseInstanceOf(java.io.FileNotFoundException.class); + assertThatThrownBy(() -> second.get(10, TimeUnit.SECONDS)) + .hasCauseInstanceOf(java.io.FileNotFoundException.class); + } finally { + releaseCreation.countDown(); + executor.shutdownNow(); + } + } + @RepeatedTest(10) public void testWriteAndReadDataFileWithStatsCollectingRollingFile() throws Exception { testWriteAndReadDataFileImpl("avro"); @@ -354,6 +419,15 @@ protected KeyValueFileWriterFactory createWriterFactory( private KeyValueFileReaderFactory createReaderFactory( String pathStr, String format, RowType readKeyType, RowType readValueType) { + return createReaderFactory( + pathStr, ignore -> new FlushingFileFormat(format), readKeyType, readValueType); + } + + private KeyValueFileReaderFactory createReaderFactory( + String pathStr, + FileFormatDiscover formatDiscover, + RowType readKeyType, + RowType readValueType) { Path path = new Path(pathStr); FileIO fileIO = FileIOFinder.find(path); FileStorePathFactory pathFactory = createNonPartFactory(path); @@ -364,7 +438,7 @@ private KeyValueFileReaderFactory createReaderFactory( createTestSchemaManager(path).schema(0), KEY_TYPE, DEFAULT_ROW_TYPE, - ignore -> new FlushingFileFormat(format), + formatDiscover, pathFactory, new TestKeyValueGenerator.TestKeyValueFieldsExtractor(), new CoreOptions(new HashMap<>())); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java index 0855afeb7c6b..ec74aa9cd497 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupFileTest.java @@ -18,19 +18,30 @@ package org.apache.paimon.mergetree; +import org.apache.paimon.lookup.LookupStoreReader; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause; + import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.paimon.data.BinaryRow.EMPTY_ROW; import static org.apache.paimon.data.BinaryRow.singleColumn; import static org.apache.paimon.mergetree.LookupFile.localFilePrefix; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link LookupFile}. */ public class LookupFileTest { + @TempDir java.nio.file.Path tempDir; + @Test public void testLocalFilePrefix() { RowType partType = RowType.of(DataTypes.STRING()); @@ -57,4 +68,35 @@ public void testLocalFilePrefix() { .isEqualTo( "2024073105-05-212323-10-data-ccbb95e7-8b8c-4549-8ca9-f553843d67ad-3.orc"); } + + @Test + public void testCloseCleansLocalStateWhenReaderCloseFails() throws Exception { + File localFile = tempDir.resolve("lookup-file").toFile(); + assertThat(localFile.createNewFile()).isTrue(); + AtomicBoolean callbackCalled = new AtomicBoolean(false); + LookupFile lookupFile = + new LookupFile( + localFile, + 1, + 0L, + "v1", + new LookupStoreReader() { + @Override + public byte[] lookup(byte[] key) { + return null; + } + + @Override + public void close() throws IOException { + throw new IOException("reader close failed"); + } + }, + () -> callbackCalled.set(true)); + + assertThatThrownBy(() -> lookupFile.close(RemovalCause.SIZE)) + .isInstanceOf(IOException.class) + .hasMessageContaining("reader close failed"); + assertThat(callbackCalled).isTrue(); + assertThat(localFile).doesNotExist(); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index 6aac8e1fe7bb..c86b4cfd1250 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -34,6 +34,9 @@ import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.io.cache.CacheManager; +import org.apache.paimon.lookup.LookupStoreFactory; +import org.apache.paimon.lookup.LookupStoreReader; +import org.apache.paimon.lookup.LookupStoreWriter; import org.apache.paimon.lookup.sort.SortLookupStoreFactory; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.lookup.DefaultLookupSerializerFactory; @@ -52,6 +55,8 @@ import org.apache.paimon.utils.BloomFilter; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause; + import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -67,6 +72,13 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import static org.apache.paimon.KeyValue.UNKNOWN_SEQUENCE; @@ -74,6 +86,7 @@ import static org.apache.paimon.options.MemorySize.VALUE_128_MB; import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test {@link LookupLevels}. */ public class LookupLevelsTest { @@ -270,7 +283,248 @@ public void testLookupLevel0() throws Exception { assertThat(kv.value().getInt(1)).isEqualTo(11); } + @Test + public void testConcurrentLookupCreatesSingleLookupFile() throws Exception { + DataFileMeta file = newFile(1, kv(1, 11), kv(2, 22)); + Levels levels = new Levels(comparator, Collections.singletonList(file), 1); + CountDownLatch ready = new CountDownLatch(2); + CountDownLatch start = new CountDownLatch(1); + AtomicInteger localFileRequests = new AtomicInteger(); + LookupLevels lookupLevels = + createLookupLevels( + levels, + MemorySize.ofMebiBytes(10), + fileName -> { + if (localFileRequests.incrementAndGet() == 1) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + return new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + fileName); + }); + ExecutorService executor = Executors.newFixedThreadPool(2); + + try { + Future first = + executor.submit( + () -> { + ready.countDown(); + assertThat(start.await(10, TimeUnit.SECONDS)).isTrue(); + return lookupLevels.lookup(row(1), 1); + }); + Future second = + executor.submit( + () -> { + ready.countDown(); + assertThat(start.await(10, TimeUnit.SECONDS)).isTrue(); + return lookupLevels.lookup(row(1), 1); + }); + + assertThat(ready.await(10, TimeUnit.SECONDS)).isTrue(); + start.countDown(); + KeyValue firstResult = first.get(10, TimeUnit.SECONDS); + KeyValue secondResult = second.get(10, TimeUnit.SECONDS); + assertThat(firstResult).isNotNull(); + assertThat(secondResult).isNotNull(); + assertThat(firstResult.value()).isNotNull(); + assertThat(secondResult.value()).isNotNull(); + assertThat(firstResult.value().getInt(1)).isEqualTo(11); + assertThat(secondResult.value().getInt(1)).isEqualTo(11); + assertThat(localFileRequests.get()).isEqualTo(1); + assertThat(lookupLevels.lookupFiles().estimatedSize()).isEqualTo(1); + } finally { + executor.shutdownNow(); + lookupLevels.close(); + } + } + + @Test + public void testLookupFileLockWaitersAreNotSplitAfterFailure() throws Exception { + DataFileMeta file = newFile(1, kv(1, 11)); + Levels levels = new Levels(comparator, Collections.singletonList(file), 1); + CountDownLatch firstCreateEntered = new CountDownLatch(1); + CountDownLatch failFirstCreate = new CountDownLatch(1); + CountDownLatch secondCreateEntered = new CountDownLatch(1); + CountDownLatch finishSecondCreate = new CountDownLatch(1); + CountDownLatch thirdCreateEntered = new CountDownLatch(1); + AtomicInteger createAttempts = new AtomicInteger(); + LookupLevels lookupLevels = + new LookupLevels( + schemaId -> rowType, + 0L, + levels, + comparator, + keyType, + PersistValueProcessor.factory(rowType), + new DefaultLookupSerializerFactory(), + dataFile -> createReaderFactory().createRecordReader(dataFile), + fileName -> + new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), + createLookupStoreFactory(), + rowCount -> BloomFilter.builder(rowCount, 0.05), + LookupFile.createCache(Duration.ofHours(1), MemorySize.ofMebiBytes(10))) { + @Override + public LookupFile createLookupFile(DataFileMeta file) throws IOException { + int attempt = createAttempts.incrementAndGet(); + if (attempt == 1) { + firstCreateEntered.countDown(); + awaitLatch(failFirstCreate); + throw new IOException("first create failed"); + } else if (attempt == 2) { + secondCreateEntered.countDown(); + awaitLatch(finishSecondCreate); + return super.createLookupFile(file); + } else { + thirdCreateEntered.countDown(); + awaitLatch(finishSecondCreate); + return super.createLookupFile(file); + } + } + }; + ExecutorService executor = Executors.newFixedThreadPool(2); + FutureTask waitingLookup = new FutureTask<>(() -> lookupLevels.lookup(row(1), 1)); + Thread waitingThread = new Thread(waitingLookup); + + try { + Future failedLookup = executor.submit(() -> lookupLevels.lookup(row(1), 1)); + assertThat(firstCreateEntered.await(10, TimeUnit.SECONDS)).isTrue(); + + waitingThread.start(); + waitUntilBlocked(waitingThread); + + failFirstCreate.countDown(); + assertThatThrownBy(() -> failedLookup.get(10, TimeUnit.SECONDS)) + .hasCauseInstanceOf(IOException.class) + .hasMessageContaining("first create failed"); + assertThat(secondCreateEntered.await(10, TimeUnit.SECONDS)).isTrue(); + + Future laterLookup = executor.submit(() -> lookupLevels.lookup(row(1), 1)); + assertThat(thirdCreateEntered.await(500, TimeUnit.MILLISECONDS)).isFalse(); + + finishSecondCreate.countDown(); + KeyValue waitingResult = waitingLookup.get(10, TimeUnit.SECONDS); + KeyValue laterResult = laterLookup.get(10, TimeUnit.SECONDS); + assertThat(waitingResult).isNotNull(); + assertThat(laterResult).isNotNull(); + assertThat(waitingResult.value().getInt(1)).isEqualTo(11); + assertThat(laterResult.value().getInt(1)).isEqualTo(11); + assertThat(createAttempts.get()).isEqualTo(2); + } finally { + failFirstCreate.countDown(); + finishSecondCreate.countDown(); + executor.shutdownNow(); + lookupLevels.close(); + } + } + + @Test + public void testLookupReloadsClosedCachedLookupFile() throws Exception { + DataFileMeta file = newFile(1, kv(1, 11)); + Levels levels = new Levels(comparator, Collections.singletonList(file), 1); + LookupLevels lookupLevels = + createLookupLevels(levels, MemorySize.ofMebiBytes(10)); + + try { + KeyValue kv = lookupLevels.lookup(row(1), 1); + assertThat(kv).isNotNull(); + assertThat(kv.value().getInt(1)).isEqualTo(11); + + LookupFile cached = lookupLevels.lookupFiles().getIfPresent(file.fileName()); + assertThat(cached).isNotNull(); + cached.close(RemovalCause.SIZE); + + kv = lookupLevels.lookup(row(1), 1); + assertThat(kv).isNotNull(); + assertThat(kv.value().getInt(1)).isEqualTo(11); + LookupFile reloaded = lookupLevels.lookupFiles().getIfPresent(file.fileName()); + assertThat(reloaded).isNotNull().isNotSameAs(cached); + assertThat(reloaded.isClosed()).isFalse(); + } finally { + lookupLevels.close(); + } + } + + @Test + public void testCreateReaderFailureDoesNotLeakCachedFileName() throws Exception { + DataFileMeta file = newFile(1, kv(1, 11)); + Levels levels = new Levels(comparator, Collections.singletonList(file), 1); + SortLookupStoreFactory delegate = createLookupStoreFactory(); + LookupLevels lookupLevels = + createLookupLevels( + levels, + MemorySize.ofMebiBytes(10), + fileName -> + new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), + new LookupStoreFactory() { + @Override + public LookupStoreWriter createWriter( + File file, BloomFilter.Builder bloomFilter) throws IOException { + return delegate.createWriter(file, bloomFilter); + } + + @Override + public LookupStoreReader createReader(File file) throws IOException { + throw new IOException("reader failed"); + } + }); + + assertThatThrownBy(() -> lookupLevels.lookup(row(1), 1)) + .isInstanceOf(IOException.class) + .hasMessageContaining("reader failed"); + assertThat(lookupLevels.cachedFiles()).isEmpty(); + } + + @Test + public void testCreateReaderRuntimeFailureCleansLocalFile() throws Exception { + DataFileMeta file = newFile(1, kv(1, 11)); + Levels levels = new Levels(comparator, Collections.singletonList(file), 1); + SortLookupStoreFactory delegate = createLookupStoreFactory(); + File localFile = new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + "runtime-failure"); + LookupLevels lookupLevels = + createLookupLevels( + levels, + MemorySize.ofMebiBytes(10), + ignored -> localFile, + new LookupStoreFactory() { + @Override + public LookupStoreWriter createWriter( + File file, BloomFilter.Builder bloomFilter) throws IOException { + return delegate.createWriter(file, bloomFilter); + } + + @Override + public LookupStoreReader createReader(File file) { + throw new RuntimeException("reader runtime failed"); + } + }); + + assertThatThrownBy(() -> lookupLevels.lookup(row(1), 1)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("reader runtime failed"); + assertThat(localFile).doesNotExist(); + } + private LookupLevels createLookupLevels(Levels levels, MemorySize maxDiskSize) { + return createLookupLevels( + levels, + maxDiskSize, + file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID())); + } + + private LookupLevels createLookupLevels( + Levels levels, MemorySize maxDiskSize, Function localFileFactory) { + return createLookupLevels( + levels, maxDiskSize, localFileFactory, createLookupStoreFactory()); + } + + private LookupLevels createLookupLevels( + Levels levels, + MemorySize maxDiskSize, + Function localFileFactory, + LookupStoreFactory lookupStoreFactory) { return new LookupLevels<>( schemaId -> rowType, 0L, @@ -280,16 +534,20 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD PersistValueProcessor.factory(rowType), new DefaultLookupSerializerFactory(), file -> createReaderFactory().createRecordReader(file), - file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), - new SortLookupStoreFactory( - new RowCompactedSerializer(keyType).createSliceComparator(), - new CacheManager(MemorySize.ofMebiBytes(1)), - 4096, - new CompressOptions("none", 1)), + localFileFactory, + lookupStoreFactory, rowCount -> BloomFilter.builder(rowCount, 0.05), LookupFile.createCache(Duration.ofHours(1), maxDiskSize)); } + private SortLookupStoreFactory createLookupStoreFactory() { + return new SortLookupStoreFactory( + new RowCompactedSerializer(keyType).createSliceComparator(), + new CacheManager(MemorySize.ofMebiBytes(1)), + 4096, + new CompressOptions("none", 1)); + } + private KeyValue kv(int key, int value) { return kv(key, value, UNKNOWN_SEQUENCE); } @@ -350,6 +608,28 @@ public List valueFields(TableSchema schema) { return builder.build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory()); } + private static void awaitLatch(CountDownLatch latch) throws IOException { + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + throw new IOException("Timed out waiting for latch."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + + private static void waitUntilBlocked(Thread thread) throws InterruptedException { + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + while (System.nanoTime() < deadline) { + if (thread.getState() == Thread.State.BLOCKED) { + return; + } + Thread.sleep(10); + } + throw new AssertionError("Thread did not block on lookup file lock."); + } + private SchemaManager createSchemaManager(Path path) { TableSchema tableSchema = new TableSchema(