Skip to content

Commit c146ded

Browse files
committed
HBASE-29359 VerifyReplication needs to join on its verification tasks
1 parent c4d8b00 commit c146ded

File tree

2 files changed

+33
-40
lines changed

2 files changed

+33
-40
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ public enum Counters {
156156
private int sleepMsBeforeReCompare;
157157
private String delimiter = "";
158158
private boolean verbose = false;
159-
private int batch = -1;
160159

161160
/**
162161
* Map method that compares every scanned row with the equivalent from a distant cluster.
@@ -178,7 +177,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
178177
}
179178
delimiter = conf.get(NAME + ".delimiter", "");
180179
verbose = conf.getBoolean(NAME + ".verbose", false);
181-
batch = conf.getInt(NAME + ".batch", -1);
180+
int batch = conf.getInt(NAME + ".batch", -1);
182181
final Scan scan = new Scan();
183182
if (batch > 0) {
184183
scan.setBatch(batch);
@@ -200,7 +199,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
200199
setRowPrefixFilter(scan, rowPrefixes);
201200
scan.setTimeRange(startTime, endTime);
202201
int versions = conf.getInt(NAME + ".versions", -1);
203-
LOG.info("Setting number of version inside map as: " + versions);
202+
LOG.info("Setting number of version inside map as: {}", versions);
204203
if (versions >= 0) {
205204
scan.readVersions(versions);
206205
}
@@ -245,9 +244,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
245244
String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
246245
FileSystem.setDefaultUri(peerConf, peerFSAddress);
247246
CommonFSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
248-
LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
249-
+ peerSnapshotTmpDir + " peer root uri:" + CommonFSUtils.getRootDir(peerConf)
250-
+ " peerFSAddress:" + peerFSAddress);
247+
LOG.info("Using peer snapshot:{} with temp dir:{} peer root uri:{} peerFSAddress:{}", peerSnapshotName, peerSnapshotTmpDir, CommonFSUtils.getRootDir(peerConf), peerFSAddress);
251248

252249
replicatedScanner = new TableSnapshotScanner(peerConf, CommonFSUtils.getRootDir(peerConf),
253250
new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan, true);
@@ -270,7 +267,7 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
270267
context.getCounter(Counters.GOODROWS).increment(1);
271268
if (verbose) {
272269
LOG.info(
273-
"Good row key: " + delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
270+
"Good row key: {}", delimiter + Bytes.toStringBinary(value.getRow()) + delimiter);
274271
}
275272
} catch (Exception e) {
276273
logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value,
@@ -291,7 +288,6 @@ public void map(ImmutableBytesWritable row, final Result value, Context context)
291288
}
292289
}
293290

294-
@SuppressWarnings("FutureReturnValueIgnored")
295291
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row,
296292
Result replicatedRow) {
297293
byte[] rowKey = getRow(row, replicatedRow);
@@ -311,7 +307,11 @@ private void logFailRowAndIncreaseCounter(Context context, Counters counter, Res
311307
return;
312308
}
313309

314-
reCompareExecutor.submit(runnable);
310+
try {
311+
reCompareExecutor.submit(runnable).get();
312+
} catch (Exception e) {
313+
throw new RuntimeException(e);
314+
}
315315
}
316316

317317
@Override
@@ -389,18 +389,14 @@ protected void cleanup(Context context) {
389389

390390
private static Pair<ReplicationPeerConfig, Configuration>
391391
getPeerQuorumConfig(final Configuration conf, String peerId) throws IOException {
392-
ZKWatcher localZKW = null;
393-
try {
394-
localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
395-
@Override
396-
public void abort(String why, Throwable e) {
397-
}
392+
try (ZKWatcher localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
393+
@Override public void abort(String why, Throwable e) {
394+
}
398395

399-
@Override
400-
public boolean isAborted() {
401-
return false;
402-
}
403-
});
396+
@Override public boolean isAborted() {
397+
return false;
398+
}
399+
})) {
404400
ReplicationPeerStorage storage =
405401
ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), localZKW, conf);
406402
ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
@@ -409,10 +405,6 @@ public boolean isAborted() {
409405
} catch (ReplicationException e) {
410406
throw new IOException("An error occurred while trying to connect to the remote peer cluster",
411407
e);
412-
} finally {
413-
if (localZKW != null) {
414-
localZKW.close();
415-
}
416408
}
417409
}
418410

@@ -471,25 +463,25 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
471463
peerConfigPair = getPeerQuorumConfig(conf, peerId);
472464
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
473465
peerQuorumAddress = peerConfig.getClusterKey();
474-
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: "
475-
+ peerConfig.getConfiguration());
466+
LOG.info("Peer Quorum Address: {}, Peer Configuration: {}", peerQuorumAddress,
467+
peerConfig.getConfiguration());
476468
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
477469
HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
478470
peerConfig.getConfiguration().entrySet());
479471
} else {
480472
assert this.peerQuorumAddress != null;
481473
peerQuorumAddress = this.peerQuorumAddress;
482-
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
474+
LOG.info("Peer Quorum Address: {}", peerQuorumAddress);
483475
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
484476
}
485477

486478
if (peerTableName != null) {
487-
LOG.info("Peer Table Name: " + peerTableName);
479+
LOG.info("Peer Table Name: {}", peerTableName);
488480
conf.set(NAME + ".peerTableName", peerTableName);
489481
}
490482

491483
conf.setInt(NAME + ".versions", versions);
492-
LOG.info("Number of version: " + versions);
484+
LOG.info("Number of version: {}", versions);
493485

494486
conf.setInt(NAME + ".recompareTries", reCompareTries);
495487
conf.setInt(NAME + ".recompareBackoffExponent", reCompareBackoffExponent);
@@ -524,7 +516,7 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
524516
}
525517
if (versions >= 0) {
526518
scan.readVersions(versions);
527-
LOG.info("Number of versions set to " + versions);
519+
LOG.info("Number of versions set to {}", versions);
528520
}
529521
if (families != null) {
530522
String[] fams = families.split(",");
@@ -537,8 +529,8 @@ public Job createSubmittableJob(Configuration conf, String[] args) throws IOExce
537529

538530
if (sourceSnapshotName != null) {
539531
Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
540-
LOG.info(
541-
"Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
532+
LOG.info("Using source snapshot-{} with temp dir:{}", sourceSnapshotName,
533+
sourceSnapshotTmpDir);
542534
TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
543535
null, job, true, snapshotTempPath);
544536
restoreSnapshotForPeerCluster(conf, peerQuorumAddress);
@@ -819,7 +811,7 @@ private boolean isPeerQuorumAddress(String cmd) {
819811
* @param errorMsg Error message. Can be null.
820812
*/
821813
private static void printUsage(final String errorMsg) {
822-
if (errorMsg != null && errorMsg.length() > 0) {
814+
if (errorMsg != null && !errorMsg.isEmpty()) {
823815
System.err.println("ERROR: " + errorMsg);
824816
}
825817
System.err.println("Usage: verifyrep [--starttime=X]"
@@ -914,7 +906,7 @@ private static void printUsage(final String errorMsg) {
914906
+ "2181:/cluster-b \\\n" + " TestTable");
915907
}
916908

917-
private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.Context context) {
909+
private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper<?, ?, ?, ?>.Context context) {
918910
if (maxThreads == 0) {
919911
return null;
920912
}
@@ -923,7 +915,7 @@ private static ThreadPoolExecutor buildReCompareExecutor(int maxThreads, Mapper.
923915
buildRejectedReComparePolicy(context));
924916
}
925917

926-
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper.Context context) {
918+
private static CallerRunsPolicy buildRejectedReComparePolicy(Mapper<?, ?, ?, ?>.Context context) {
927919
return new CallerRunsPolicy() {
928920
@Override
929921
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
@@ -938,9 +930,10 @@ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor e) {
938930
@Override
939931
public int run(String[] args) throws Exception {
940932
Configuration conf = this.getConf();
941-
Job job = createSubmittableJob(conf, args);
942-
if (job != null) {
943-
return job.waitForCompletion(true) ? 0 : 1;
933+
try (Job job = createSubmittableJob(conf, args)) {
934+
if (job != null) {
935+
return job.waitForCompletion(true) ? 0 : 1;
936+
}
944937
}
945938
return 1;
946939
}

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplicationRecompareRunnable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class VerifyReplicationRecompareRunnable implements Runnable {
3434
private static final Logger LOG =
3535
LoggerFactory.getLogger(VerifyReplicationRecompareRunnable.class);
3636

37-
private final Mapper.Context context;
37+
private final Mapper<?, ?, ?, ?>.Context context;
3838
private final VerifyReplication.Verifier.Counters originalCounter;
3939
private final String delimiter;
4040
private final byte[] row;
@@ -50,7 +50,7 @@ public class VerifyReplicationRecompareRunnable implements Runnable {
5050
private Result sourceResult;
5151
private Result replicatedResult;
5252

53-
public VerifyReplicationRecompareRunnable(Mapper.Context context, Result sourceResult,
53+
public VerifyReplicationRecompareRunnable(Mapper<?, ?, ?, ?>.Context context, Result sourceResult,
5454
Result replicatedResult, VerifyReplication.Verifier.Counters originalCounter, String delimiter,
5555
Scan tableScan, Table sourceTable, Table replicatedTable, int reCompareTries,
5656
int sleepMsBeforeReCompare, int reCompareBackoffExponent, boolean verbose) {

0 commit comments

Comments
 (0)