Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@
public class ReplicationLoadSink {
private final long ageOfLastAppliedOp;
private final long timestampsOfLastAppliedOp;
private final long timestampStarted;
private final long totalOpsProcessed;

// TODO: add the builder for this class
@InterfaceAudience.Private
public ReplicationLoadSink(long age, long timestamp) {
public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
long totalOpsProcessed) {
this.ageOfLastAppliedOp = age;
this.timestampsOfLastAppliedOp = timestamp;
this.timestampStarted = timestampStarted;
this.totalOpsProcessed = totalOpsProcessed;
}

public long getAgeOfLastAppliedOp() {
Expand All @@ -34,4 +39,12 @@ public long getAgeOfLastAppliedOp() {
public long getTimestampsOfLastAppliedOp() {
return this.timestampsOfLastAppliedOp;
}

public long getTimestampStarted() {
return timestampStarted;
}

public long getTotalOpsProcessed() {
return totalOpsProcessed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2843,7 +2843,10 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu

public static ReplicationLoadSink toReplicationLoadSink(
ClusterStatusProtos.ReplicationLoadSink rls) {
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
rls.getTimeStampsOfLastAppliedOp(),
rls.getTimestampStarted(),
rls.getTotalOpsProcessed());
}

public static ReplicationLoadSource toReplicationLoadSource(
Expand Down Expand Up @@ -3438,6 +3441,8 @@ public static ClusterStatusProtos.ReplicationLoadSink toReplicationLoadSink(
return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
.setTimestampStarted(rls.getTimestampStarted())
.setTotalOpsProcessed(rls.getTotalOpsProcessed())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
void incrAppliedOps(long batchsize);
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
long getSinkAppliedOps();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ public long getLastAppliedOpAge() {
public void incrAppliedHFiles(long hfiles) {
hfilesCounter.incr(hfiles);
}

@Override public long getSinkAppliedOps() {
return opsCounter.value();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ message ClientMetrics {
message ReplicationLoadSink {
required uint64 ageOfLastAppliedOp = 1;
required uint64 timeStampsOfLastAppliedOp = 2;
required uint64 timestampStarted = 3;
required uint64 totalOpsProcessed = 4;
}

message ReplicationLoadSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class MetricsSink {

private long lastTimestampForAge = System.currentTimeMillis();
private long startTimestamp = System.currentTimeMillis();
private final MetricsReplicationSinkSource mss;

public MetricsSink() {
Expand Down Expand Up @@ -98,4 +99,21 @@ public long getAgeOfLastAppliedOp() {
public long getTimestampOfLastAppliedOp() {
return this.lastTimestampForAge;
}

/**
* Gets the time stamp from when the Sink was initialized.
* @return startTimestamp
*/
public long getStartTimestamp() {
return this.startTimestamp;
}

/**
* Gets the total number of OPs delivered to this sink.
* @return totalAplliedOps
*/
public long getAppliedOps() {
return this.mss.getSinkAppliedOps();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
this.replicationLoadSink = rLoadSinkBuild.build();

this.replicationLoadSourceEntries = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationStatus.class);

private void insertRowsOnSource() throws IOException {
final byte[] qualName = Bytes.toBytes("q");
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
}

/**
* Test for HBASE-9531.
* <p/>
Expand All @@ -70,12 +79,7 @@ public void testReplicationStatus() throws Exception {
Admin hbaseAdmin = UTIL1.getAdmin();
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
hbaseAdmin.disableReplicationPeer(PEER_ID2);
final byte[] qualName = Bytes.toBytes("q");
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
htable1.put(p);
}
insertRowsOnSource();
LOG.info("AFTER PUTS");
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
// change but sleeping in main thread seems to mess up background replication.
Expand Down Expand Up @@ -120,6 +124,35 @@ public void testReplicationStatus() throws Exception {
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
}

@Test
public void testReplicationStatusSink() throws Exception {
try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
//First checks if status of timestamp of last applied op is same as RS start, since no edits
//were replicated yet
assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
//now insert some rows on source, so that it gets delivered to target
insertRowsOnSource();
long wait = Waiter.waitFor(UTIL2.getConfiguration(),
10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
}
});
//If wait is -1, we know predicate condition was never true
assertTrue(wait>=0);
}
}

private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
throws IOException {
ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
return sm.getReplicationLoadSink();
}
/**
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
* greater than <code>greaterThan</code> for <code>serverName</code> before
Expand Down
16 changes: 11 additions & 5 deletions hbase-shell/src/main/ruby/hbase/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -835,12 +835,18 @@ def status(format, type)
r_source_string = ' SOURCE:'
r_load_sink = sl.getReplicationLoadSink
next if r_load_sink.nil?
if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
# If we have applied no operations since we've started replication,
# assume that we're not acting as a sink and don't print the normal information
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", Waiting for OPs... "
else
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
r_sink_string << ", TimeStampsOfLastAppliedOp=" +
(java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
end

r_sink_string << ' AgeOfLastAppliedOp=' +
r_load_sink.getAgeOfLastAppliedOp.to_s
r_sink_string << ', TimeStampsOfLastAppliedOp=' +
java.util.Date.new(r_load_sink
.getTimestampsOfLastAppliedOp).toString
r_load_source_map = sl.getReplicationLoadSourceMap
build_source_string(r_load_source_map, r_source_string)
puts(format(' %<host>s:', host: server_name.getHostname))
Expand Down