@@ -242,6 +242,7 @@ static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWrit
242
242
private final Map <byte [], WriterLength > writers = new TreeMap <>(Bytes .BYTES_COMPARATOR );
243
243
private final Map <byte [], byte []> previousRows = new TreeMap <>(Bytes .BYTES_COMPARATOR );
244
244
private final long now = EnvironmentEdgeManager .currentTime ();
245
+ private byte [] tableNameBytes = writeMultipleTables ? null : Bytes .toBytes (writeTableNames );
245
246
246
247
@ Override
247
248
public void write (ImmutableBytesWritable row , V cell ) throws IOException {
@@ -255,25 +256,22 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
255
256
byte [] rowKey = CellUtil .cloneRow (kv );
256
257
int length = (PrivateCellUtil .estimatedSerializedSizeOf (kv )) - Bytes .SIZEOF_INT ;
257
258
byte [] family = CellUtil .cloneFamily (kv );
258
- byte [] tableNameBytes = null ;
259
259
if (writeMultipleTables ) {
260
260
tableNameBytes = MultiTableHFileOutputFormat .getTableName (row .get ());
261
261
tableNameBytes = TableName .valueOf (tableNameBytes ).toBytes ();
262
262
if (!allTableNames .contains (Bytes .toString (tableNameBytes ))) {
263
263
throw new IllegalArgumentException (
264
264
"TableName " + Bytes .toString (tableNameBytes ) + " not expected" );
265
265
}
266
- } else {
267
- tableNameBytes = Bytes .toBytes (writeTableNames );
268
266
}
269
- Path tableRelPath = getTableRelativePath (tableNameBytes );
270
267
byte [] tableAndFamily = getTableNameSuffixedWithFamily (tableNameBytes , family );
271
268
WriterLength wl = this .writers .get (tableAndFamily );
272
269
273
270
// If this is a new column family, verify that the directory exists
274
271
if (wl == null ) {
275
272
Path writerPath = null ;
276
273
if (writeMultipleTables ) {
274
+ Path tableRelPath = getTableRelativePath (tableNameBytes );
277
275
writerPath = new Path (outputDir , new Path (tableRelPath , Bytes .toString (family )));
278
276
} else {
279
277
writerPath = new Path (outputDir , Bytes .toString (family ));
@@ -292,6 +290,7 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
292
290
293
291
// create a new WAL writer, if necessary
294
292
if (wl == null || wl .writer == null ) {
293
+ InetSocketAddress [] favoredNodes = null ;
295
294
if (conf .getBoolean (LOCALITY_SENSITIVE_CONF_KEY , DEFAULT_LOCALITY_SENSITIVE )) {
296
295
HRegionLocation loc = null ;
297
296
@@ -308,26 +307,22 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
308
307
loc = null ;
309
308
}
310
309
}
311
-
312
310
if (null == loc ) {
313
311
LOG .trace ("Failed get of location, use default writer {}" , Bytes .toString (rowKey ));
314
- wl = getNewWriter (tableNameBytes , family , conf , null );
315
312
} else {
316
313
LOG .debug ("First rowkey: [{}]" , Bytes .toString (rowKey ));
317
314
InetSocketAddress initialIsa =
318
315
new InetSocketAddress (loc .getHostname (), loc .getPort ());
319
316
if (initialIsa .isUnresolved ()) {
320
317
LOG .trace ("Failed resolve address {}, use default writer" , loc .getHostnamePort ());
321
- wl = getNewWriter (tableNameBytes , family , conf , null );
322
318
} else {
323
319
LOG .debug ("Use favored nodes writer: {}" , initialIsa .getHostString ());
324
- wl = getNewWriter (tableNameBytes , family , conf ,
325
- new InetSocketAddress [] { initialIsa });
320
+ favoredNodes = new InetSocketAddress [] { initialIsa };
326
321
}
327
322
}
328
- } else {
329
- wl = getNewWriter (tableNameBytes , family , conf , null );
330
323
}
324
+ wl = getNewWriter (tableNameBytes , family , conf , favoredNodes );
325
+
331
326
}
332
327
333
328
// we now have the proper WAL writer. full steam ahead
@@ -342,9 +337,9 @@ public void write(ImmutableBytesWritable row, V cell) throws IOException {
342
337
private Path getTableRelativePath (byte [] tableNameBytes ) {
343
338
String tableName = Bytes .toString (tableNameBytes );
344
339
String [] tableNameParts = tableName .split (":" );
345
- Path tableRelPath = new Path (tableName . split ( ":" ) [0 ]);
340
+ Path tableRelPath = new Path (tableNameParts [0 ]);
346
341
if (tableNameParts .length > 1 ) {
347
- tableRelPath = new Path (tableRelPath , tableName . split ( ":" ) [1 ]);
342
+ tableRelPath = new Path (tableRelPath , tableNameParts [1 ]);
348
343
}
349
344
return tableRelPath ;
350
345
}
0 commit comments