@@ -94,18 +94,20 @@ public class HFileReplicator implements Closeable {
94
94
private int maxCopyThreads ;
95
95
private int copiesPerThread ;
96
96
private List <String > sourceClusterIds ;
97
+ private ReplicationSinkTranslator translator ;
97
98
98
99
public HFileReplicator (Configuration sourceClusterConf , String sourceBaseNamespaceDirPath ,
99
100
String sourceHFileArchiveDirPath , Map <String , List <Pair <byte [], List <String >>>> tableQueueMap ,
100
- Configuration conf , AsyncClusterConnection connection , List <String > sourceClusterIds )
101
- throws IOException {
101
+ Configuration conf , AsyncClusterConnection connection , List <String > sourceClusterIds ,
102
+ ReplicationSinkTranslator translator ) throws IOException {
102
103
this .sourceClusterConf = sourceClusterConf ;
103
104
this .sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath ;
104
105
this .sourceHFileArchiveDirPath = sourceHFileArchiveDirPath ;
105
106
this .bulkLoadHFileMap = tableQueueMap ;
106
107
this .conf = conf ;
107
108
this .connection = connection ;
108
109
this .sourceClusterIds = sourceClusterIds ;
110
+ this .translator = translator ;
109
111
110
112
userProvider = UserProvider .instantiate (conf );
111
113
fsDelegationToken = new FsDelegationToken (userProvider , "renewer" );
@@ -131,29 +133,30 @@ public void close() throws IOException {
131
133
132
134
public Void replicate () throws IOException {
133
135
// Copy all the hfiles to the local file system
134
- Map <String , Path > tableStagingDirsMap = copyHFilesToStagingDir ();
136
+ Map <String , Path > tableToSinkStagingDir = copySourceHFilesToSinkStagingDir ();
135
137
136
138
int maxRetries = conf .getInt (HConstants .BULKLOAD_MAX_RETRIES_NUMBER , 10 );
137
139
138
- for (Entry <String , Path > tableStagingDir : tableStagingDirsMap .entrySet ()) {
139
- String tableNameString = tableStagingDir .getKey ();
140
- Path stagingDir = tableStagingDir .getValue ();
141
- TableName tableName = TableName .valueOf (tableNameString );
140
+ for (Entry <String , Path > tableStagingDir : tableToSinkStagingDir .entrySet ()) {
141
+ String tableNameStr = tableStagingDir .getKey ();
142
+ TableName tableName = TableName .valueOf (tableNameStr );
143
+ TableName sinkTableName = translator .getSinkTableName (tableName );
144
+ Path sinkStagingDir = tableStagingDir .getValue ();
142
145
143
146
// Prepare collection of queue of hfiles to be loaded(replicated)
144
147
Deque <LoadQueueItem > queue = new LinkedList <>();
145
- BulkLoadHFilesTool .prepareHFileQueue (conf , connection , tableName , stagingDir , queue , false ,
146
- false );
148
+ BulkLoadHFilesTool .prepareHFileQueue (conf , connection , sinkTableName , sinkStagingDir , queue ,
149
+ false , false );
147
150
148
151
if (queue .isEmpty ()) {
149
- LOG .warn ("Did not find any files to replicate in directory {}" , stagingDir .toUri ());
152
+ LOG .warn ("Did not find any files to replicate in directory {}" , sinkStagingDir .toUri ());
150
153
return null ;
151
154
}
152
155
fsDelegationToken .acquireDelegationToken (sinkFs );
153
156
try {
154
- doBulkLoad (conf , tableName , stagingDir , queue , maxRetries );
157
+ doBulkLoad (conf , sinkTableName , sinkStagingDir , queue , maxRetries );
155
158
} finally {
156
- cleanup (stagingDir );
159
+ cleanup (sinkStagingDir );
157
160
}
158
161
}
159
162
return null ;
@@ -194,12 +197,12 @@ private void cleanup(Path stagingDir) {
194
197
// Do not close the file system
195
198
}
196
199
197
- private Map <String , Path > copyHFilesToStagingDir () throws IOException {
200
+ private Map <String , Path > copySourceHFilesToSinkStagingDir () throws IOException {
198
201
Map <String , Path > mapOfCopiedHFiles = new HashMap <>();
199
202
Pair <byte [], List <String >> familyHFilePathsPair ;
200
203
List <String > hfilePaths ;
201
204
byte [] family ;
202
- Path familyStagingDir ;
205
+ Path sinkFamilyStagingDir ;
203
206
int familyHFilePathsPairsListSize ;
204
207
int totalNoOfHFiles ;
205
208
List <Pair <byte [], List <String >>> familyHFilePathsPairsList ;
@@ -224,32 +227,33 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
224
227
// For each table name in the map
225
228
for (Entry <String , List <Pair <byte [], List <String >>>> tableEntry : bulkLoadHFileMap
226
229
.entrySet ()) {
227
- String tableName = tableEntry .getKey ();
230
+ String tableNameStr = tableEntry .getKey ();
231
+ TableName tableName = TableName .valueOf (tableNameStr );
228
232
229
233
// Create staging directory for each table
230
- Path stagingDir = createStagingDir (hbaseStagingDir , user , TableName . valueOf ( tableName ) );
234
+ Path sinkStagingDir = createSinkStagingDir (hbaseStagingDir , user , tableName );
231
235
232
236
familyHFilePathsPairsList = tableEntry .getValue ();
233
237
familyHFilePathsPairsListSize = familyHFilePathsPairsList .size ();
234
238
235
- // For each list of family hfile paths pair in the table
239
+ // For each ( family, hfile paths) pair in the table
236
240
for (int i = 0 ; i < familyHFilePathsPairsListSize ; i ++) {
237
241
familyHFilePathsPair = familyHFilePathsPairsList .get (i );
238
242
239
243
family = familyHFilePathsPair .getFirst ();
240
244
hfilePaths = familyHFilePathsPair .getSecond ();
241
245
242
- familyStagingDir = new Path ( stagingDir , Bytes . toString ( family ) );
246
+ sinkFamilyStagingDir = getSinkFamilyStagingDir ( sinkStagingDir , tableName , family );
243
247
totalNoOfHFiles = hfilePaths .size ();
244
248
245
- // For each list of hfile paths for the family
249
+ // For each hfile path in the family
246
250
List <Future <Void >> futures = new ArrayList <>();
247
251
Callable <Void > c ;
248
252
Future <Void > future ;
249
253
int currentCopied = 0 ;
250
- // Copy the hfiles parallely
254
+ // Copy the hfiles in parallel
251
255
while (totalNoOfHFiles > currentCopied + this .copiesPerThread ) {
252
- c = new Copier (sourceFs , familyStagingDir ,
256
+ c = new Copier (sourceFs , sinkFamilyStagingDir ,
253
257
hfilePaths .subList (currentCopied , currentCopied + this .copiesPerThread ));
254
258
future = exec .submit (c );
255
259
futures .add (future );
@@ -258,7 +262,7 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
258
262
259
263
int remaining = totalNoOfHFiles - currentCopied ;
260
264
if (remaining > 0 ) {
261
- c = new Copier (sourceFs , familyStagingDir ,
265
+ c = new Copier (sourceFs , sinkFamilyStagingDir ,
262
266
hfilePaths .subList (currentCopied , currentCopied + remaining ));
263
267
future = exec .submit (c );
264
268
futures .add (future );
@@ -281,7 +285,7 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
281
285
}
282
286
// Add the staging directory to this table. Staging directory contains all the hfiles
283
287
// belonging to this table
284
- mapOfCopiedHFiles .put (tableName , stagingDir );
288
+ mapOfCopiedHFiles .put (tableNameStr , sinkStagingDir );
285
289
}
286
290
return mapOfCopiedHFiles ;
287
291
} finally {
@@ -294,12 +298,14 @@ private Map<String, Path> copyHFilesToStagingDir() throws IOException {
294
298
}
295
299
}
296
300
297
- private Path createStagingDir (Path baseDir , User user , TableName tableName ) throws IOException {
298
- String tblName = tableName .getNameAsString ().replace (":" , UNDERSCORE );
301
/**
 * Creates a uniquely named staging directory for one table on the sink side.
 * <p>
 * The source table name is first mapped to its sink table name through
 * {@code translator.getSinkTableName}. The directory name is then assembled as
 * {@code <user short name> + UNDERSCORE+UNDERSCORE + <sink table name with ':' replaced by
 * UNDERSCORE> + UNDERSCORE+UNDERSCORE + <random suffix>} and handed to
 * {@code createStagingDir(Path, User, String)}, which returns the resulting path.
 * @param baseDir   parent directory passed through to {@code createStagingDir}
 * @param user      user whose short name prefixes the directory name
 * @param tableName source-side table name; translated to the sink table name before use
 * @return the path returned by {@code createStagingDir(baseDir, user, randomDir)}
 * @throws IOException declared by this method; the visible body only propagates it from
 *                     {@code createStagingDir}
 */
+ private Path createSinkStagingDir (Path baseDir , User user , TableName tableName )
302
+ throws IOException {
303
+ TableName sinkTableName = translator .getSinkTableName (tableName );
304
// ':' is replaced by UNDERSCORE so the table name can be embedded in a single path component.
+ String sinkTableNameStr = sinkTableName .getNameAsString ().replace (":" , UNDERSCORE );
299
305
// Number of random bits used for the directory-name suffix.
int RANDOM_WIDTH = 320 ;
300
306
// Radix in which the random suffix is rendered as text.
int RANDOM_RADIX = 32 ;
301
307
String doubleUnderScore = UNDERSCORE + UNDERSCORE ;
302
- String randomDir = user .getShortName () + doubleUnderScore + tblName + doubleUnderScore
308
+ String randomDir = user .getShortName () + doubleUnderScore + sinkTableNameStr + doubleUnderScore
303
309
+ (new BigInteger (RANDOM_WIDTH , ThreadLocalRandom .current ()).toString (RANDOM_RADIX ));
304
310
return createStagingDir (baseDir , user , randomDir );
305
311
}
@@ -311,50 +317,55 @@ private Path createStagingDir(Path baseDir, User user, String randomDir) throws
311
317
return p ;
312
318
}
313
319
320
/**
 * Resolves the per-family sub-directory inside a table's staging directory.
 * <p>
 * The source column family is mapped to its sink column family through
 * {@code translator.getSinkFamily(tableName, family)}; the returned path is {@code baseDir} with
 * the sink family (converted by {@code Bytes.toString}) as its child component.
 * @param baseDir   staging directory that will contain the family directory
 * @param tableName table name passed to the translator together with {@code family}
 * @param family    source-side column family name
 * @return {@code baseDir/<sink family>}; this method only builds the {@code Path} object
 */
+ private Path getSinkFamilyStagingDir (Path baseDir , TableName tableName , byte [] family ) {
321
+ byte [] sinkFamily = translator .getSinkFamily (tableName , family );
322
+ return new Path (baseDir , Bytes .toString (sinkFamily ));
323
+ }
324
+
314
325
/**
315
326
* This class will copy the given hfiles from the given source file system to the given local file
316
327
* system staging directory.
317
328
*/
318
329
private class Copier implements Callable <Void > {
319
330
private FileSystem sourceFs ;
320
- private Path stagingDir ;
321
- private List <String > hfiles ;
331
+ private Path sinkStagingDir ;
332
+ private List <String > hfilePaths ;
322
333
323
- public Copier (FileSystem sourceFs , final Path stagingDir , final List <String > hfiles )
334
/**
 * Captures the inputs for one copy task; no I/O happens here.
 * @param sourceFs       file system the hfiles are read from
 * @param sinkStagingDir directory on the sink file system the hfiles are copied into
 * @param hfilePaths     hfile paths to copy; {@code call()} resolves each one against the
 *                       enclosing replicator's source base and archive directories
 * @throws IOException declared in the signature; nothing in the visible body throws it
 */
+ public Copier (FileSystem sourceFs , final Path sinkStagingDir , final List <String > hfilePaths )
324
335
throws IOException {
325
336
this .sourceFs = sourceFs ;
326
- this .stagingDir = stagingDir ;
327
- this .hfiles = hfiles ;
337
+ this .sinkStagingDir = sinkStagingDir ;
338
+ this .hfilePaths = hfilePaths ;
328
339
}
329
340
330
341
/**
 * Copies every path in {@code hfilePaths} into {@code sinkStagingDir} on {@code sinkFs}.
 * <p>
 * For each entry the source path is first resolved against {@code sourceBaseNamespaceDirPath}.
 * If that copy throws {@link FileNotFoundException}, the same entry is retried from
 * {@code sourceHFileArchiveDirPath}. If the retry also throws {@link FileNotFoundException},
 * the file is logged at debug level and skipped. After a successful copy the sink file's
 * permission is set to {@code PERM_ALL_ACCESS}.
 * @return always {@code null}
 * @throws IOException any I/O failure other than the two handled
 *                     {@link FileNotFoundException} cases
 */
@ Override
331
342
public Void call () throws IOException {
332
343
Path sourceHFilePath ;
333
- Path localHFilePath ;
334
- int totalHFiles = hfiles .size ();
344
+ Path sinkHFilePath ;
345
+ int totalHFiles = hfilePaths .size ();
335
346
for (int i = 0 ; i < totalHFiles ; i ++) {
336
- sourceHFilePath = new Path (sourceBaseNamespaceDirPath , hfiles .get (i ));
337
- localHFilePath = new Path (stagingDir , sourceHFilePath .getName ());
347
+ sourceHFilePath = new Path (sourceBaseNamespaceDirPath , hfilePaths .get (i ));
// The sink file keeps the source file's name; only the parent directory changes.
348
+ sinkHFilePath = new Path (sinkStagingDir , sourceHFilePath .getName ());
338
349
try {
339
- FileUtil .copy (sourceFs , sourceHFilePath , sinkFs , localHFilePath , false , conf );
350
+ FileUtil .copy (sourceFs , sourceHFilePath , sinkFs , sinkHFilePath , false , conf );
340
351
// Any exception other than FNFE propagates and fails the replication request; the
341
352
// source will then retry replicating this data.
342
353
} catch (FileNotFoundException e ) {
343
- LOG .info ("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
354
+ LOG .info ("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath
344
355
+ ". Trying to copy from hfile archive directory." , e );
345
- sourceHFilePath = new Path (sourceHFileArchiveDirPath , hfiles .get (i ));
356
+ sourceHFilePath = new Path (sourceHFileArchiveDirPath , hfilePaths .get (i ));
346
357
347
358
try {
348
- FileUtil .copy (sourceFs , sourceHFilePath , sinkFs , localHFilePath , false , conf );
359
+ FileUtil .copy (sourceFs , sourceHFilePath , sinkFs , sinkHFilePath , false , conf );
349
360
} catch (FileNotFoundException e1 ) {
350
361
// The hfile was found in neither the base nor the archive directory of the source
351
362
// cluster FS, so there is nothing more to do here: just log and continue.
352
- LOG .debug ("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
363
+ LOG .debug ("Failed to copy hfile from " + sourceHFilePath + " to " + sinkHFilePath
353
364
+ ". Hence ignoring this hfile from replication.." , e1 );
354
365
continue ;
355
366
}
356
367
}
// Reached only when one of the two copy attempts succeeded (the skip path uses continue).
357
- sinkFs .setPermission (localHFilePath , PERM_ALL_ACCESS );
368
+ sinkFs .setPermission (sinkHFilePath , PERM_ALL_ACCESS );
358
369
}
359
370
return null ;
360
371
}
0 commit comments