7171import org .apache .hadoop .hbase .util .Bytes ;
7272import org .apache .hadoop .hbase .util .CommonFSUtils ;
7373import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
74- import org .apache .hadoop .hbase .util .FSUtils ;
7574import org .apache .hadoop .hbase .util .Pair ;
7675import org .apache .hadoop .hbase .util .Threads ;
7776import org .apache .hadoop .hbase .wal .WALSplitUtil ;
@@ -660,20 +659,46 @@ public void createDaughterRegions(final MasterProcedureEnv env) throws IOExcepti
660659 HRegionFileSystem regionFs = HRegionFileSystem .openRegionFromFileSystem (
661660 env .getMasterConfiguration (), fs , tabledir , getParentRegion (), false );
662661 regionFs .createSplitsDir (daughterOneRI , daughterTwoRI );
662+ Pair <List <StoreFileInfo >, List <StoreFileInfo >> expectedReferences =
663+ splitStoreFiles (env , regionFs );
// Commit both daughter regions in parallel. The Futures returned by submit() are kept:
// an exception thrown inside a submitted Callable is captured by its Future and never
// reaches the pool's uncaught-exception handler, so dropping the Futures would silently
// hide a failed commit and let the split continue.
final ExecutorService threadPool = Executors.newFixedThreadPool(2,
  new ThreadFactoryBuilder().setNameFormat("RegionCommitter-pool-%d").setDaemon(true)
    .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
final List<Future<Path>> commitFutures = new ArrayList<>(2);
commitFutures.add(threadPool.submit(new Callable<Path>() {
  @Override
  public Path call() throws IOException {
    return regionFs.commitDaughterRegion(daughterOneRI, expectedReferences.getFirst(), env);
  }
}));
commitFutures.add(threadPool.submit(new Callable<Path>() {
  @Override
  public Path call() throws IOException {
    return regionFs.commitDaughterRegion(daughterTwoRI, expectedReferences.getSecond(), env);
  }
}));
// No further tasks will be submitted; already-submitted tasks keep running.
threadPool.shutdown();

Configuration conf = env.getMasterConfiguration();
// Wait for all the tasks to finish.
// When splits ran on the RegionServer, the how-long-to-wait configuration was named
// hbase.regionserver.fileSplitTimeout. If set, use its value as the fallback default.
long fileSplitTimeout = conf.getLong("hbase.master.fileSplitTimeout",
  conf.getLong("hbase.regionserver.fileSplitTimeout", 600000));
try {
  boolean stillRunning = !threadPool.awaitTermination(fileSplitTimeout, TimeUnit.MILLISECONDS);
  if (stillRunning) {
    threadPool.shutdownNow();
    // Wait for the worker threads to shut down completely.
    while (!threadPool.isTerminated()) {
      Thread.sleep(50);
    }
    throw new IOException("Took too long to commit the daughter regions (waited "
      + fileSplitTimeout + " ms), aborting split");
  }
  // The pool has terminated, so every get() returns immediately. Surface any failure
  // from either commit instead of letting the split proceed on a half-committed state.
  for (Future<Path> commitFuture : commitFutures) {
    try {
      commitFuture.get();
    } catch (java.util.concurrent.ExecutionException e) {
      throw new IOException("Failed to commit daughter region while splitting "
        + getParentRegion().getShortNameToLog(), e.getCause());
    }
  }
} catch (InterruptedException e) {
  throw (InterruptedIOException) new InterruptedIOException().initCause(e);
}
677702 }
678703
679704 private void deleteDaughterRegions (final MasterProcedureEnv env ) throws IOException {
@@ -689,8 +714,8 @@ private void deleteDaughterRegions(final MasterProcedureEnv env) throws IOExcept
689714 * Split the store files of the parent region, returning the files created for each daughter
690715 * @param env MasterProcedureEnv
691716 */
692- private Pair <List <Path >, List <Path >> splitStoreFiles (final MasterProcedureEnv env ,
693- final HRegionFileSystem regionFs ) throws IOException {
717+ private Pair <List <StoreFileInfo >, List <StoreFileInfo >> splitStoreFiles (
718+ final MasterProcedureEnv env , final HRegionFileSystem regionFs ) throws IOException {
694719 final Configuration conf = env .getMasterConfiguration ();
695720 TableDescriptor htd = env .getMasterServices ().getTableDescriptors ().get (getTableName ());
696721 // The following code sets up a thread pool executor with as many slots as
@@ -745,7 +770,8 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
745770 final ExecutorService threadPool = Executors .newFixedThreadPool (maxThreads ,
746771 new ThreadFactoryBuilder ().setNameFormat ("StoreFileSplitter-pool-%d" ).setDaemon (true )
747772 .setUncaughtExceptionHandler (Threads .LOGGING_EXCEPTION_HANDLER ).build ());
748- final List <Future <Pair <Path , Path >>> futures = new ArrayList <Future <Pair <Path , Path >>>(nbFiles );
773+ final List <Future <Pair <StoreFileInfo , StoreFileInfo >>> futures =
774+ new ArrayList <Future <Pair <StoreFileInfo , StoreFileInfo >>>(nbFiles );
749775
750776 // Split each store file.
751777 for (Map .Entry <String , Collection <StoreFileInfo >> e : files .entrySet ()) {
@@ -792,12 +818,12 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
792818 throw (InterruptedIOException ) new InterruptedIOException ().initCause (e );
793819 }
794820
795- List <Path > daughterA = new ArrayList <>();
796- List <Path > daughterB = new ArrayList <>();
821+ List <StoreFileInfo > daughterA = new ArrayList <>();
822+ List <StoreFileInfo > daughterB = new ArrayList <>();
797823 // Look for any exception
798- for (Future <Pair <Path , Path >> future : futures ) {
824+ for (Future <Pair <StoreFileInfo , StoreFileInfo >> future : futures ) {
799825 try {
800- Pair <Path , Path > p = future .get ();
826+ Pair <StoreFileInfo , StoreFileInfo > p = future .get ();
801827 if (p .getFirst () != null ) {
802828 daughterA .add (p .getFirst ());
803829 }
@@ -819,19 +845,8 @@ private Pair<List<Path>, List<Path>> splitStoreFiles(final MasterProcedureEnv en
819845 return new Pair <>(daughterA , daughterB );
820846 }
821847
822- private void assertSplitResultFilesCount (final FileSystem fs ,
823- final int expectedSplitResultFileCount , Path dir ) throws IOException {
824- if (expectedSplitResultFileCount != 0 ) {
825- int resultFileCount = FSUtils .getRegionReferenceAndLinkFileCount (fs , dir );
826- if (expectedSplitResultFileCount != resultFileCount ) {
827- throw new IOException ("Failing split. Didn't have expected reference and HFileLink files"
828- + ", expected=" + expectedSplitResultFileCount + ", actual=" + resultFileCount );
829- }
830- }
831- }
832-
833- private Pair <Path , Path > splitStoreFile (HRegionFileSystem regionFs , TableDescriptor htd ,
834- ColumnFamilyDescriptor hcd , HStoreFile sf ) throws IOException {
848+ private Pair <StoreFileInfo , StoreFileInfo > splitStoreFile (HRegionFileSystem regionFs ,
849+ TableDescriptor htd , ColumnFamilyDescriptor hcd , HStoreFile sf ) throws IOException {
835850 if (LOG .isDebugEnabled ()) {
836851 LOG .debug ("pid=" + getProcId () + " splitting started for store file: " + sf .getPath ()
837852 + " for region: " + getParentRegion ().getShortNameToLog ());
@@ -847,22 +862,22 @@ private Pair<Path, Path> splitStoreFile(HRegionFileSystem regionFs, TableDescrip
847862 StoreFileTrackerFactory .create (regionFs .getFileSystem ().getConf (), htd , hcd ,
848863 HRegionFileSystem .create (regionFs .getFileSystem ().getConf (), regionFs .getFileSystem (),
849864 regionFs .getTableDir (), daughterTwoRI ));
850- final Path path_first = regionFs .splitStoreFile (this .daughterOneRI , familyName , sf , splitRow ,
851- false , splitPolicy , daughterOneSft );
852- final Path path_second = regionFs .splitStoreFile (this .daughterTwoRI , familyName , sf , splitRow ,
853- true , splitPolicy , daughterTwoSft );
865+ final StoreFileInfo sfiFirst = regionFs .splitStoreFile (this .daughterOneRI , familyName , sf ,
866+ splitRow , false , splitPolicy , daughterOneSft );
867+ final StoreFileInfo sfiSecond = regionFs .splitStoreFile (this .daughterTwoRI , familyName , sf ,
868+ splitRow , true , splitPolicy , daughterTwoSft );
854869 if (LOG .isDebugEnabled ()) {
855870 LOG .debug ("pid=" + getProcId () + " splitting complete for store file: " + sf .getPath ()
856871 + " for region: " + getParentRegion ().getShortNameToLog ());
857872 }
858- return new Pair <Path , Path >( path_first , path_second );
873+ return new Pair <StoreFileInfo , StoreFileInfo >( sfiFirst , sfiSecond );
859874 }
860875
861876 /**
862877 * Utility class used to do the file splitting / reference writing in parallel instead of
863878 * sequentially.
864879 */
865- private class StoreFileSplitter implements Callable <Pair <Path , Path >> {
880+ private class StoreFileSplitter implements Callable <Pair <StoreFileInfo , StoreFileInfo >> {
866881 private final HRegionFileSystem regionFs ;
867882 private final ColumnFamilyDescriptor hcd ;
868883 private final HStoreFile sf ;
@@ -883,7 +898,7 @@ public StoreFileSplitter(HRegionFileSystem regionFs, TableDescriptor htd,
883898 }
884899
// Callable entry point: delegates to splitStoreFile with the regionFs, htd, hcd and sf held
// by this splitter and returns the pair it produces (one entry per daughter region).
// An IOException from splitStoreFile propagates to whoever retrieves the task's result.
885900 @ Override
886- public Pair <Path , Path > call () throws IOException {
901+ public Pair <StoreFileInfo , StoreFileInfo > call () throws IOException {
887902 return splitStoreFile (regionFs , htd , hcd , sf );
888903 }
889904 }
0 commit comments