diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index 251523b445c..13b0e6ad2e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -234,7 +234,7 @@ private SortedKeyValueIterator createIterator(KeyExtent extent, var cs = CryptoFactoryLoader.getServiceForClientWithTable(systemConf, tableConf, tableId); FileSystem fs = VolumeConfiguration.fileSystemForPath(file.getPathStr(), conf); FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.getPathStr(), fs, conf, cs).withTableConfiguration(tableCC).build(); + .forFile(file.getPathStr(), fs, conf, cs, null).withTableConfiguration(tableCC).build(); if (scannerSamplerConfigImpl != null) { reader = reader.getSample(scannerSamplerConfigImpl); if (reader == null) { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index 51a23026526..575a221b8b6 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -266,8 +266,10 @@ public MLong(long i) { } public static Map estimateSizes(AccumuloConfiguration acuConf, Path mapFile, - long fileSize, Collection extents, FileSystem ns, Cache fileLenCache, - CryptoService cs) throws IOException { + FileStatus status, Collection extents, FileSystem ns, + Cache fileLenCache, CryptoService cs) throws IOException { + + final long fileSize = status.getLen(); if (extents.size() == 1) { return Collections.singletonMap(extents.iterator().next(), fileSize); @@ -282,7 +284,7 @@ public static Map estimateSizes(AccumuloConfiguration acuConf, P Text row = new Text(); FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(mapFile.toString(), ns, ns.getConf(), cs).withTableConfiguration(acuConf) + .forFile(mapFile.toString(), ns, ns.getConf(), cs, status).withTableConfiguration(acuConf) .withFileLenCache(fileLenCache).build(); try { @@ -365,9 +367,9 @@ private static Text nextRow(Text row) { public static List findOverlappingTablets(ClientContext context, KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache fileLenCache, - CryptoService cs) throws IOException { + CryptoService cs, FileStatus status) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, fs.getConf(), cs) + .forFile(file.toString(), fs, fs.getConf(), cs, status) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) .seekToBeginning().build()) { @@ -574,12 +576,12 @@ public SortedMap computeFileToTabletMappings(FileSystem fs CompletableFuture> future = CompletableFuture.supplyAsync(() -> { try { long t1 = System.currentTimeMillis(); - List extents = - findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs); + List extents = findOverlappingTablets(context, extentCache, filePath, fs, + fileLensCache, cs, fileStatus); // make sure file isn't going to too many tablets checkTabletCount(maxTablets, extents.size(), filePath.toString()); Map estSizes = estimateSizes(context.getConfiguration(), filePath, - fileStatus.getLen(), extents, fs, fileLensCache, cs); + fileStatus, extents, fs, fileLensCache, cs); Map pathLocations = new HashMap<>(); for (KeyExtent ke : extents) { pathLocations.put(ke, new Bulk.FileInfo(filePath, estSizes.getOrDefault(ke, 0L))); diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index a874ae952e4..c3acdbe9472 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -544,7 +544,7 @@ public static LoadPlan compute(URI file, Map properties) throws I CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties); var tableConf = SiteConfiguration.empty().withOverrides(properties).build(); try (var reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build();) { + .forFile(file.toString(), fs, conf, cs, null).withTableConfiguration(tableConf).build();) { var firstRow = reader.getFirstKey().getRow(); var lastRow = reader.getLastKey().getRow(); return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow) diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index ce5c392fdcd..66271e8c52e 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.mapred.FileOutputCommitter; @@ -191,12 +192,14 @@ public static class FileOptions { public final Set columnFamilies; public final boolean inclusive; public final boolean dropCacheBehind; + public final FileStatus status; public FileOptions(TableId tableId, AccumuloConfiguration tableConfiguration, String filename, FileSystem fs, Configuration fsConf, RateLimiter rateLimiter, String compression, FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider, Cache fileLenCache, boolean seekToBeginning, CryptoService cryptoService, - Range range, Set columnFamilies, boolean inclusive, boolean dropCacheBehind) { + Range range, Set columnFamilies, boolean inclusive, boolean dropCacheBehind, + FileStatus status) { this.tableId = tableId; this.tableConfiguration = tableConfiguration; this.filename = filename; @@ -214,6 +217,7 @@ public FileOptions(TableId tableId, AccumuloConfiguration tableConfiguration, St this.columnFamilies = columnFamilies; this.inclusive = inclusive; this.dropCacheBehind = dropCacheBehind; + this.status = status; } public TableId getTableId() { @@ -293,6 +297,7 @@ public static class FileHelper { private RateLimiter rateLimiter; private CryptoService cryptoService; private boolean dropCacheBehind = false; + private FileStatus status; protected FileHelper table(TableId tid) { this.tableId = tid; @@ -334,31 +339,36 @@ protected FileHelper dropCacheBehind(boolean drop) { return this; } + protected FileHelper fileStatus(FileStatus status) { + this.status = status; + return this; + } + protected FileOptions toWriterBuilderOptions(String compression, FSDataOutputStream outputStream, boolean startEnabled) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, compression, outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null, - null, true, dropCacheBehind); + null, true, dropCacheBehind, status); } protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider, Cache fileLenCache, boolean seekToBeginning) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache, - seekToBeginning, cryptoService, null, null, true, dropCacheBehind); + seekToBeginning, cryptoService, null, null, true, dropCacheBehind, status); } protected FileOptions toIndexReaderBuilderOptions(Cache fileLenCache) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true, - dropCacheBehind); + dropCacheBehind, status); } protected FileOptions toScanReaderBuilderOptions(Range range, Set columnFamilies, boolean inclusive) { return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null, null, false, NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive, - dropCacheBehind); + dropCacheBehind, status); } protected AccumuloConfiguration getTableConfiguration() { @@ -436,8 +446,8 @@ public class ReaderBuilder extends FileHelper implements ReaderTableConfiguratio private boolean seekToBeginning = false; public ReaderTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf, - CryptoService cs) { - filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs); + CryptoService cs, FileStatus status) { + filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status); return this; } @@ -504,8 +514,8 @@ public class IndexReaderBuilder extends FileHelper implements IndexReaderTableCo private Cache fileLenCache = null; public IndexReaderTableConfiguration forFile(String filename, FileSystem fs, - Configuration fsConf, CryptoService cs) { - filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs); + Configuration fsConf, CryptoService cs, FileStatus status) { + filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status); return this; } @@ -536,8 +546,8 @@ public class ScanReaderBuilder extends FileHelper implements ScanReaderTableConf private boolean inclusive; public ScanReaderTableConfiguration forFile(String filename, FileSystem fs, - Configuration fsConf, CryptoService cs) { - filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs); + Configuration fsConf, CryptoService cs, FileStatus status) { + filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java index 19033e4394c..4c51ca9d8b3 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -45,7 +47,9 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Seekable; import org.slf4j.Logger; @@ -86,30 +90,54 @@ public CachableBuilder conf(Configuration hadoopConf) { return this; } - public CachableBuilder fsPath(FileSystem fs, Path dataFile) { - return fsPath(fs, dataFile, false); + public CachableBuilder fsPath(FileSystem fs, Path dataFile, FileStatus status) { + return fsPath(fs, dataFile, false, status); } - public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind) { + public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind, + FileStatus status) { this.cacheId = pathToCacheId(dataFile); this.inputSupplier = () -> { - FSDataInputStream is = fs.open(dataFile); - if (dropCacheBehind) { - // Tell the DataNode that the write ahead log does not need to be cached in the OS page - // cache + FutureDataInputStreamBuilder builder = fs.openFile(dataFile); + if (status != null) { + builder.withFileStatus(status); + } + CompletableFuture future = builder.build(); + while (!future.isDone()) { try { - is.setDropBehind(Boolean.TRUE); - log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile); - } catch (UnsupportedOperationException e) { - log.debug("setDropBehind not enabled for wal file: {}", dataFile); - } catch (IOException e) { - log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile, - e.getMessage()); + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while opening file: " + dataFile, e); + } + } + try { + FSDataInputStream is = future.get(); + if (dropCacheBehind) { + // Tell the DataNode that the write ahead log does not need to be cached in the OS page + // cache + try { + is.setDropBehind(Boolean.TRUE); + log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile); + } catch (UnsupportedOperationException e) { + log.debug("setDropBehind not enabled for wal file: {}", dataFile); + } catch (IOException e) { + log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile, + e.getMessage()); + } } + return is; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while opening file: " + dataFile, e); + } catch (CancellationException e) { + throw new IOException("Cancelled while opening file: " + dataFile, e); + } catch (ExecutionException e) { + throw new IOException("Error trying to open file: " + dataFile, e); } - return is; }; - this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen(); + this.lengthSupplier = + () -> status == null ? fs.getFileStatus(dataFile).getLen() : status.getLen(); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java index 3937ca383b4..f333c39ab59 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java @@ -304,7 +304,7 @@ private TreeSet getIndexKeys(AccumuloConfiguration accumuloConf, Configu try { for (Path file : files) { FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf) + .forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf) .build(); readers.add(reader); fileReaders.add(reader); @@ -333,7 +333,7 @@ private TreeSet getSplitsFromFullScan(SiteConfiguration accumuloConf, try { for (Path file : files) { FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder() - .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf) + .forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf) .overRange(new Range(), Set.of(), false).build(); readers.add(reader); fileReaders.add(reader); @@ -367,7 +367,7 @@ private TreeSet getSplitsBySize(AccumuloConfiguration accumuloConf, try { for (Path file : files) { FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder() - .forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf) + .forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf) .overRange(new Range(), Set.of(), false).build(); readers.add(reader); fileReaders.add(reader); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java index 1c3f5c90f4e..11a488cefe5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/PrintInfo.java @@ -197,7 +197,8 @@ public void execute(final String[] args) throws Exception { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, siteConfig.getAllCryptoProperties()); - CachableBuilder cb = new CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs); + CachableBuilder cb = + new CachableBuilder().fsPath(fs, path, null).conf(conf).cryptoService(cs); Reader iter = new RFile.Reader(cb); MetricsGatherer>> vmg = new VisMetricsGatherer(); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java index c87b0716424..b20ebfa3765 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java @@ -62,7 +62,8 @@ public class RFileOperations extends FileOperations { private static RFile.Reader getReader(FileOptions options) throws IOException { CachableBuilder cb = new CachableBuilder() - .fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind) + .fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind, + options.status) .conf(options.getConfiguration()).fileLen(options.getFileLenCache()) .cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter()) .cryptoService(options.getCryptoService()); diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java index e3adf9e5175..e4a33ad4043 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java @@ -80,7 +80,8 @@ public void execute(String[] args) throws Exception { AccumuloConfiguration aconf = opts.getSiteConfiguration(); CryptoService cs = CryptoFactoryLoader.getServiceForServer(aconf); Path path = new Path(file); - CachableBuilder cb = new CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs); + CachableBuilder cb = + new CachableBuilder().fsPath(fs, path, null).conf(conf).cryptoService(cs); try (Reader iter = new RFile.Reader(cb)) { if (!file.endsWith(".rf")) { diff --git a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java index 83c06339747..3645e5e15df 100644 --- a/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java +++ b/core/src/main/java/org/apache/accumulo/core/summary/SummaryReader.java @@ -196,8 +196,9 @@ public static SummaryReader load(FileSystem fs, Configuration conf, SummarizerFa CompositeCache compositeCache = new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY, summaryCache), BlockCacheUtil.instrument(CacheType.INDEX, indexCache)); - CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache) - .cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService); + CachableBuilder cb = new CachableBuilder().fsPath(fs, file, null).conf(conf) + .fileLen(fileLenCache).cacheProvider(new BasicCacheProvider(compositeCache, null)) + .cryptoService(cryptoService); bcReader = new CachableBlockFile.Reader(cb); return load(bcReader, summarySelector, factory); } catch (FileNotFoundException fne) { diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index ae3dccaecb2..68751641e6c 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -822,7 +822,7 @@ public void testWrongGroup() throws Exception { private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException { return (Reader) FileOperations.getInstance().newReaderBuilder() - .forFile(testFile, localFs, localFs.getConf(), NoCryptoServiceFactory.NONE) + .forFile(testFile, localFs, localFs.getConf(), NoCryptoServiceFactory.NONE, null) .withTableConfiguration(DefaultConfiguration.getInstance()).build(); } diff --git a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java index 1d0b5ef539d..3ebb898595b 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java @@ -99,7 +99,7 @@ public void test() throws IOException { t1 = System.currentTimeMillis(); FileSKVIterator bmfr = FileOperations.getInstance().newReaderBuilder() - .forFile(fname, fs, conf, NoCryptoServiceFactory.NONE).withTableConfiguration(acuconf) + .forFile(fname, fs, conf, NoCryptoServiceFactory.NONE, null).withTableConfiguration(acuconf) .build(); t2 = System.currentTimeMillis(); log.debug("Opened {} in {}", fname, (t2 - t1)); diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java index 8bc20a1f937..b876df818ad 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java @@ -185,7 +185,7 @@ public void openReader() throws IOException { AccumuloConfiguration defaultConf = DefaultConfiguration.getInstance(); // the caches used to obfuscate the multithreaded issues - CachableBuilder b = new CachableBuilder().fsPath(fs, path).conf(conf) + CachableBuilder b = new CachableBuilder().fsPath(fs, path, null).conf(conf) .cryptoService(CryptoFactoryLoader.getServiceForServer(defaultConf)); reader = new RFile.Reader(new CachableBlockFile.Reader(b)); iter = new ColumnFamilySkippingIterator(reader); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java index bcebf005bd8..5815eafaf6e 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapred/AccumuloFileOutputFormatIT.java @@ -199,7 +199,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = FileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample); diff --git a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java index 287f41550b4..76c11d72c27 100644 --- a/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java +++ b/hadoop-mapreduce/src/test/java/org/apache/accumulo/hadoop/its/mapreduce/AccumuloFileOutputFormatIT.java @@ -214,7 +214,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample); diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index 8d224b6b450..61457d907e0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -65,6 +65,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -126,6 +127,8 @@ public AssignmentStats importFiles(List files) { final Map> completeFailures = Collections.synchronizedSortedMap(new TreeMap<>()); + final Map fileStatuses = new HashMap<>(); + ClientService.Client client = null; final TabletLocator locator = TabletLocator.getLocator(context, tableId); @@ -143,8 +146,10 @@ public AssignmentStats importFiles(List files) { Runnable getAssignments = () -> { List tabletsToAssignMapFileTo = Collections.emptyList(); try { - tabletsToAssignMapFileTo = - findOverlappingTablets(context, fs, locator, mapFile, tableConf.getCryptoService()); + FileStatus status = fs.getFileStatus(mapFile); + fileStatuses.put(mapFile, status); + tabletsToAssignMapFileTo = findOverlappingTablets(context, fs, locator, mapFile, + tableConf.getCryptoService(), status); } catch (Exception ex) { log.warn("Unable to find tablets that overlap file " + mapFile, ex); } @@ -173,7 +178,7 @@ public AssignmentStats importFiles(List files) { assignmentStats.attemptingAssignments(assignments); Map> assignmentFailures = - assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads); + assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads, fileStatuses); assignmentStats.assignmentsFailed(assignmentFailures); Map failureCount = new TreeMap<>(); @@ -212,8 +217,9 @@ public AssignmentStats importFiles(List files) { timer.start(Timers.QUERY_METADATA); try { - tabletsToAssignMapFileTo.addAll(findOverlappingTablets(context, fs, locator, - entry.getKey(), ke, tableConf.getCryptoService())); + Path p = entry.getKey(); + tabletsToAssignMapFileTo.addAll(findOverlappingTablets(context, fs, locator, p, ke, + tableConf.getCryptoService(), fileStatuses.get(p))); keListIter.remove(); } catch (Exception ex) { log.warn("Exception finding overlapping tablets, will retry tablet " + ke, ex); @@ -228,7 +234,7 @@ public AssignmentStats importFiles(List files) { assignmentStats.attemptingAssignments(assignments); Map> assignmentFailures2 = - assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads); + assignMapFiles(fs, assignments, paths, numAssignThreads, numThreads, fileStatuses); assignmentStats.assignmentsFailed(assignmentFailures2); // merge assignmentFailures2 into assignmentFailures @@ -347,15 +353,21 @@ private static List extentsOf(List locations) { } private Map> estimateSizes(final VolumeManager vm, - Map> assignments, Collection paths, int numThreads) { + Map> assignments, Collection paths, int numThreads, + Map statuses) { long t1 = System.currentTimeMillis(); final Map mapFileSizes = new TreeMap<>(); try { for (Path path : paths) { - FileSystem fs = vm.getFileSystemByPath(path); - mapFileSizes.put(path, fs.getContentSummary(path).getLength()); + FileStatus status = statuses.get(path); + if (status == null) { + FileSystem fs = vm.getFileSystemByPath(path); + mapFileSizes.put(path, fs.getContentSummary(path).getLength()); + } else { + mapFileSizes.put(path, status.getLen()); + } } } catch (IOException e) { log.error("Failed to get map files in for {}: {}", paths, e.getMessage(), e); @@ -386,9 +398,9 @@ private Map> estimateSizes(final VolumeManager vm, Path mapFile = entry.getKey(); FileSystem ns = context.getVolumeManager().getFileSystemByPath(mapFile); - estimatedSizes = BulkImport.estimateSizes(context.getConfiguration(), mapFile, - mapFileSizes.get(entry.getKey()), extentsOf(entry.getValue()), ns, null, - tableConf.getCryptoService()); + estimatedSizes = + BulkImport.estimateSizes(context.getConfiguration(), mapFile, statuses.get(mapFile), + extentsOf(entry.getValue()), ns, null, tableConf.getCryptoService()); } catch (IOException e) { log.warn("Failed to estimate map file sizes {}", e.getMessage()); } @@ -447,10 +459,10 @@ private static Map locationsOf(Map> private Map> assignMapFiles(VolumeManager fs, Map> assignments, Collection paths, int numThreads, - int numMapThreads) { + int numMapThreads, Map statuses) { timer.start(Timers.EXAMINE_MAP_FILES); Map> assignInfo = - estimateSizes(fs, assignments, paths, numMapThreads); + estimateSizes(fs, assignments, paths, numMapThreads, statuses); timer.stop(Timers.EXAMINE_MAP_FILES); Map> ret; @@ -623,15 +635,16 @@ private List assignMapFiles(ClientContext context, HostAndPort locati } public static List findOverlappingTablets(ServerContext context, VolumeManager fs, - TabletLocator locator, Path file, CryptoService cs) throws Exception { - return findOverlappingTablets(context, fs, locator, file, null, null, cs); + TabletLocator locator, Path file, CryptoService cs, FileStatus status) throws Exception { + return findOverlappingTablets(context, fs, locator, file, null, null, cs, status); } public static List findOverlappingTablets(ServerContext context, VolumeManager fs, - TabletLocator locator, Path file, KeyExtent failed, CryptoService cs) throws Exception { + TabletLocator locator, Path file, KeyExtent failed, CryptoService cs, FileStatus status) + throws Exception { locator.invalidateCache(failed); Text start = getStartRowForExtent(failed); - return findOverlappingTablets(context, fs, locator, file, start, failed.endRow(), cs); + return findOverlappingTablets(context, fs, locator, file, start, failed.endRow(), cs, status); } protected static Text getStartRowForExtent(KeyExtent extent) { @@ -648,16 +661,16 @@ protected static Text getStartRowForExtent(KeyExtent extent) { static final byte[] byte0 = {0}; public static List findOverlappingTablets(ServerContext context, VolumeManager vm, - TabletLocator locator, Path file, Text startRow, Text endRow, CryptoService cs) - throws Exception { + TabletLocator locator, Path file, Text startRow, Text endRow, CryptoService cs, + FileStatus status) throws Exception { List result = new ArrayList<>(); Collection columnFamilies = Collections.emptyList(); String filename = file.toString(); // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow); FileSystem fs = vm.getFileSystemByPath(file); - try (FileSKVIterator reader = - FileOperations.getInstance().newReaderBuilder().forFile(filename, fs, fs.getConf(), cs) - .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { + try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() + .forFile(filename, fs, fs.getConf(), cs, status) + .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { Text row = startRow; if (row == null) { row = new Text(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java index 1045d13680f..e3df4837b0a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java @@ -494,7 +494,7 @@ private List> openMapDataFiles( } ReaderBuilder readerBuilder = fileFactory.newReaderBuilder() - .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService) + .forFile(mapFile.getPathStr(), fs, fs.getConf(), cryptoService, null) .withTableConfiguration(acuTableConf).withRateLimiter(env.getReadLimiter()); if (dropCacheBehindCompactionInputFile) { readerBuilder.dropCachesBehind(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index 9a3aad33c35..c0ea25df621 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -310,7 +310,7 @@ private Map reserveReaders(KeyExtent tablet, Collection< FileSystem ns = context.getVolumeManager().getFileSystemByPath(path); // log.debug("Opening "+file + " path " + path); FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).withCacheProvider(cacheProvider) .withFileLenCache(fileLenCache).build(); readersReserved.put(reader, file); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java index 3c189644df8..a2a43ea9d4a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java @@ -48,6 +48,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -150,7 +151,7 @@ public static Collection reduceFiles(ServerContext context, TableConfigu for (String file : inFiles) { ns = context.getVolumeManager().getFileSystemByPath(new Path(file)); reader = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(file, ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(file, ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).build(); iters.add(reader); } @@ -434,14 +435,15 @@ private static long countIndexEntries(ServerContext context, TableConfiguration FileSKVIterator reader = null; Path path = new Path(file); FileSystem ns = context.getVolumeManager().getFileSystemByPath(path); + FileStatus status = ns.getFileStatus(path); try { if (useIndex) { reader = FileOperations.getInstance().newIndexReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf).build(); } else { reader = FileOperations.getInstance().newScanReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf) .overRange(new Range(prevEndRow, false, null, true), Set.of(), false).build(); } @@ -468,11 +470,11 @@ private static long countIndexEntries(ServerContext context, TableConfiguration if (useIndex) { readers.add(FileOperations.getInstance().newIndexReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf).build()); } else { readers.add(FileOperations.getInstance().newScanReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), status) .withTableConfiguration(tableConf) .overRange(new Range(prevEndRow, false, null, true), Set.of(), false).build()); } @@ -494,7 +496,7 @@ public static Map tryToGetFirstAndLastRows(ServerContext co FileSystem ns = context.getVolumeManager().getFileSystemByPath(mapfile.getPath()); try { reader = FileOperations.getInstance().newReaderBuilder() - .forFile(mapfile.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(mapfile.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).build(); Key firstKey = reader.getFirstKey(); @@ -532,7 +534,7 @@ public static WritableComparable findLastKey(ServerContext context, for (TabletFile file : mapFiles) { FileSystem ns = context.getVolumeManager().getFileSystemByPath(file.getPath()); FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(file.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(file.getPathStr(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).seekToBeginning().build(); try { diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java index 9b69efa1963..5e0647d6971 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -144,8 +144,8 @@ public void testFindOverlappingTablets() throws Exception { writer.append(new Key("xyzzy", "cf", "cq"), empty); writer.close(); try (var vm = VolumeManagerImpl.getLocalForTesting("file:///")) { - List overlaps = - BulkImporter.findOverlappingTablets(context, vm, locator, new Path(file), null, null, cs); + List overlaps = BulkImporter.findOverlappingTablets(context, vm, locator, + new Path(file), null, null, cs, null); assertEquals(5, overlaps.size()); Collections.sort(overlaps); assertEquals(new KeyExtent(tableId, new Text("a"), null), overlaps.get(0).tablet_extent); @@ -158,7 +158,7 @@ public void testFindOverlappingTablets() throws Exception { assertEquals(new KeyExtent(tableId, null, new Text("l")), overlaps.get(4).tablet_extent); List overlaps2 = BulkImporter.findOverlappingTablets(context, vm, locator, - new Path(file), new KeyExtent(tableId, new Text("h"), new Text("b")), cs); + new Path(file), new KeyExtent(tableId, new Text("h"), new Text("b")), cs, null); assertEquals(3, overlaps2.size()); assertEquals(new KeyExtent(tableId, new Text("d"), new Text("cm")), overlaps2.get(0).tablet_extent); diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index bc6d7e2bb78..67017e6f533 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -711,9 +711,9 @@ private long estimateOverlappingEntries(KeyExtent extent, StoredTabletFile file, FileOperations fileFactory = FileOperations.getInstance(); FileSystem fs = getContext().getVolumeManager().getFileSystemByPath(file.getPath()); - try (FileSKVIterator reader = - fileFactory.newReaderBuilder().forFile(file.getPathStr(), fs, fs.getConf(), cryptoService) - .withTableConfiguration(tableConf).dropCachesBehind().build()) { + try (FileSKVIterator reader = fileFactory.newReaderBuilder() + .forFile(file.getPathStr(), fs, fs.getConf(), cryptoService, null) + .withTableConfiguration(tableConf).dropCachesBehind().build()) { return reader.estimateOverlappingEntries(extent); } catch (IOException ioe) { throw new UncheckedIOException(ioe); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java index 9250d3a9ca4..8d65049df7e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java @@ -473,7 +473,7 @@ MetadataTime computeRootTabletTime(ServerContext context, Collection goo var tableConf = context.getTableConfiguration(RootTable.ID); long maxTime = -1; try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), NoCryptoServiceFactory.NONE) + .forFile(path.toString(), ns, ns.getConf(), NoCryptoServiceFactory.NONE, null) .withTableConfiguration(tableConf).seekToBeginning().build()) { while (reader.hasTop()) { maxTime = Math.max(maxTime, reader.getTopKey().getTimestamp()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java index 306a4e7d270..6d4a1e65b04 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java @@ -602,7 +602,7 @@ private synchronized FileSKVIterator getReader() throws IOException { TableConfiguration tableConf = context.getTableConfiguration(tableId); reader = new RFileOperations().newReaderBuilder() - .forFile(memDumpFile, fs, conf, tableConf.getCryptoService()) + .forFile(memDumpFile, fs, conf, tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).seekToBeginning().build(); if (iflag != null) { reader.setInterruptFlag(iflag); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java index f2a1a5c990d..b56612314a6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java @@ -26,8 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.TreeMap; import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl; import org.apache.accumulo.core.data.Key; @@ -80,7 +79,7 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L for (Path logDir : recoveryLogDirs) { LOG.debug("Opening recovery log dir {}", logDir.getName()); - SortedSet logFiles = getFiles(vm, logDir); + TreeMap logFiles = getFiles(vm, logDir); var fs = vm.getFileSystemByPath(logDir); // only check the first key once to prevent extra iterator creation and seeking @@ -88,9 +87,9 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L validateFirstKey(context, cryptoService, fs, logFiles, logDir); } - for (Path log : logFiles) { + for (Entry entry : logFiles.entrySet()) { FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(log.toString(), fs, fs.getConf(), cryptoService) + .forFile(entry.getKey().toString(), fs, fs.getConf(), cryptoService, entry.getValue()) .withTableConfiguration(context.getConfiguration()).seekToBeginning().build(); if (range != null) { fileIter.seek(range, Collections.emptySet(), false); @@ -98,11 +97,13 @@ public RecoveryLogsIterator(ServerContext context, List recoveryLogDirs, L Iterator> scanIter = new IteratorAdapter(fileIter); if (scanIter.hasNext()) { - LOG.debug("Write ahead log {} has data in range {} {}", log.getName(), start, end); + LOG.debug("Write ahead log {} has data in range {} {}", entry.getKey().getName(), start, + end); iterators.add(scanIter); fileIters.add(fileIter); } else { - LOG.debug("Write ahead log {} has no data in range {} {}", log.getName(), start, end); + LOG.debug("Write ahead log {} has no data in range {} {}", entry.getKey().getName(), + start, end); fileIter.close(); } } @@ -137,12 +138,12 @@ public void close() throws IOException { /** * Check for sorting signal files (finished/failed) and get the logs in the provided directory. */ - private SortedSet getFiles(VolumeManager fs, Path directory) throws IOException { + private TreeMap getFiles(VolumeManager fs, Path directory) throws IOException { boolean foundFinish = false; // Path::getName compares the last component of each Path value. In this case, the last // component should // always have the format 'part-r-XXXXX.rf', where XXXXX are one-up values. - SortedSet logFiles = new TreeSet<>(Comparator.comparing(Path::getName)); + TreeMap logFiles = new TreeMap<>(Comparator.comparing(Path::getName)); for (FileStatus child : fs.listStatus(directory)) { if (child.getPath().getName().startsWith("_")) { continue; @@ -156,7 +157,7 @@ private SortedSet getFiles(VolumeManager fs, Path directory) throws IOExce } FileSystem ns = fs.getFileSystemByPath(child.getPath()); Path fullLogPath = ns.makeQualified(child.getPath()); - logFiles.add(fullLogPath); + logFiles.put(fullLogPath, child); } if (!foundFinish) { throw new IOException( @@ -169,9 +170,10 @@ private SortedSet getFiles(VolumeManager fs, Path directory) throws IOExce * Check that the first entry in the WAL is OPEN. Only need to do this once. */ private void validateFirstKey(ServerContext context, CryptoService cs, FileSystem fs, - SortedSet logFiles, Path fullLogPath) throws IOException { + TreeMap logFiles, Path fullLogPath) throws IOException { + Entry first = logFiles.firstEntry(); try (FileSKVIterator fileIter = FileOperations.getInstance().newReaderBuilder() - .forFile(logFiles.first().toString(), fs, fs.getConf(), cs) + .forFile(first.getKey().toString(), fs, fs.getConf(), cs, first.getValue()) .withTableConfiguration(context.getConfiguration()).seekToBeginning().build()) { Iterator> iterator = new IteratorAdapter(fileIter); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index d332050aabc..c656b5b4aaa 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -120,7 +120,7 @@ public static Map> getFirstAndLastKeys(Tablet tab for (StoredTabletFile file : allFiles) { FileSystem ns = fs.getFileSystemByPath(file.getPath()); try (FileSKVIterator openReader = - fileFactory.newReaderBuilder().forFile(file.getPathStr(), ns, ns.getConf(), cs) + fileFactory.newReaderBuilder().forFile(file.getPathStr(), ns, ns.getConf(), cs, null) .withTableConfiguration(tableConf).seekToBeginning().build()) { Key first = openReader.getFirstKey(); Key last = openReader.getLastKey(); @@ -372,7 +372,7 @@ public Optional> getSample(CompactableFile fil FileSystem ns = tablet.getTabletServer().getVolumeManager().getFileSystemByPath(path); var tableConf = tablet.getTableConfiguration(); var fiter = fileFactory.newReaderBuilder() - .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService()) + .forFile(path.toString(), ns, ns.getConf(), tableConf.getCryptoService(), null) .withTableConfiguration(tableConf).seekToBeginning().build(); return Optional.ofNullable(fiter.getSample(new SamplerConfigurationImpl(sc))); } catch (IOException e) { diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java index 8d48ba72f08..d610e6944c7 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java @@ -202,7 +202,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java index 606f9597591..522dc0515cd 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java @@ -220,7 +220,7 @@ private void handleWriteTests(boolean content) throws Exception { DefaultConfiguration acuconf = DefaultConfiguration.getInstance(); FileSKVIterator sample = RFileOperations.getInstance().newReaderBuilder() .forFile(files[0].toString(), FileSystem.getLocal(conf), conf, - NoCryptoServiceFactory.NONE) + NoCryptoServiceFactory.NONE, null) .withTableConfiguration(acuconf).build() .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG)); assertNotNull(sample);