Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "server/io/FS.proto";
message StoreFileEntry {
required string name = 1;
required uint64 size = 2;
optional Reference reference = 3;
}

message StoreFileList {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class HFileLink extends FileLink {
* The pattern should be used for hfile and reference links that can be found in
* /hbase/table/region/family/
*/
private static final Pattern REF_OR_HFILE_LINK_PATTERN =
public static final Pattern REF_OR_HFILE_LINK_PATTERN =
Pattern.compile(String.format("^(?:(%s)(?:=))?(%s)=(%s)-(.+)$", TableName.VALID_NAMESPACE_REGEX,
TableName.VALID_TABLE_QUALIFIER_REGEX, RegionInfoBuilder.ENCODED_REGION_NAME_REGEX));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class Reference {
* For split HStoreFiles, it specifies if the file covers the lower half or the upper half of the
* key range
*/
static enum Range {
public static enum Range {
/** HStoreFile contains upper half of key range */
top,
/** HStoreFile contains lower half of key range */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), regionsToMerge[0].getTable());
final FileSystem fs = mfs.getFileSystem();
List<Path> mergedFiles = new ArrayList<>();
List<StoreFileInfo> mergedFiles = new ArrayList<StoreFileInfo>();
HRegionFileSystem mergeRegionFs = HRegionFileSystem
.createRegionOnFileSystem(env.getMasterConfiguration(), fs, tableDir, mergedRegion);

Expand All @@ -622,11 +622,11 @@ private void createMergedRegion(final MasterProcedureEnv env) throws IOException
.setState(State.MERGING_NEW);
}

private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
private List<StoreFileInfo> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem regionFs,
HRegionFileSystem mergeRegionFs, RegionInfo mergedRegion) throws IOException {
final TableDescriptor htd =
env.getMasterServices().getTableDescriptors().get(mergedRegion.getTable());
List<Path> mergedFiles = new ArrayList<>();
List<StoreFileInfo> mergedFiles = new ArrayList<StoreFileInfo>();
for (ColumnFamilyDescriptor hcd : htd.getColumnFamilies()) {
String family = hcd.getNameAsString();
StoreFileTracker tracker =
Expand All @@ -643,7 +643,7 @@ private List<Path> mergeStoreFiles(MasterProcedureEnv env, HRegionFileSystem reg
// is running in a regionserver's Store context, or we might not be able
// to read the hfiles.
storeFileInfo.setConf(storeConfiguration);
Path refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
StoreFileInfo refFile = mergeRegionFs.mergeStoreFile(regionFs.getRegionInfo(), family,
new HStoreFile(storeFileInfo, hcd.getBloomFilterType(), CacheConfig.DISABLED), tracker);
mergedFiles.add(refFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
Expand Down Expand Up @@ -660,20 +659,57 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
HRegionFileSystem regionFs = HRegionFileSystem.openRegionFromFileSystem(
env.getMasterConfiguration(), fs, tabledir, getParentRegion(), false);
regionFs.createSplitsDir(daughterOneRI, daughterTwoRI);
Pair<List<StoreFileInfo>, List<StoreFileInfo>> expectedReferences =
splitStoreFiles(env, regionFs);
final ExecutorService threadPool = Executors.newFixedThreadPool(2,
new ThreadFactoryBuilder().setNameFormat("RegionCommitter-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
Future<Path> futureOne = threadPool.submit(new Callable<Path>() {
@Override
public Path call() throws IOException {
return regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
}
});
Future<Path> futureTwo = threadPool.submit(new Callable<Path>() {
@Override
public Path call() throws IOException {
return regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
}
});
handleThreadPoolShutdown(threadPool, env.getMasterConfiguration());

Pair<List<Path>, List<Path>> expectedReferences = splitStoreFiles(env, regionFs);

assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
regionFs.getSplitsDir(daughterOneRI));
regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
assertSplitResultFilesCount(fs, expectedReferences.getFirst().size(),
new Path(tabledir, daughterOneRI.getEncodedName()));
try {
futureOne.get();
futureTwo.get();
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
} catch (ExecutionException e) {
throw new IOException("Daughter region commit failed", e);
}
}

assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
regionFs.getSplitsDir(daughterTwoRI));
regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
assertSplitResultFilesCount(fs, expectedReferences.getSecond().size(),
new Path(tabledir, daughterTwoRI.getEncodedName()));
private void handleThreadPoolShutdown(ExecutorService threadPool, Configuration conf)
throws IOException {
threadPool.shutdown();
// Wait for all the tasks to finish.
// When splits ran on the RegionServer, how-long-to-wait-configuration was named
// fileSplitTimeout. If set, use its value.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
try {
boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
if (stillRunning) {
threadPool.shutdownNow();
// wait for the thread to shutdown completely.
while (!threadPool.isTerminated()) {
Thread.sleep(50);
}
throw new IOException(
"Took too long to split the files and create the references, aborting split");
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
}

private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOException {
Expand All @@ -689,8 +725,8 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
* Create Split directory
* @param env MasterProcedureEnv
*/
private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv env,
final HRegionFileSystem regionFs) throws IOException {
private Pair<List<StoreFileInfo>, List<StoreFileInfo>> splitStoreFiles(
final MasterProcedureEnv env, final HRegionFileSystem regionFs) throws IOException {
final Configuration conf = env.getMasterConfiguration();
TableDescriptor htd = env.getMasterServices().getTableDescriptors().get(getTableName());
// The following code sets up a thread pool executor with as many slots as
Expand Down Expand Up @@ -745,7 +781,8 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
new ThreadFactoryBuilder().setNameFormat("StoreFileSplitter-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
final List<Future<Pair<StoreFileInfo, StoreFileInfo>>> futures =
new ArrayList<Future<Pair<StoreFileInfo, StoreFileInfo>>>(nbFiles);

// Split each store file.
for (Map.Entry<String, Collection<StoreFileInfo>> e : files.entrySet()) {
Expand All @@ -769,35 +806,13 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
}
}
}
// Shutdown the pool
threadPool.shutdown();

// Wait for all the tasks to finish.
// When splits ran on the RegionServer, how-long-to-wait-configuration was named
// hbase.regionserver.fileSplitTimeout. If set, use its value.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
try {
boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
if (stillRunning) {
threadPool.shutdownNow();
// wait for the thread to shutdown completely.
while (!threadPool.isTerminated()) {
Thread.sleep(50);
}
throw new IOException(
"Took too long to split the" + " files and create the references, aborting split");
}
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}

List<Path> daughterA = new ArrayList<>();
List<Path> daughterB = new ArrayList<>();
handleThreadPoolShutdown(threadPool, conf);
List<StoreFileInfo> daughterA = new ArrayList<>();
List<StoreFileInfo> daughterB = new ArrayList<>();
// Look for any exception
for (Future<Pair<Path, Path>> future : futures) {
for (Future<Pair<StoreFileInfo, StoreFileInfo>> future : futures) {
try {
Pair<Path, Path> p = future.get();
Pair<StoreFileInfo, StoreFileInfo> p = future.get();
if (p.getFirst() != null) {
daughterA.add(p.getFirst());
}
Expand All @@ -819,19 +834,8 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
return new Pair<>(daughterA, daughterB);
}

private void assertSplitResultFilesCount(final FileSystem fs,
final int expectedSplitResultFileCount, Path dir) throws IOException {
if (expectedSplitResultFileCount != 0) {
int resultFileCount = FSUtils.getRegionReferenceAndLinkFileCount(fs, dir);
if (expectedSplitResultFileCount != resultFileCount) {
throw new IOException("Failing split. Didn't have expected reference and HFileLink files"
+ ", expected=" + expectedSplitResultFileCount + ", actual=" + resultFileCount);
}
}
}

private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescriptor htd,
ColumnFamilyDescriptor hcd, HStoreFile sf) throws IOException {
private Pair<StoreFileInfo, StoreFileInfo> splitStoreFile(HRegionFileSystem regionFs,
TableDescriptor htd, ColumnFamilyDescriptor hcd, HStoreFile sf) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " splitting started for store file: " + sf.getPath()
+ " for region: " + getParentRegion().getShortNameToLog());
Expand All @@ -847,22 +851,22 @@ private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescrip
StoreFileTrackerFactory.create(regionFs.getFileSystem().getConf(), htd, hcd,
HRegionFileSystem.create(regionFs.getFileSystem().getConf(), regionFs.getFileSystem(),
regionFs.getTableDir(), daughterTwoRI));
final Path path_first = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf, splitRow,
false, splitPolicy, daughterOneSft);
final Path path_second = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf, splitRow,
true, splitPolicy, daughterTwoSft);
final StoreFileInfo sfiFirst = regionFs.splitStoreFile(this.daughterOneRI, familyName, sf,
splitRow, false, splitPolicy, daughterOneSft);
final StoreFileInfo sfiSecond = regionFs.splitStoreFile(this.daughterTwoRI, familyName, sf,
splitRow, true, splitPolicy, daughterTwoSft);
if (LOG.isDebugEnabled()) {
LOG.debug("pid=" + getProcId() + " splitting complete for store file: " + sf.getPath()
+ " for region: " + getParentRegion().getShortNameToLog());
}
return new Pair<Path, Path>(path_first, path_second);
return new Pair<StoreFileInfo, StoreFileInfo>(sfiFirst, sfiSecond);
}

/**
* Utility class used to do the file splitting / reference writing in parallel instead of
* sequentially.
*/
private class StoreFileSplitter implements Callable<Pair<Path, Path>> {
private class StoreFileSplitter implements Callable<Pair<StoreFileInfo, StoreFileInfo>> {
private final HRegionFileSystem regionFs;
private final ColumnFamilyDescriptor hcd;
private final HStoreFile sf;
Expand All @@ -883,7 +887,7 @@ public StoreFileSplitter(HRegionFileSystem regionFs, TableDescriptor htd,
}

@Override
public Pair<Path, Path> call() throws IOException {
public Pair<StoreFileInfo, StoreFileInfo> call() throws IOException {
return splitStoreFile(regionFs, htd, hcd, sf);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ public class HMobStore extends HStore {
// table, we need to find the original mob files by this table name. For details please see
// cloning snapshot for mob files.
private final byte[] refCellTags;
private StoreFileTracker mobStoreSFT = null;

public HMobStore(final HRegion region, final ColumnFamilyDescriptor family,
final Configuration confParam, boolean warmup) throws IOException {
Expand Down
Loading