|
66 | 66 | import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
67 | 67 | import org.apache.hadoop.hbase.master.SplitLogManager;
|
68 | 68 | import org.apache.hadoop.hbase.regionserver.HRegion;
|
| 69 | +import org.apache.hadoop.hbase.regionserver.LastSequenceId; |
| 70 | +import org.apache.hadoop.hbase.regionserver.RegionServerServices; |
69 | 71 | import org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader;
|
70 | 72 | import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufWALStreamReader;
|
71 | 73 | import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
|
|
105 | 107 | import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
106 | 108 |
|
107 | 109 | import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
| 110 | +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos; |
108 | 111 | import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
109 | 112 |
|
110 | 113 | /**
|
@@ -372,6 +375,69 @@ private void loop(final Writer writer) {
|
372 | 375 | }
|
373 | 376 | }
|
374 | 377 |
|
| 378 | + //If another worker is assigned to split a WAl and last worker is still running, both should not impact each other's progress |
| 379 | + @Test |
| 380 | + public void testTwoWorkerSplittingSameWAL() throws IOException, InterruptedException { |
| 381 | + int numWriter = 1, entries = 10; |
| 382 | + generateWALs(numWriter, entries, -1, 0); |
| 383 | + FileStatus logfile = fs.listStatus(WALDIR)[0]; |
| 384 | + FileSystem spiedFs = Mockito.spy(fs); |
| 385 | + RegionServerServices zombieRSServices = Mockito.mock(RegionServerServices.class); |
| 386 | + RegionServerServices newWorkerRSServices = Mockito.mock(RegionServerServices.class); |
| 387 | + Mockito.when(zombieRSServices.getServerName()) |
| 388 | + .thenReturn(ServerName.valueOf("zombie-rs.abc.com,1234,1234567890")); |
| 389 | + Mockito.when(newWorkerRSServices.getServerName()) |
| 390 | + .thenReturn(ServerName.valueOf("worker-rs.abc.com,1234,1234569870")); |
| 391 | + Thread zombieWorker = new SplitWALWorker(logfile, spiedFs, zombieRSServices); |
| 392 | + Thread newWorker = new SplitWALWorker(logfile, spiedFs, newWorkerRSServices); |
| 393 | + zombieWorker.start(); |
| 394 | + newWorker.start(); |
| 395 | + newWorker.join(); |
| 396 | + zombieWorker.join(); |
| 397 | + |
| 398 | + for (String region : REGIONS) { |
| 399 | + Path[] logfiles = getLogForRegion(TABLE_NAME, region); |
| 400 | + assertEquals("wrong number of split files for region", numWriter, logfiles.length); |
| 401 | + |
| 402 | + int count = 0; |
| 403 | + for (Path lf : logfiles) { |
| 404 | + count += countWAL(lf); |
| 405 | + } |
| 406 | + assertEquals("wrong number of edits for region " + region, entries, count); |
| 407 | + } |
| 408 | + } |
| 409 | + |
| 410 | + private class SplitWALWorker extends Thread implements LastSequenceId { |
| 411 | + final FileStatus logfile; |
| 412 | + final FileSystem fs; |
| 413 | + final RegionServerServices rsServices; |
| 414 | + |
| 415 | + public SplitWALWorker(FileStatus logfile, FileSystem fs, RegionServerServices rsServices) { |
| 416 | + super(rsServices.getServerName().toShortString()); |
| 417 | + setDaemon(true); |
| 418 | + this.fs = fs; |
| 419 | + this.logfile = logfile; |
| 420 | + this.rsServices = rsServices; |
| 421 | + } |
| 422 | + |
| 423 | + @Override |
| 424 | + public void run() { |
| 425 | + try { |
| 426 | + boolean ret = |
| 427 | + WALSplitter.splitLogFile(HBASEDIR, logfile, fs, conf, null, this, null, wals, rsServices); |
| 428 | + assertTrue("Both splitting should pass", ret); |
| 429 | + } catch (IOException e) { |
| 430 | + LOG.warn(getName() + " Worker exiting " + e); |
| 431 | + } |
| 432 | + } |
| 433 | + |
| 434 | + @Override |
| 435 | + public ClusterStatusProtos.RegionStoreSequenceIds getLastSequenceId(byte[] encodedRegionName) { |
| 436 | + return ClusterStatusProtos.RegionStoreSequenceIds.newBuilder() |
| 437 | + .setLastFlushedSequenceId(HConstants.NO_SEQNUM).build(); |
| 438 | + } |
| 439 | + } |
| 440 | + |
375 | 441 | /**
|
376 | 442 | * @see "https://issues.apache.org/jira/browse/HBASE-3020"
|
377 | 443 | */
|
@@ -403,7 +469,7 @@ public void testOldRecoveredEditsFileSidelined() throws IOException {
|
403 | 469 | private Path createRecoveredEditsPathForRegion() throws IOException {
|
404 | 470 | byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
|
405 | 471 | Path p = WALSplitUtil.getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1,
|
406 |
| - FILENAME_BEING_SPLIT, TMPDIRNAME, conf, 0L); |
| 472 | + FILENAME_BEING_SPLIT, TMPDIRNAME, conf, ""); |
407 | 473 | return p;
|
408 | 474 | }
|
409 | 475 |
|
|
0 commit comments