From 00b306705b89e1e8627263e8468fef1c116c1f17 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 2 Jul 2026 15:34:35 +0000 Subject: [PATCH] Use FileSystem.openFile with FileStatus to reduce NameNode RPCs This commit changes how we open Hadoop files for reading. Instead of calling Filesystem.open this changes the code to use FileSystem.openFile. The openFile method returns a Builder object that has a setter method for a FileStatus object. HDFS-17593 adds logic to the DFSClient to use the located blocks in the FileStatus to reduce NameNode RPCs to get the block locations. This is useful in code where we happen to already have the FileStatus object for the associated file that we want to open. --- .../core/clientImpl/OfflineIterator.java | 2 +- .../core/clientImpl/bulk/BulkImport.java | 18 +++--- .../apache/accumulo/core/data/LoadPlan.java | 2 +- .../accumulo/core/file/FileOperations.java | 32 ++++++---- .../blockfile/impl/CachableBlockFile.java | 60 ++++++++++++++----- .../core/file/rfile/GenerateSplits.java | 6 +- .../accumulo/core/file/rfile/PrintInfo.java | 3 +- .../core/file/rfile/RFileOperations.java | 3 +- .../accumulo/core/file/rfile/SplitLarge.java | 3 +- .../accumulo/core/summary/SummaryReader.java | 5 +- .../core/client/rfile/RFileClientTest.java | 2 +- .../core/file/BloomFilterLayerLookupTest.java | 2 +- .../file/rfile/MultiThreadedRFileTest.java | 2 +- .../mapred/AccumuloFileOutputFormatIT.java | 2 +- .../mapreduce/AccumuloFileOutputFormatIT.java | 2 +- .../accumulo/server/client/BulkImporter.java | 59 +++++++++++------- .../server/compaction/FileCompactor.java | 2 +- .../accumulo/server/fs/FileManager.java | 2 +- .../apache/accumulo/server/util/FileUtil.java | 16 ++--- .../server/client/BulkImporterTest.java | 6 +- .../apache/accumulo/compactor/Compactor.java | 6 +- .../manager/upgrade/Upgrader9to10.java | 2 +- .../apache/accumulo/tserver/InMemoryMap.java | 2 +- .../tserver/log/RecoveryLogsIterator.java | 26 ++++---- .../tserver/tablet/CompactableUtils.java | 4 +- .../mapred/AccumuloFileOutputFormatIT.java | 2 +- .../mapreduce/AccumuloFileOutputFormatIT.java | 2 +- 27 files changed, 167 insertions(+), 106 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index 251523b445c..13b0e6ad2e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -234,7 +234,7 @@ private SortedKeyValueIterator createIterator(KeyExtent extent, var cs = CryptoFactoryLoader.getServiceForClientWithTable(systemConf, tableConf, tableId); FileSystem fs = VolumeConfiguration.fileSystemForPath(file.getPathStr(), conf); FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.getPathStr(), fs, conf, cs).withTableConfiguration(tableCC).build(); + .forFile(file.getPathStr(), fs, conf, cs, null).withTableConfiguration(tableCC).build(); if (scannerSamplerConfigImpl != null) { reader = reader.getSample(scannerSamplerConfigImpl); if (reader == null) { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 51a23026526..575a221b8b6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -266,8 +266,10 @@ public MLong(long i) { } public static Map estimateSizes(AccumuloConfiguration acuConf, Path mapFile, - long fileSize, Collection extents, FileSystem ns, Cache fileLenCache, - CryptoService cs) throws IOException { + FileStatus status, Collection extents, FileSystem ns, + Cache fileLenCache, CryptoService cs) throws IOException { + + final long fileSize = status.getLen(); if (extents.size() == 1) { return Collections.singletonMap(extents.iterator().next(), fileSize); @@ -282,7 +284,7 @@ public static Map estimateSizes(AccumuloConfiguration acuConf, P Text row = new Text(); FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(mapFile.toString(), ns, ns.getConf(), cs).withTableConfiguration(acuConf) + .forFile(mapFile.toString(), ns, ns.getConf(), cs, status).withTableConfiguration(acuConf) .withFileLenCache(fileLenCache).build(); try { @@ -365,9 +367,9 @@ private static Text nextRow(Text row) { public static List findOverlappingTablets(ClientContext context, KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache fileLenCache, - CryptoService cs) throws IOException { + CryptoService cs, FileStatus status) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, fs.getConf(), cs) + .forFile(file.toString(), fs, fs.getConf(), cs, status) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) .seekToBeginning().build()) { @@ -574,12 +576,12 @@ public SortedMap computeFileToTabletMappings(FileSystem fs CompletableFuture> future = CompletableFuture.supplyAsync(() -> { try { long t1 = System.currentTimeMillis(); - List extents = - findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs); + List extents = findOverlappingTablets(context, extentCache, filePath, fs, + fileLensCache, cs, fileStatus); // make sure file isn't going to too many tablets checkTabletCount(maxTablets, extents.size(), filePath.toString()); Map estSizes = estimateSizes(context.getConfiguration(), filePath, - fileStatus.getLen(), extents, fs, fileLensCache, cs); + fileStatus, extents, fs, fileLensCache, cs); Map pathLocations = new HashMap<>(); for (KeyExtent ke : extents) { pathLocations.put(ke, new Bulk.FileInfo(filePath, estSizes.getOrDefault(ke, 0L))); diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index a874ae952e4..c3acdbe9472 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -544,7 +544,7 @@ public static LoadPlan compute(URI file, Map properties) throws I CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties); var tableConf = SiteConfiguration.empty().withOverrides(properties).build(); try (var reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build();) { + .forFile(file.toString(), fs, conf, cs, null).withTableConfiguration(tableConf).build();) { var firstRow = reader.getFirstKey().getRow(); var lastRow = reader.getLastKey().getRow(); return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow) diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index ce5c392fdcd..66271e8c52e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.FileOutputCommitter; @@ -191,12 +192,14 @@ public static class FileOptions { public final Set columnFamilies; public final boolean inclusive; public final boolean dropCacheBehind; + public final FileStatus status; public FileOptions(TableId tableId, AccumuloConfiguration tableConfiguration, String filename, FileSystem fs, Configuration fsConf, RateLimiter rateLimiter, String compression, FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider, Cache fileLenCache, boolean seekToBeginning, CryptoService cryptoService, - Range range, Set columnFamilies, boolean inclusive, boolean dropCacheBehind) { + Range range, Set columnFamilies, boolean inclusive, boolean dropCacheBehind, + FileStatus status) { this.tableId = tableId; this.tableConfiguration = tableConfiguration; this.filename = filename; @@ -214,6 +217,7 @@ public FileOptions(TableId tableId, AccumuloConfiguration tableConfiguration, St this.columnFamilies = columnFamilies; this.inclusive = inclusive; this.dropCacheBehind = dropCacheBehind; + this.status = status; } public TableId getTableId() { @@ -293,6 +297,7 @@ public static class FileHelper { private RateLimiter rateLimiter; private CryptoService cryptoService; private boolean dropCacheBehind = false; + private FileStatus status; protected FileHelper table(TableId tid) { this.tableId = tid; @@ -334,31 +339,36 @@ protected FileHelper dropCacheBehind(boolean drop) { return this; } + protected FileHelper fileStatus(FileStatus status) { + this.status = status; + return this; + } + protected FileOptions toWriterBuilderOptions(String compression, FSDataOutputStream outputStream, boolean startEnabled) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, compression, outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, - null, true, dropCacheBehind); + null, true, dropCacheBehind, status); } protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider, Cache fileLenCache, boolean seekToBeginning) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache, - seekToBeginning, cryptoService, null, null, true, dropCacheBehind); + seekToBeginning, cryptoService, null, null, true, dropCacheBehind, status); } protected FileOptions toIndexReaderBuilderOptions(Cache fileLenCache) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true, - dropCacheBehind); + dropCacheBehind, status); } protected FileOptions toScanReaderBuilderOptions(Range range, Set columnFamilies, boolean inclusive) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive, - dropCacheBehind); + dropCacheBehind, status); } protected AccumuloConfiguration getTableConfiguration() { @@ -436,8 +446,8 @@ public class ReaderBuilder extends FileHelper implements ReaderTableConfiguratio private boolean seekToBeginning = false; public ReaderTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf, - CryptoService cs) { - filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs); + CryptoService cs, FileStatus status) { + filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status); return this; } @@ -504,8 +514,8 @@ public class IndexReaderBuilder extends FileHelper implements IndexReaderTableCo private Cache fileLenCache = null; public IndexReaderTableConfiguration forFile(String filename, FileSystem fs, - Configuration fsConf, CryptoService cs) { - filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs); + Configuration fsConf, CryptoService cs, FileStatus status) { + filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status); return this; } @@ -536,8 +546,8 @@ public class ScanReaderBuilder extends FileHelper implements ScanReaderTableConf private boolean inclusive; public ScanReaderTableConfiguration forFile(String filename, FileSystem fs, - Configuration fsConf, CryptoService cs) { - filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs); + Configuration fsConf, CryptoService cs, FileStatus status) { + filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 19033e4394c..4c51ca9d8b3 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -45,7 +47,9 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.slf4j.Logger; @@ -86,30 +90,54 @@ public CachableBuilder conf(Configuration hadoopConf) { return this; } - public CachableBuilder fsPath(FileSystem fs, Path dataFile) { - return fsPath(fs, dataFile, false); + public CachableBuilder fsPath(FileSystem fs, Path dataFile, FileStatus status) { + return fsPath(fs, dataFile, false, status); } - public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind) { + public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind, + FileStatus status) { this.cacheId = pathToCacheId(dataFile); this.inputSupplier = () -> { - FSDataInputStream is = fs.open(dataFile); - if (dropCacheBehind) { - // Tell the DataNode that the write ahead log does not need to be cached in the OS page - // cache + FutureDataInputStreamBuilder builder = fs.openFile(dataFile); + if (status != null) { + builder.withFileStatus(status); + } + CompletableFuture future = builder.build(); + while (!future.isDone()) { try { - is.setDropBehind(Boolean.TRUE); - log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile); - } catch (UnsupportedOperationException e) { - log.debug("setDropBehind not enabled for wal file: {}", dataFile); - } catch (IOException e) { - log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile, - e.getMessage()); + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while opening file: " + dataFile, e); + } + } + try { + FSDataInputStream is = future.get(); + if (dropCacheBehind) { + // Tell the DataNode that the write ahead log does not need to be cached in the OS page + // cache + try { + is.setDropBehind(Boolean.TRUE); + log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile); + } catch (UnsupportedOperationException e) { + log.debug("setDropBehind not enabled for wal file: {}", dataFile); + } catch (IOException e) { + log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile, + e.getMessage()); + } } + return is; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while opening file: " + dataFile, e); + } catch (CancellationException e) { + throw new IOException("Cancelled while opening file: " + dataFile, e); + } catch (ExecutionException e) { + throw new IOException("Error trying to open file: " + dataFile, e); } - return is; }; - this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen(); + this.lengthSupplier = + () -> status == null ? fs.getFileStatus(dataFile).getLen() : status.getLen(); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java index 3937ca383b4..f333c39ab59 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java @@ -304,7 +304,7 @@ private TreeSet getIndexKeys(AccumuloConfiguration accumuloConf, Configu try { for (Path file : files) { FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf) + .forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf) .build(); readers.add(reader); fileReaders.add(reader); @@ -333,7 +333,7 @@ private TreeSet getSplitsFromFullScan(SiteConfiguration accumuloConf, try { for (Path file : files) { FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder() - .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf) + .forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf) .overRange(new Range(), Set.of(), false).build(); readers.add(reader); fileReaders.add(reader); @@ -367,7 +367,7 @@ private TreeSet getSplitsBySize(AccumuloConfiguration accumuloConf, try { for (Path file : files) { FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder() - .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf) + .forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf) .overRange(new Range(), Set.of(), false).build(); readers.add(reader); fileReaders.add(reader); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 1c3f5c90f4e..11a488cefe5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -197,7 +197,8 @@ public void execute(final String[] args) throws Exception { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, siteConfig.getAllCryptoProperties()); - CachableBuilder cb = new CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs); + CachableBuilder cb = + new CachableBuilder().fsPath(fs, path, null).conf(conf).cryptoService(cs); Reader iter = new RFile.Reader(cb); MetricsGatherer>> vmg = new VisMetricsGatherer(); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index c87b0716424..b20ebfa3765 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -62,7 +62,8 @@ public class RFileOperations extends FileOperations { private static RFile.Reader getReader(FileOptions options) throws IOException { CachableBuilder cb = new CachableBuilder() - .fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind) + .fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind, + options.status) .conf(options.getConfiguration()).fileLen(options.getFileLenCache()) .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter()) .cryptoService(options.getCryptoService()); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java index e3adf9e5175..e4a33ad4043 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java @@ -80,7 +80,8 @@ public void execute(String[] args) throws Exception { AccumuloConfiguration aconf = opts.getSiteConfiguration(); CryptoService cs = CryptoFactoryLoader.getServiceForServer(aconf); Path path = new Path(file); - CachableBuilder cb = new CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs); + CachableBuilder cb = + new CachableBuilder().fsPath(fs, path, null).conf(conf).cryptoService(cs); try (Reader iter = new RFile.Reader(cb)) { if (!file.endsWith(".rf")) { diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index 83c06339747..3645e5e15df 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -196,8 +196,9 @@ public static SummaryReader load(FileSystem fs, Configuration conf, SummarizerFa CompositeCache compositeCache = new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY, summaryCache), BlockCacheUtil.instrument(CacheType.INDEX, indexCache)); - CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache) - .cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService); + CachableBuilder cb = new CachableBuilder().fsPath(fs, file, null).conf(conf) + .fileLen(fileLenCache).cacheProvider(new BasicCacheProvider(compositeCache, null)) + .cryptoService(cryptoService); bcReader = new CachableBlockFile.Reader(cb); return load(bcReader, summarySelector, factory); } catch (FileNotFoundException fne) { diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index ae3dccaecb2..68751641e6c 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -822,7 +822,7 @@ public void testWrongGroup() throws Exception { private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException { return (Reader) FileOperations.getInstance().newReaderBuilder() - .forFile(testFile, localFs, localFs.getConf(), NoCryptoServiceFactory.NONE) + .forFile(testFile, localFs, localFs.getConf(), NoCryptoServiceFactory.NONE, null) .withTableConfiguration(DefaultConfiguration.getInstance()).build(); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java index 1d0b5ef539d..3ebb898595b 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java @@ -99,7 +99,7 @@ public void test() throws IOException { t1 = System.currentTimeMillis(); FileSKVIterator bmfr = FileOperations.getInstance().newReaderBuilder() - .forFile(fname, fs, conf, NoCryptoServiceFactory.NONE).withTableConfiguration(acuconf) + .forFile(fname, fs, conf, NoCryptoServiceFactory.NONE, null).withTableConfiguration(acuconf) .build(); t2 = System.currentTimeMillis(); log.debug("Opened {} in {}", fname, (t2 - t1)); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 8bc20a1f937..b876df818ad 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -185,7 +185,7 @@ public void openReader() throws IOException { AccumuloConfiguration defaultConf = DefaultConfiguration.getInstance(); // the caches used to obfuscate the multithreaded issues - CachableBuilder b = new CachableBuilder().fsPath(fs, path).conf(conf) + CachableBuilder b = new CachableBuilder().fsPath(fs, path, null).conf(conf) .cryptoService(CryptoFactoryLoader.getServiceForServer(defaultConf)); reader = new RFile.Reader(new CachableBlockFile.Reader(b)); iter = new ColumnFamilySkippingIterator(reader); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java index bcebf005bd8..5815eafaf6e 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java @@ -199,7 +199,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = FileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java index 287f41550b4..76c11d72c27 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java @@ -214,7 +214,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample); diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index 8d224b6b450..61457d907e0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -65,6 +65,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -126,6 +127,8 @@ public AssignmentStats importFiles(List files) { final Map> completeFailures = Collections.synchronizedSortedMap(new TreeMap<>()); + final Map fileStatuses = new HashMap<>(); + ClientService.Client client = null; final TabletLocator locator = TabletLocator.getLocator(context, tableId); @@ -143,8 +146,10 @@ public AssignmentStats importFiles(List files) { Runnable getAssignments = () -> { List tabletsToAssignMapFileTo = Collections.emptyList(); try { - tabletsToAssignMapFileTo = - findOverlappingTablets(context, fs, locator, mapFile, tableConf.getCryptoService()); + FileStatus status = fs.getFileStatus(mapFile); + fileStatuses.put(mapFile, status); + tabletsToAssignMapFileTo = findOverlappingTablets(context, fs, locator, mapFile, + tableConf.getCryptoService(), status); } catch (Exception ex) { log.warn("Unable to find tablets that overlap file " + mapFile, ex); } @@ -173,7 +178,7 @@ public AssignmentStats importFiles(List files) { assignmentStats.attemptingAssignments(assignments); Map> assignmentFailures = - assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads); + assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads, fileStatuses); assignmentStats.assignmentsFailed(assignmentFailures); Map failureCount = new TreeMap<>(); @@ -212,8 +217,9 @@ public AssignmentStats importFiles(List files) { timer.start(Timers.QUERY_METADATA); try { - tabletsToAssignMapFileTo.addAll(findOverlappingTablets(context, fs, locator, - entry.getKey(), ke, tableConf.getCryptoService())); + Path p = entry.getKey(); + tabletsToAssignMapFileTo.addAll(findOverlappingTablets(context, fs, locator, p, ke, + tableConf.getCryptoService(), fileStatuses.get(p))); keListIter.remove(); } catch (Exception ex) { log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex); @@ -228,7 +234,7 @@ public AssignmentStats importFiles(List files) { assignmentStats.attemptingAssignments(assignments); Map> assignmentFailures2 = - assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads); + assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads, fileStatuses); assignmentStats.assignmentsFailed(assignmentFailures2); // merge assignmentFailures2 into assignmentFailures @@ -347,15 +353,21 @@ private static List extentsOf(List locations) { } private Map> estimateSizes(final VolumeManager vm, - Map> assignments, Collection paths, int numThreads) { + Map> assignments, Collection paths, int numThreads, + Map statuses) { long t1 = System.currentTimeMillis(); final Map mapFileSizes = new TreeMap<>(); try { for (Path path : paths) { - FileSystem fs = vm.getFileSystemByPath(path); - mapFileSizes.put(path, fs.getContentSummary(path).getLength()); + FileStatus status = statuses.get(path); + if (status == null) { + FileSystem fs = vm.getFileSystemByPath(path); + mapFileSizes.put(path, fs.getContentSummary(path).getLength()); + } else { + mapFileSizes.put(path, status.getLen()); + } } } catch (IOException e) { log.error("Failed to get map files in for {}: {}", paths, e.getMessage(), e); @@ -386,9 +398,9 @@ private Map> estimateSizes(final VolumeManager vm, Path mapFile = entry.getKey(); FileSystem ns = context.getVolumeManager().getFileSystemByPath(mapFile); - estimatedSizes = BulkImport.estimateSizes(context.getConfiguration(), mapFile, - mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), ns, null, - tableConf.getCryptoService()); + estimatedSizes = + BulkImport.estimateSizes(context.getConfiguration(), mapFile, statuses.get(mapFile), + extentsOf(entry.getValue()), ns, null, tableConf.getCryptoService()); } catch (IOException e) { log.warn("Failed to estimate map file sizes {}", e.getMessage()); } @@ -447,10 +459,10 @@ private static Map locationsOf(Map> private Map> assignMapFiles(VolumeManager fs, Map> assignments, Collection paths, int numThreads, - int numMapThreads) { + int numMapThreads, Map statuses) { timer.start(Timers.EXAMINE_MAP_FILES); Map> assignInfo = - estimateSizes(fs, assignments, paths, numMapThreads); + estimateSizes(fs, assignments, paths, numMapThreads, statuses); timer.stop(Timers.EXAMINE_MAP_FILES); Map> ret; @@ -623,15 +635,16 @@ private List assignMapFiles(ClientContext context, HostAndPort locati } public static List findOverlappingTablets(ServerContext context, VolumeManager fs, - TabletLocator locator, Path file, CryptoService cs) throws Exception { - return findOverlappingTablets(context, fs, locator, file, null, null, cs); + TabletLocator locator, Path file, CryptoService cs, FileStatus status) throws Exception { + return findOverlappingTablets(context, fs, locator, file, null, null, cs, status); } public static List findOverlappingTablets(ServerContext context, VolumeManager fs, - TabletLocator locator, Path file, KeyExtent failed, CryptoService cs) throws Exception { + TabletLocator locator, Path file, KeyExtent failed, CryptoService cs, FileStatus status) + throws Exception { locator.invalidateCache(failed); Text start = getStartRowForExtent(failed); - return findOverlappingTablets(context, fs, locator, file, start, failed.endRow(), cs); + return findOverlappingTablets(context, fs, locator, file, start, failed.endRow(), cs, status); } protected static Text getStartRowForExtent(KeyExtent extent) { @@ -648,16 +661,16 @@ protected static Text getStartRowForExtent(KeyExtent extent) { static final byte[] byte0 = {0}; public static List findOverlappingTablets(ServerContext context, VolumeManager vm, - TabletLocator locator, Path file, Text startRow, Text endRow, CryptoService cs) - throws Exception { + TabletLocator locator, Path file, Text startRow, Text endRow, CryptoService cs, + FileStatus status) throws Exception { List result = new ArrayList<>(); Collection columnFamilies = Collections.emptyList(); String filename = file.toString(); // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow); FileSystem fs = vm.getFileSystemByPath(file); - try (FileSKVIterator reader = - FileOperations.getInstance().newReaderBuilder().forFile(filename, fs, fs.getConf(), cs) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { + try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() + .forFile(filename, fs, fs.getConf(), cs, status) + .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { Text row = startRow; if (row == null) { row = new Text(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 1045d13680f..e3df4837b0a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -494,7 +494,7 @@ private List> openMapDataFiles( } ReaderBuilder readerBuilder = fileFactory.newReaderBuilder() - .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService) + .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService, null) .withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()); if (dropCacheBehindCompactionInputFile) { readerBuilder.dropCachesBehind(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index 9a3aad33c35..c0ea25df621 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -310,7 +310,7 @@ private Map reserveReaders(KeyExtent tablet, Collection< FileSystem ns = context.getVolumeManager().getFileSystemByPath(path); // log.debug("Opening "+file + " path " + path); FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).withCacheProvider(cacheProvider) .withFileLenCache(fileLenCache).build(); readersReserved.put(reader, file); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java index 3c189644df8..a2a43ea9d4a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java @@ -48,6 +48,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -150,7 +151,7 @@ public static Collection reduceFiles(ServerContext context, TableConfigu for (String file : inFiles) { ns = context.getVolumeManager().getFileSystemByPath(new Path(file)); reader = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(file, ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(file, ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).build(); iters.add(reader); } @@ -434,14 +435,15 @@ private static long countIndexEntries(ServerContext context, TableConfiguration FileSKVIterator reader = null; Path path = new Path(file); FileSystem ns = context.getVolumeManager().getFileSystemByPath(path); + FileStatus status = ns.getFileStatus(path); try { if (useIndex) { reader = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf).build(); } else { reader = FileOperations.getInstance().newScanReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf) .overRange(new Range(prevEndRow, false, null, true), Set.of(), false).build(); } @@ -468,11 +470,11 @@ private static long countIndexEntries(ServerContext context, TableConfiguration if (useIndex) { readers.add(FileOperations.getInstance().newIndexReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf).build()); } else { readers.add(FileOperations.getInstance().newScanReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf) .overRange(new Range(prevEndRow, false, null, true), Set.of(), false).build()); } @@ -494,7 +496,7 @@ public static Map tryToGetFirstAndLastRows(ServerContext co FileSystem ns = context.getVolumeManager().getFileSystemByPath(mapfile.getPath()); try { reader = FileOperations.getInstance().newReaderBuilder() - .forFile(mapfile.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(mapfile.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).build(); Key firstKey = reader.getFirstKey(); @@ -532,7 +534,7 @@ public static WritableComparable findLastKey(ServerContext context, for (TabletFile file : mapFiles) { FileSystem ns = context.getVolumeManager().getFileSystemByPath(file.getPath()); FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(file.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).seekToBeginning().build(); try { diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java index 9b69efa1963..5e0647d6971 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -144,8 +144,8 @@ public void testFindOverlappingTablets() throws Exception { writer.append(new Key("xyzzy", "cf", "cq"), empty); writer.close(); try (var vm = VolumeManagerImpl.getLocalForTesting("file:///")) { - List overlaps = - BulkImporter.findOverlappingTablets(context, vm, locator, new Path(file), null, null, cs); + List overlaps = BulkImporter.findOverlappingTablets(context, vm, locator, + new Path(file), null, null, cs, null); assertEquals(5, overlaps.size()); Collections.sort(overlaps); assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent); @@ -158,7 +158,7 @@ public void testFindOverlappingTablets() throws Exception { assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent); List overlaps2 = BulkImporter.findOverlappingTablets(context, vm, locator, - new Path(file), new KeyExtent(tableId, new Text("h"), new Text("b")), cs); + new Path(file), new KeyExtent(tableId, new Text("h"), new Text("b")), cs, null); assertEquals(3, overlaps2.size()); assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index bc6d7e2bb78..67017e6f533 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -711,9 +711,9 @@ private long estimateOverlappingEntries(KeyExtent extent, StoredTabletFile file, FileOperations fileFactory = FileOperations.getInstance(); FileSystem fs = getContext().getVolumeManager().getFileSystemByPath(file.getPath()); - try (FileSKVIterator reader = - fileFactory.newReaderBuilder().forFile(file.getPathStr(), fs, fs.getConf(), cryptoService) - .withTableConfiguration(tableConf).dropCachesBehind().build()) { + try (FileSKVIterator reader = fileFactory.newReaderBuilder() + .forFile(file.getPathStr(), fs, fs.getConf(), cryptoService, null) + .withTableConfiguration(tableConf).dropCachesBehind().build()) { return reader.estimateOverlappingEntries(extent); } catch (IOException ioe) { throw new UncheckedIOException(ioe); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java index 9250d3a9ca4..8d65049df7e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java @@ -473,7 +473,7 @@ MetadataTime computeRootTabletTime(ServerContext context, Collection goo var tableConf = context.getTableConfiguration(RootTable.ID); long maxTime = -1; try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), NoCryptoServiceFactory.NONE) + .forFile(path.toString(), ns, ns.getConf(), NoCryptoServiceFactory.NONE, null) .withTableConfiguration(tableConf).seekToBeginning().build()) { while (reader.hasTop()) { maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 306a4e7d270..6d4a1e65b04 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -602,7 +602,7 @@ private synchronized FileSKVIterator getReader() throws IOException { TableConfiguration tableConf = context.getTableConfiguration(tableId); reader = new RFileOperations().newReaderBuilder() - .forFile(memDumpFile, fs, conf, tableConf.getCryptoService()) + .forFile(memDumpFile, fs, conf, tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).seekToBeginning().build(); if (iflag != null) { reader.setInterruptFlag(iflag); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index f2a1a5c990d..b56612314a6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -26,8 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.TreeMap; import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl; import org.apache.accumulo.core.data.Key; @@ -80,7 +79,7 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L for (Path logDir : recoveryLogDirs) { LOG.debug("Opening recovery log dir {}", logDir.getName()); - SortedSet logFiles = getFiles(vm, logDir); + TreeMap logFiles = getFiles(vm, logDir); var fs = vm.getFileSystemByPath(logDir); // only check the first key once to prevent extra iterator creation and seeking @@ -88,9 +87,9 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L validateFirstKey(context, cryptoService, fs, logFiles, logDir); } - for (Path log : logFiles) { + for (Entry entry : logFiles.entrySet()) { FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(log.toString(), fs, fs.getConf(), cryptoService) + .forFile(entry.getKey().toString(), fs, fs.getConf(), cryptoService, entry.getValue()) .withTableConfiguration(context.getConfiguration()).seekToBeginning().build(); if (range != null) { fileIter.seek(range, Collections.emptySet(), false); @@ -98,11 +97,13 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L Iterator> scanIter = new IteratorAdapter(fileIter); if (scanIter.hasNext()) { - LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end); + LOG.debug("Write ahead log {} has data in range {} {}", entry.getKey().getName(), start, + end); iterators.add(scanIter); fileIters.add(fileIter); } else { - LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end); + LOG.debug("Write ahead log {} has no data in range {} {}", entry.getKey().getName(), + start, end); fileIter.close(); } } @@ -137,12 +138,12 @@ public void close() throws IOException { /** * Check for sorting signal files (finished/failed) and get the logs in the provided directory. */ - private SortedSet getFiles(VolumeManager fs, Path directory) throws IOException { + private TreeMap getFiles(VolumeManager fs, Path directory) throws IOException { boolean foundFinish = false; // Path::getName compares the last component of each Path value. In this case, the last // component should // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values. - SortedSet logFiles = new TreeSet<>(Comparator.comparing(Path::getName)); + TreeMap logFiles = new TreeMap<>(Comparator.comparing(Path::getName)); for (FileStatus child : fs.listStatus(directory)) { if (child.getPath().getName().startsWith("_")) { continue; @@ -156,7 +157,7 @@ private SortedSet getFiles(VolumeManager fs, Path directory) throws IOExce } FileSystem ns = fs.getFileSystemByPath(child.getPath()); Path fullLogPath = ns.makeQualified(child.getPath()); - logFiles.add(fullLogPath); + logFiles.put(fullLogPath, child); } if (!foundFinish) { throw new IOException( @@ -169,9 +170,10 @@ private SortedSet getFiles(VolumeManager fs, Path directory) throws IOExce * Check that the first entry in the WAL is OPEN. Only need to do this once. */ private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs, - SortedSet logFiles, Path fullLogPath) throws IOException { + TreeMap logFiles, Path fullLogPath) throws IOException { + Entry first = logFiles.firstEntry(); try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(logFiles.first().toString(), fs, fs.getConf(), cs) + .forFile(first.getKey().toString(), fs, fs.getConf(), cs, first.getValue()) .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { Iterator> iterator = new IteratorAdapter(fileIter); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index d332050aabc..c656b5b4aaa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -120,7 +120,7 @@ public static Map> getFirstAndLastKeys(Tablet tab for (StoredTabletFile file : allFiles) { FileSystem ns = fs.getFileSystemByPath(file.getPath()); try (FileSKVIterator openReader = - fileFactory.newReaderBuilder().forFile(file.getPathStr(), ns, ns.getConf(), cs) + fileFactory.newReaderBuilder().forFile(file.getPathStr(), ns, ns.getConf(), cs, null) .withTableConfiguration(tableConf).seekToBeginning().build()) { Key first = openReader.getFirstKey(); Key last = openReader.getLastKey(); @@ -372,7 +372,7 @@ public Optional> getSample(CompactableFile fil FileSystem ns = tablet.getTabletServer().getVolumeManager().getFileSystemByPath(path); var tableConf = tablet.getTableConfiguration(); var fiter = fileFactory.newReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).seekToBeginning().build(); return Optional.ofNullable(fiter.getSample(new SamplerConfigurationImpl(sc))); } catch (IOException e) { diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java index 8d48ba72f08..d610e6944c7 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java @@ -202,7 +202,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java index 606f9597591..522dc0515cd 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java @@ -220,7 +220,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample);