Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent,
var cs = CryptoFactoryLoader.getServiceForClientWithTable(systemConf, tableConf, tableId);
FileSystem fs = VolumeConfiguration.fileSystemForPath(file.getPathStr(), conf);
FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.getPathStr(), fs, conf, cs).withTableConfiguration(tableCC).build();
.forFile(file.getPathStr(), fs, conf, cs, null).withTableConfiguration(tableCC).build();
if (scannerSamplerConfigImpl != null) {
reader = reader.getSample(scannerSamplerConfigImpl);
if (reader == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,10 @@ public MLong(long i) {
}

public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, Path mapFile,
long fileSize, Collection<KeyExtent> extents, FileSystem ns, Cache<String,Long> fileLenCache,
CryptoService cs) throws IOException {
FileStatus status, Collection<KeyExtent> extents, FileSystem ns,
Cache<String,Long> fileLenCache, CryptoService cs) throws IOException {

final long fileSize = status.getLen();

if (extents.size() == 1) {
return Collections.singletonMap(extents.iterator().next(), fileSize);
Expand All @@ -282,7 +284,7 @@ public static Map<KeyExtent,Long> estimateSizes(AccumuloConfiguration acuConf, P
Text row = new Text();

FileSKVIterator index = FileOperations.getInstance().newIndexReaderBuilder()
.forFile(mapFile.toString(), ns, ns.getConf(), cs).withTableConfiguration(acuConf)
.forFile(mapFile.toString(), ns, ns.getConf(), cs, status).withTableConfiguration(acuConf)
.withFileLenCache(fileLenCache).build();

try {
Expand Down Expand Up @@ -365,9 +367,9 @@ private static Text nextRow(Text row) {

public static List<KeyExtent> findOverlappingTablets(ClientContext context,
KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache<String,Long> fileLenCache,
CryptoService cs) throws IOException {
CryptoService cs, FileStatus status) throws IOException {
try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, fs.getConf(), cs)
.forFile(file.toString(), fs, fs.getConf(), cs, status)
.withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache)
.seekToBeginning().build()) {

Expand Down Expand Up @@ -574,12 +576,12 @@ public SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs
CompletableFuture<Map<KeyExtent,Bulk.FileInfo>> future = CompletableFuture.supplyAsync(() -> {
try {
long t1 = System.currentTimeMillis();
List<KeyExtent> extents =
findOverlappingTablets(context, extentCache, filePath, fs, fileLensCache, cs);
List<KeyExtent> extents = findOverlappingTablets(context, extentCache, filePath, fs,
fileLensCache, cs, fileStatus);
// make sure file isn't going to too many tablets
checkTabletCount(maxTablets, extents.size(), filePath.toString());
Map<KeyExtent,Long> estSizes = estimateSizes(context.getConfiguration(), filePath,
fileStatus.getLen(), extents, fs, fileLensCache, cs);
fileStatus, extents, fs, fileLensCache, cs);
Map<KeyExtent,Bulk.FileInfo> pathLocations = new HashMap<>();
for (KeyExtent ke : extents) {
pathLocations.put(ke, new Bulk.FileInfo(filePath, estSizes.getOrDefault(ke, 0L)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ public static LoadPlan compute(URI file, Map<String,String> properties) throws I
CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, properties);
var tableConf = SiteConfiguration.empty().withOverrides(properties).build();
try (var reader = FileOperations.getInstance().newReaderBuilder()
.forFile(file.toString(), fs, conf, cs).withTableConfiguration(tableConf).build();) {
.forFile(file.toString(), fs, conf, cs, null).withTableConfiguration(tableConf).build();) {
var firstRow = reader.getFirstKey().getRow();
var lastRow = reader.getLastKey().getRow();
return LoadPlan.builder().loadFileTo(path.getName(), RangeType.FILE, firstRow, lastRow)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.FileOutputCommitter;

Expand Down Expand Up @@ -191,12 +192,14 @@ public static class FileOptions {
public final Set<ByteSequence> columnFamilies;
public final boolean inclusive;
public final boolean dropCacheBehind;
public final FileStatus status;

public FileOptions(TableId tableId, AccumuloConfiguration tableConfiguration, String filename,
FileSystem fs, Configuration fsConf, RateLimiter rateLimiter, String compression,
FSDataOutputStream outputStream, boolean enableAccumuloStart, CacheProvider cacheProvider,
Cache<String,Long> fileLenCache, boolean seekToBeginning, CryptoService cryptoService,
Range range, Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) {
Range range, Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind,
FileStatus status) {
this.tableId = tableId;
this.tableConfiguration = tableConfiguration;
this.filename = filename;
Expand All @@ -214,6 +217,7 @@ public FileOptions(TableId tableId, AccumuloConfiguration tableConfiguration, St
this.columnFamilies = columnFamilies;
this.inclusive = inclusive;
this.dropCacheBehind = dropCacheBehind;
this.status = status;
}

public TableId getTableId() {
Expand Down Expand Up @@ -293,6 +297,7 @@ public static class FileHelper {
private RateLimiter rateLimiter;
private CryptoService cryptoService;
private boolean dropCacheBehind = false;
private FileStatus status;

protected FileHelper table(TableId tid) {
this.tableId = tid;
Expand Down Expand Up @@ -334,31 +339,36 @@ protected FileHelper dropCacheBehind(boolean drop) {
return this;
}

/**
 * Records the Hadoop {@link FileStatus} to associate with the file this helper describes.
 *
 * <p>
 * The stored value is handed to the {@code FileOptions} constructor by the
 * {@code to*BuilderOptions} methods of this helper. The reader builders' {@code forFile} methods
 * call this with whatever status their caller supplied, and several callers pass {@code null}.
 *
 * @param status the status to remember; may be {@code null}
 * @return this helper, so calls can be chained
 */
protected FileHelper fileStatus(FileStatus status) {
this.status = status;
return this;
}

protected FileOptions toWriterBuilderOptions(String compression,
FSDataOutputStream outputStream, boolean startEnabled) {
return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter,
compression, outputStream, startEnabled, NULL_PROVIDER, null, false, cryptoService, null,
null, true, dropCacheBehind);
null, true, dropCacheBehind, status);
}

protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider,
Cache<String,Long> fileLenCache, boolean seekToBeginning) {
return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null,
null, false, cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache,
seekToBeginning, cryptoService, null, null, true, dropCacheBehind);
seekToBeginning, cryptoService, null, null, true, dropCacheBehind, status);
}

protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null,
null, false, NULL_PROVIDER, fileLenCache, false, cryptoService, null, null, true,
dropCacheBehind);
dropCacheBehind, status);
}

protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
boolean inclusive) {
return new FileOptions(tableId, tableConfiguration, filename, fs, fsConf, rateLimiter, null,
null, false, NULL_PROVIDER, null, false, cryptoService, range, columnFamilies, inclusive,
dropCacheBehind);
dropCacheBehind, status);
}

protected AccumuloConfiguration getTableConfiguration() {
Expand Down Expand Up @@ -436,8 +446,8 @@ public class ReaderBuilder extends FileHelper implements ReaderTableConfiguratio
private boolean seekToBeginning = false;

public ReaderTableConfiguration forFile(String filename, FileSystem fs, Configuration fsConf,
CryptoService cs) {
filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs);
CryptoService cs, FileStatus status) {
filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status);
return this;
}

Expand Down Expand Up @@ -504,8 +514,8 @@ public class IndexReaderBuilder extends FileHelper implements IndexReaderTableCo
private Cache<String,Long> fileLenCache = null;

public IndexReaderTableConfiguration forFile(String filename, FileSystem fs,
Configuration fsConf, CryptoService cs) {
filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs);
Configuration fsConf, CryptoService cs, FileStatus status) {
filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status);
return this;
}

Expand Down Expand Up @@ -536,8 +546,8 @@ public class ScanReaderBuilder extends FileHelper implements ScanReaderTableConf
private boolean inclusive;

public ScanReaderTableConfiguration forFile(String filename, FileSystem fs,
Configuration fsConf, CryptoService cs) {
filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs);
Configuration fsConf, CryptoService cs, FileStatus status) {
filename(filename).fs(fs).fsConf(fsConf).cryptoService(cs).fileStatus(status);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand All @@ -45,7 +47,9 @@
import org.apache.accumulo.core.util.ratelimit.RateLimiter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -86,30 +90,54 @@ public CachableBuilder conf(Configuration hadoopConf) {
return this;
}

public CachableBuilder fsPath(FileSystem fs, Path dataFile) {
return fsPath(fs, dataFile, false);
public CachableBuilder fsPath(FileSystem fs, Path dataFile, FileStatus status) {
return fsPath(fs, dataFile, false, status);
}

public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind) {
public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind,
FileStatus status) {
this.cacheId = pathToCacheId(dataFile);
this.inputSupplier = () -> {
FSDataInputStream is = fs.open(dataFile);
if (dropCacheBehind) {
// Tell the DataNode that the write ahead log does not need to be cached in the OS page
// cache
FutureDataInputStreamBuilder builder = fs.openFile(dataFile);
if (status != null) {
builder.withFileStatus(status);
}
CompletableFuture<FSDataInputStream> future = builder.build();
while (!future.isDone()) {
try {
is.setDropBehind(Boolean.TRUE);
log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile);
} catch (UnsupportedOperationException e) {
log.debug("setDropBehind not enabled for wal file: {}", dataFile);
} catch (IOException e) {
log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile,
e.getMessage());
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while opening file: " + dataFile, e);
}
}
try {
FSDataInputStream is = future.get();
if (dropCacheBehind) {
// Tell the DataNode that the write ahead log does not need to be cached in the OS page
// cache
try {
is.setDropBehind(Boolean.TRUE);
log.trace("Called setDropBehind(TRUE) for stream reading file {}", dataFile);
} catch (UnsupportedOperationException e) {
log.debug("setDropBehind not enabled for wal file: {}", dataFile);
} catch (IOException e) {
log.debug("IOException setting drop behind for file: {}, msg: {}", dataFile,
e.getMessage());
}
}
return is;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while opening file: " + dataFile, e);
} catch (CancellationException e) {
throw new IOException("Cancelled while opening file: " + dataFile, e);
} catch (ExecutionException e) {
throw new IOException("Error trying to open file: " + dataFile, e);
}
return is;
};
this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen();
this.lengthSupplier =
() -> status == null ? fs.getFileStatus(dataFile).getLen() : status.getLen();
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ private TreeSet<String> getIndexKeys(AccumuloConfiguration accumuloConf, Configu
try {
for (Path file : files) {
FileSKVIterator reader = FileOperations.getInstance().newIndexReaderBuilder()
.forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf)
.forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf)
.build();
readers.add(reader);
fileReaders.add(reader);
Expand Down Expand Up @@ -333,7 +333,7 @@ private TreeSet<String> getSplitsFromFullScan(SiteConfiguration accumuloConf,
try {
for (Path file : files) {
FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder()
.forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf)
.forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf)
.overRange(new Range(), Set.of(), false).build();
readers.add(reader);
fileReaders.add(reader);
Expand Down Expand Up @@ -367,7 +367,7 @@ private TreeSet<String> getSplitsBySize(AccumuloConfiguration accumuloConf,
try {
for (Path file : files) {
FileSKVIterator reader = FileOperations.getInstance().newScanReaderBuilder()
.forFile(file.toString(), fs, hadoopConf, cs).withTableConfiguration(accumuloConf)
.forFile(file.toString(), fs, hadoopConf, cs, null).withTableConfiguration(accumuloConf)
.overRange(new Range(), Set.of(), false).build();
readers.add(reader);
fileReaders.add(reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void execute(final String[] args) throws Exception {

CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE,
siteConfig.getAllCryptoProperties());
CachableBuilder cb = new CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs);
CachableBuilder cb =
new CachableBuilder().fsPath(fs, path, null).conf(conf).cryptoService(cs);
Reader iter = new RFile.Reader(cb);
MetricsGatherer<Map<String,ArrayList<VisibilityMetric>>> vmg = new VisMetricsGatherer();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public class RFileOperations extends FileOperations {

private static RFile.Reader getReader(FileOptions options) throws IOException {
CachableBuilder cb = new CachableBuilder()
.fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind)
.fsPath(options.getFileSystem(), new Path(options.getFilename()), options.dropCacheBehind,
options.status)
.conf(options.getConfiguration()).fileLen(options.getFileLenCache())
.cacheProvider(options.cacheProvider).readLimiter(options.getRateLimiter())
.cryptoService(options.getCryptoService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public void execute(String[] args) throws Exception {
AccumuloConfiguration aconf = opts.getSiteConfiguration();
CryptoService cs = CryptoFactoryLoader.getServiceForServer(aconf);
Path path = new Path(file);
CachableBuilder cb = new CachableBuilder().fsPath(fs, path).conf(conf).cryptoService(cs);
CachableBuilder cb =
new CachableBuilder().fsPath(fs, path, null).conf(conf).cryptoService(cs);
try (Reader iter = new RFile.Reader(cb)) {

if (!file.endsWith(".rf")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,9 @@ public static SummaryReader load(FileSystem fs, Configuration conf, SummarizerFa
CompositeCache compositeCache =
new CompositeCache(BlockCacheUtil.instrument(CacheType.SUMMARY, summaryCache),
BlockCacheUtil.instrument(CacheType.INDEX, indexCache));
CachableBuilder cb = new CachableBuilder().fsPath(fs, file).conf(conf).fileLen(fileLenCache)
.cacheProvider(new BasicCacheProvider(compositeCache, null)).cryptoService(cryptoService);
CachableBuilder cb = new CachableBuilder().fsPath(fs, file, null).conf(conf)
.fileLen(fileLenCache).cacheProvider(new BasicCacheProvider(compositeCache, null))
.cryptoService(cryptoService);
bcReader = new CachableBlockFile.Reader(cb);
return load(bcReader, summarySelector, factory);
} catch (FileNotFoundException fne) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,7 +822,7 @@ public void testWrongGroup() throws Exception {

private Reader getReader(LocalFileSystem localFs, String testFile) throws IOException {
return (Reader) FileOperations.getInstance().newReaderBuilder()
.forFile(testFile, localFs, localFs.getConf(), NoCryptoServiceFactory.NONE)
.forFile(testFile, localFs, localFs.getConf(), NoCryptoServiceFactory.NONE, null)
.withTableConfiguration(DefaultConfiguration.getInstance()).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void test() throws IOException {

t1 = System.currentTimeMillis();
FileSKVIterator bmfr = FileOperations.getInstance().newReaderBuilder()
.forFile(fname, fs, conf, NoCryptoServiceFactory.NONE).withTableConfiguration(acuconf)
.forFile(fname, fs, conf, NoCryptoServiceFactory.NONE, null).withTableConfiguration(acuconf)
.build();
t2 = System.currentTimeMillis();
log.debug("Opened {} in {}", fname, (t2 - t1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void openReader() throws IOException {
AccumuloConfiguration defaultConf = DefaultConfiguration.getInstance();

// the caches used to obfuscate the multithreaded issues
CachableBuilder b = new CachableBuilder().fsPath(fs, path).conf(conf)
CachableBuilder b = new CachableBuilder().fsPath(fs, path, null).conf(conf)
.cryptoService(CryptoFactoryLoader.getServiceForServer(defaultConf));
reader = new RFile.Reader(new CachableBlockFile.Reader(b));
iter = new ColumnFamilySkippingIterator(reader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private void handleWriteTests(boolean content) throws Exception {
DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
FileSKVIterator sample = FileOperations.getInstance().newReaderBuilder()
.forFile(files[0].toString(), FileSystem.getLocal(conf), conf,
NoCryptoServiceFactory.NONE)
NoCryptoServiceFactory.NONE, null)
.withTableConfiguration(acuconf).build()
.getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
assertNotNull(sample);
Expand Down
Loading
Loading