|
35 | 35 | import org.apache.hadoop.hbase.TableName; |
36 | 36 | import org.apache.hadoop.hbase.client.Connection; |
37 | 37 | import org.apache.hadoop.hbase.client.ConnectionFactory; |
38 | | -import org.apache.hadoop.hbase.client.Get; |
39 | 38 | import org.apache.hadoop.hbase.client.Put; |
40 | 39 | import org.apache.hadoop.hbase.client.Result; |
41 | 40 | import org.apache.hadoop.hbase.client.ResultScanner; |
|
61 | 60 | import org.apache.hadoop.hbase.util.Bytes; |
62 | 61 | import org.apache.hadoop.hbase.util.CommonFSUtils; |
63 | 62 | import org.apache.hadoop.hbase.util.Pair; |
64 | | -import org.apache.hadoop.hbase.util.Threads; |
65 | 63 | import org.apache.hadoop.hbase.zookeeper.ZKConfig; |
66 | 64 | import org.apache.hadoop.hbase.zookeeper.ZKWatcher; |
67 | 65 | import org.apache.hadoop.mapreduce.InputSplit; |
@@ -293,14 +291,15 @@ private void logFailRowAndIncreaseCounter(Context context, Counters counter, Res |
293 | 291 | return; |
294 | 292 | } |
295 | 293 |
|
| 294 | + VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, |
| 295 | + row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, |
| 296 | + reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); |
| 297 | + |
296 | 298 | if (reCompareExecutor == null) { |
297 | | - syncLogFailRowAndIncreaseCounter(context, counter, rowKey); |
| 299 | + runnable.run(); |
298 | 300 | return; |
299 | 301 | } |
300 | 302 |
|
301 | | - VerifyReplicationRecompareRunnable runnable = new VerifyReplicationRecompareRunnable(context, |
302 | | - row, replicatedRow, counter, delimiter, tableScan, sourceTable, replicatedTable, |
303 | | - reCompareTries, sleepMsBeforeReCompare, reCompareBackoffExponent, verbose); |
304 | 303 | reCompareExecutor.submit(runnable); |
305 | 304 | } |
306 | 305 |
|
@@ -375,38 +374,6 @@ protected void cleanup(Context context) { |
375 | 374 | } |
376 | 375 | } |
377 | 376 | } |
378 | | - |
379 | | - private void syncLogFailRowAndIncreaseCounter(Mapper.Context context, Counters counter, |
380 | | - byte[] row) { |
381 | | - int sleepMs = sleepMsBeforeReCompare; |
382 | | - int tries = 0; |
383 | | - |
384 | | - while (++tries <= reCompareTries) { |
385 | | - context.getCounter(VerifyReplication.Verifier.Counters.RECOMPARES).increment(1); |
386 | | - Threads.sleep(sleepMs); |
387 | | - try { |
388 | | - Result sourceResult = sourceTable.get(new Get(row)); |
389 | | - Result replicatedResult = replicatedTable.get(new Get(row)); |
390 | | - Result.compareResults(sourceResult, replicatedResult, false); |
391 | | - if (!sourceResult.isEmpty()) { |
392 | | - context.getCounter(Counters.GOODROWS).increment(1); |
393 | | - if (verbose) { |
394 | | - LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row) |
395 | | - + delimiter); |
396 | | - } |
397 | | - } |
398 | | - return; |
399 | | - } catch (Exception e) { |
400 | | - context.getCounter(Counters.FAILED_RECOMPARE).increment(1); |
401 | | - LOG.error("recompare fail after sleep, rowkey=" + delimiter + Bytes.toStringBinary(row) |
402 | | - + delimiter); |
403 | | - } |
404 | | - sleepMs = sleepMs * (2 ^ reCompareBackoffExponent); |
405 | | - } |
406 | | - context.getCounter(counter).increment(1); |
407 | | - context.getCounter(Counters.BADROWS).increment(1); |
408 | | - LOG.error("{}, rowkey={}{}{}", counter, delimiter, Bytes.toStringBinary(row), delimiter); |
409 | | - } |
410 | 377 | } |
411 | 378 |
|
412 | 379 | private static Pair<ReplicationPeerConfig, Configuration> |
|
0 commit comments