Skip to content

Commit e5345b3

Browse files
authored
HBASE-21406 "status 'replication'" should not show SINK if the cluste… (apache#1761)
Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com> Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Josh Elser <elserj@apache.org>
1 parent bad2d4e commit e5345b3

File tree

9 files changed

+97
-13
lines changed

9 files changed

+97
-13
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@
1919
public class ReplicationLoadSink {
2020
private final long ageOfLastAppliedOp;
2121
private final long timestampsOfLastAppliedOp;
22+
private final long timestampStarted;
23+
private final long totalOpsProcessed;
2224

2325
// TODO: add the builder for this class
2426
@InterfaceAudience.Private
25-
public ReplicationLoadSink(long age, long timestamp) {
27+
public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
28+
long totalOpsProcessed) {
2629
this.ageOfLastAppliedOp = age;
2730
this.timestampsOfLastAppliedOp = timestamp;
31+
this.timestampStarted = timestampStarted;
32+
this.totalOpsProcessed = totalOpsProcessed;
2833
}
2934

3035
public long getAgeOfLastAppliedOp() {
@@ -34,4 +39,12 @@ public long getAgeOfLastAppliedOp() {
3439
public long getTimestampsOfLastAppliedOp() {
3540
return this.timestampsOfLastAppliedOp;
3641
}
42+
43+
public long getTimestampStarted() {
44+
return timestampStarted;
45+
}
46+
47+
public long getTotalOpsProcessed() {
48+
return totalOpsProcessed;
49+
}
3750
}

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2843,7 +2843,10 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu
28432843

28442844
public static ReplicationLoadSink toReplicationLoadSink(
28452845
ClusterStatusProtos.ReplicationLoadSink rls) {
2846-
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
2846+
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
2847+
rls.getTimeStampsOfLastAppliedOp(),
2848+
rls.getTimestampStarted(),
2849+
rls.getTotalOpsProcessed());
28472850
}
28482851

28492852
public static ReplicationLoadSource toReplicationLoadSource(
@@ -3438,6 +3441,8 @@ public static ClusterStatusProtos.ReplicationLoadSink toReplicationLoadSink(
34383441
return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
34393442
.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
34403443
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
3444+
.setTimestampStarted(rls.getTimestampStarted())
3445+
.setTotalOpsProcessed(rls.getTotalOpsProcessed())
34413446
.build();
34423447
}
34433448

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
3232
void incrAppliedOps(long batchsize);
3333
long getLastAppliedOpAge();
3434
void incrAppliedHFiles(long hfileSize);
35+
long getSinkAppliedOps();
3536
}

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,8 @@ public long getLastAppliedOpAge() {
5858
public void incrAppliedHFiles(long hfiles) {
5959
hfilesCounter.incr(hfiles);
6060
}
61+
62+
@Override public long getSinkAppliedOps() {
63+
return opsCounter.value();
64+
}
6165
}

hbase-protocol-shaded/src/main/protobuf/server/ClusterStatus.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ message ClientMetrics {
185185
message ReplicationLoadSink {
186186
required uint64 ageOfLastAppliedOp = 1;
187187
required uint64 timeStampsOfLastAppliedOp = 2;
188+
optional uint64 timestampStarted = 3; // optional: added to an existing message; 'required' would reject messages from senders on the old schema
189+
optional uint64 totalOpsProcessed = 4; // optional for the same wire-compatibility reason
188190
}
189191

190192
message ReplicationLoadSource {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
public class MetricsSink {
3030

3131
private long lastTimestampForAge = System.currentTimeMillis();
32+
// Copy the value above instead of reading the clock a second time: callers compare the two
// timestamps for equality to detect "no ops applied yet", and two separate clock reads can
// differ by a millisecond.
private final long startTimestamp = lastTimestampForAge;
3233
private final MetricsReplicationSinkSource mss;
3334

3435
public MetricsSink() {
@@ -98,4 +99,21 @@ public long getAgeOfLastAppliedOp() {
9899
public long getTimestampOfLastAppliedOp() {
99100
return this.lastTimestampForAge;
100101
}
102+
103+
/**
104+
* Gets the time stamp from when the Sink was initialized.
105+
* @return startTimestamp
106+
*/
107+
public long getStartTimestamp() {
108+
return this.startTimestamp;
109+
}
110+
111+
/**
112+
* Gets the total number of OPs delivered to this sink.
113+
* @return totalAppliedOps
114+
*/
115+
public long getAppliedOps() {
116+
return this.mss.getSinkAppliedOps();
117+
}
118+
101119
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
6161
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
6262
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
6363
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
64+
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
65+
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
6466
this.replicationLoadSink = rLoadSinkBuild.build();
6567

6668
this.replicationLoadSourceEntries = new ArrayList<>();

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
5050
public static final HBaseClassTestRule CLASS_RULE =
5151
HBaseClassTestRule.forClass(TestReplicationStatus.class);
5252

53+
private void insertRowsOnSource() throws IOException {
54+
final byte[] qualName = Bytes.toBytes("q");
55+
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
56+
Put p = new Put(Bytes.toBytes("row" + i));
57+
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
58+
htable1.put(p);
59+
}
60+
}
61+
5362
/**
5463
* Test for HBASE-9531.
5564
* <p/>
@@ -70,12 +79,7 @@ public void testReplicationStatus() throws Exception {
7079
Admin hbaseAdmin = UTIL1.getAdmin();
7180
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
7281
hbaseAdmin.disableReplicationPeer(PEER_ID2);
73-
final byte[] qualName = Bytes.toBytes("q");
74-
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
75-
Put p = new Put(Bytes.toBytes("row" + i));
76-
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
77-
htable1.put(p);
78-
}
82+
insertRowsOnSource();
7983
LOG.info("AFTER PUTS");
8084
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
8185
// change but sleeping in main thread seems to mess up background replication.
@@ -120,6 +124,35 @@ public void testReplicationStatus() throws Exception {
120124
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
121125
}
122126

127+
@Test
128+
public void testReplicationStatusSink() throws Exception {
129+
try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
130+
ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
131+
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
132+
//First checks if status of timestamp of last applied op is same as RS start, since no edits
133+
//were replicated yet
134+
assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
135+
//now insert some rows on source, so that it gets delivered to target
136+
insertRowsOnSource();
137+
long wait = Waiter.waitFor(UTIL2.getConfiguration(),
138+
10000, new Waiter.Predicate<Exception>() {
139+
@Override
140+
public boolean evaluate() throws Exception {
141+
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
142+
return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
143+
}
144+
});
145+
//If wait is -1, we know predicate condition was never true
146+
assertTrue(wait>=0);
147+
}
148+
}
149+
150+
private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
151+
throws IOException {
152+
ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
153+
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
154+
return sm.getReplicationLoadSink();
155+
}
123156
/**
124157
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
125158
* greater than <code>greaterThan</code> for <code>serverName</code> before

hbase-shell/src/main/ruby/hbase/admin.rb

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -835,12 +835,18 @@ def status(format, type)
835835
r_source_string = ' SOURCE:'
836836
r_load_sink = sl.getReplicationLoadSink
837837
next if r_load_sink.nil?
838+
if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
839+
# If we have applied no operations since we've started replication,
840+
# assume that we're not acting as a sink and don't print the normal information
841+
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
842+
r_sink_string << ", Waiting for OPs... "
843+
else
844+
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
845+
r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
846+
r_sink_string << ", TimeStampsOfLastAppliedOp=" +
847+
(java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
848+
end
838849

839-
r_sink_string << ' AgeOfLastAppliedOp=' +
840-
r_load_sink.getAgeOfLastAppliedOp.to_s
841-
r_sink_string << ', TimeStampsOfLastAppliedOp=' +
842-
java.util.Date.new(r_load_sink
843-
.getTimestampsOfLastAppliedOp).toString
844850
r_load_source_map = sl.getReplicationLoadSourceMap
845851
build_source_string(r_load_source_map, r_source_string)
846852
puts(format(' %<host>s:', host: server_name.getHostname))

0 commit comments

Comments
 (0)