44
44
import org .apache .hadoop .hbase .regionserver .compactions .CompactionLifeCycleTracker ;
45
45
import org .apache .hadoop .hbase .regionserver .throttle .NoLimitThroughputController ;
46
46
import org .apache .hadoop .hbase .util .Bytes ;
47
+ import org .apache .hadoop .hbase .util .CommonFSUtils ;
47
48
import org .apache .hadoop .hbase .util .EnvironmentEdgeManager ;
48
49
import org .apache .hadoop .hbase .util .FSTableDescriptors ;
49
50
import org .apache .hadoop .hbase .util .FSUtils ;
57
58
import org .apache .hadoop .mapreduce .lib .input .FileSplit ;
58
59
import org .apache .hadoop .mapreduce .lib .input .TextInputFormat ;
59
60
import org .apache .hadoop .mapreduce .lib .output .NullOutputFormat ;
61
+ import org .apache .hadoop .mapreduce .security .TokenCache ;
60
62
import org .apache .hadoop .util .LineReader ;
61
63
import org .apache .hadoop .util .Tool ;
62
64
import org .apache .hadoop .util .ToolRunner ;
76
78
public class CompactionTool extends Configured implements Tool {
77
79
private static final Logger LOG = LoggerFactory .getLogger (CompactionTool .class );
78
80
79
- private final static String CONF_TMP_DIR = "hbase.tmp.dir" ;
80
81
private final static String CONF_COMPACT_ONCE = "hbase.compactiontool.compact.once" ;
81
82
private final static String CONF_COMPACT_MAJOR = "hbase.compactiontool.compact.major" ;
82
83
private final static String CONF_DELETE_COMPACTED = "hbase.compactiontool.delete" ;
@@ -89,12 +90,10 @@ private static class CompactionWorker {
89
90
private final boolean deleteCompacted ;
90
91
private final Configuration conf ;
91
92
private final FileSystem fs ;
92
- private final Path tmpDir ;
93
93
94
94
public CompactionWorker (final FileSystem fs , final Configuration conf ) {
95
95
this .conf = conf ;
96
96
this .deleteCompacted = conf .getBoolean (CONF_DELETE_COMPACTED , false );
97
- this .tmpDir = new Path (conf .get (CONF_TMP_DIR ));
98
97
this .fs = fs ;
99
98
}
100
99
@@ -105,7 +104,8 @@ public CompactionWorker(final FileSystem fs, final Configuration conf) {
105
104
* @param compactOnce Execute just a single step of compaction.
106
105
* @param major Request major compaction.
107
106
*/
108
- public void compact (final Path path , final boolean compactOnce , final boolean major ) throws IOException {
107
+ public void compact (final Path path , final boolean compactOnce , final boolean major )
108
+ throws IOException {
109
109
if (isFamilyDir (fs , path )) {
110
110
Path regionDir = path .getParent ();
111
111
Path tableDir = regionDir .getParent ();
@@ -150,7 +150,7 @@ private void compactRegion(final Path tableDir, final TableDescriptor htd,
150
150
private void compactStoreFiles (final Path tableDir , final TableDescriptor htd ,
151
151
final RegionInfo hri , final String familyName , final boolean compactOnce ,
152
152
final boolean major ) throws IOException {
153
- HStore store = getStore (conf , fs , tableDir , htd , hri , familyName , tmpDir );
153
+ HStore store = getStore (conf , fs , tableDir , htd , hri , familyName );
154
154
LOG .info ("Compact table=" + htd .getTableName () +
155
155
" region=" + hri .getRegionNameAsString () +
156
156
" family=" + familyName );
@@ -177,19 +177,10 @@ private void compactStoreFiles(final Path tableDir, final TableDescriptor htd,
177
177
store .close ();
178
178
}
179
179
180
- /**
181
- * Create a "mock" HStore that uses the tmpDir specified by the user and
182
- * the store dir to compact as source.
183
- */
184
180
private static HStore getStore (final Configuration conf , final FileSystem fs ,
185
181
final Path tableDir , final TableDescriptor htd , final RegionInfo hri ,
186
- final String familyName , final Path tempDir ) throws IOException {
187
- HRegionFileSystem regionFs = new HRegionFileSystem (conf , fs , tableDir , hri ) {
188
- @ Override
189
- public Path getTempDir () {
190
- return tempDir ;
191
- }
192
- };
182
+ final String familyName ) throws IOException {
183
+ HRegionFileSystem regionFs = new HRegionFileSystem (conf , fs , tableDir , hri );
193
184
HRegion region = new HRegion (regionFs , null , conf , htd , null );
194
185
return new HStore (region , htd .getColumnFamily (Bytes .toBytes (familyName )), conf , false );
195
186
}
@@ -221,7 +212,7 @@ public void setup(Context context) {
221
212
major = conf .getBoolean (CONF_COMPACT_MAJOR , false );
222
213
223
214
try {
224
- FileSystem fs = FileSystem . get (conf );
215
+ FileSystem fs = CommonFSUtils . getRootDirFileSystem (conf );
225
216
this .compactor = new CompactionWorker (fs , conf );
226
217
} catch (IOException e ) {
227
218
throw new RuntimeException ("Could not get the input FileSystem" , e );
@@ -301,23 +292,19 @@ private static String[] getStoreDirHosts(final FileSystem fs, final Path path)
301
292
* The file is a TextFile with each line corrisponding to a
302
293
* store files directory to compact.
303
294
*/
304
- public static void createInputFile (final FileSystem fs , final Path path ,
305
- final Set <Path > toCompactDirs ) throws IOException {
295
+ public static List < Path > createInputFile (final FileSystem fs , final FileSystem stagingFs ,
296
+ final Path path , final Set <Path > toCompactDirs ) throws IOException {
306
297
// Extract the list of store dirs
307
298
List <Path > storeDirs = new LinkedList <>();
308
299
for (Path compactDir : toCompactDirs ) {
309
300
if (isFamilyDir (fs , compactDir )) {
310
301
storeDirs .add (compactDir );
311
302
} else if (isRegionDir (fs , compactDir )) {
312
- for (Path familyDir : FSUtils .getFamilyDirs (fs , compactDir )) {
313
- storeDirs .add (familyDir );
314
- }
303
+ storeDirs .addAll (FSUtils .getFamilyDirs (fs , compactDir ));
315
304
} else if (isTableDir (fs , compactDir )) {
316
305
// Lookup regions
317
306
for (Path regionDir : FSUtils .getRegionDirs (fs , compactDir )) {
318
- for (Path familyDir : FSUtils .getFamilyDirs (fs , regionDir )) {
319
- storeDirs .add (familyDir );
320
- }
307
+ storeDirs .addAll (FSUtils .getFamilyDirs (fs , regionDir ));
321
308
}
322
309
} else {
323
310
throw new IOException (
@@ -326,7 +313,7 @@ public static void createInputFile(final FileSystem fs, final Path path,
326
313
}
327
314
328
315
// Write Input File
329
- FSDataOutputStream stream = fs .create (path );
316
+ FSDataOutputStream stream = stagingFs .create (path );
330
317
LOG .info ("Create input file=" + path + " with " + storeDirs .size () + " dirs to compact." );
331
318
try {
332
319
final byte [] newLine = Bytes .toBytes ("\n " );
@@ -337,6 +324,7 @@ public static void createInputFile(final FileSystem fs, final Path path,
337
324
} finally {
338
325
stream .close ();
339
326
}
327
+ return storeDirs ;
340
328
}
341
329
}
342
330
@@ -361,15 +349,20 @@ private int doMapReduce(final FileSystem fs, final Set<Path> toCompactDirs,
361
349
// add dependencies (including HBase ones)
362
350
TableMapReduceUtil .addDependencyJars (job );
363
351
364
- Path stagingDir = JobUtil .getStagingDir (conf );
352
+ Path stagingDir = JobUtil .getQualifiedStagingDir (conf );
353
+ FileSystem stagingFs = stagingDir .getFileSystem (conf );
365
354
try {
366
355
// Create input file with the store dirs
367
356
Path inputPath = new Path (stagingDir , "compact-" + EnvironmentEdgeManager .currentTime ());
368
- CompactionInputFormat .createInputFile (fs , inputPath , toCompactDirs );
357
+ List <Path > storeDirs = CompactionInputFormat .createInputFile (fs , stagingFs ,
358
+ inputPath , toCompactDirs );
369
359
CompactionInputFormat .addInputPath (job , inputPath );
370
360
371
361
// Initialize credential for secure cluster
372
362
TableMapReduceUtil .initCredentials (job );
363
+ // Despite the method name this will get delegation token for the filesystem
364
+ TokenCache .obtainTokensForNamenodes (job .getCredentials (),
365
+ storeDirs .toArray (new Path [0 ]), conf );
373
366
374
367
// Start the MR Job and wait
375
368
return job .waitForCompletion (true ) ? 0 : 1 ;
@@ -398,7 +391,7 @@ public int run(String[] args) throws Exception {
398
391
boolean mapred = false ;
399
392
400
393
Configuration conf = getConf ();
401
- FileSystem fs = FileSystem . get (conf );
394
+ FileSystem fs = CommonFSUtils . getRootDirFileSystem (conf );
402
395
403
396
try {
404
397
for (int i = 0 ; i < args .length ; ++i ) {
@@ -458,14 +451,15 @@ private void printUsage(final String message) {
458
451
System .err .println ("Note: -D properties will be applied to the conf used. " );
459
452
System .err .println ("For example: " );
460
453
System .err .println (" To stop delete of compacted file, pass -D" +CONF_DELETE_COMPACTED +"=false" );
461
- System .err .println (" To set tmp dir, pass -D" +CONF_TMP_DIR +"=ALTERNATE_DIR" );
462
454
System .err .println ();
463
455
System .err .println ("Examples:" );
464
456
System .err .println (" To compact the full 'TestTable' using MapReduce:" );
465
- System .err .println (" $ hbase " + this .getClass ().getName () + " -mapred hdfs://hbase/data/default/TestTable" );
457
+ System .err .println (" $ hbase " + this .getClass ().getName () +
458
+ " -mapred hdfs://hbase/data/default/TestTable" );
466
459
System .err .println ();
467
460
System .err .println (" To compact column family 'x' of the table 'TestTable' region 'abc':" );
468
- System .err .println (" $ hbase " + this .getClass ().getName () + " hdfs://hbase/data/default/TestTable/abc/x" );
461
+ System .err .println (" $ hbase " + this .getClass ().getName () +
462
+ " hdfs://hbase/data/default/TestTable/abc/x" );
469
463
}
470
464
471
465
public static void main (String [] args ) throws Exception {
0 commit comments