Skip to content

Commit 11d093b

Browse files
committed
HBASE-21406 "status 'replication'" should not show SINK if the cluste… (#1761)
Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com> Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Josh Elser <elserj@apache.org> (Cherry picked from commit e5345b3)
1 parent b3c6af9 commit 11d093b

File tree

9 files changed

+99
-15
lines changed

9 files changed

+99
-15
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationLoadSink.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,17 @@
1919
public class ReplicationLoadSink {
2020
private final long ageOfLastAppliedOp;
2121
private final long timestampsOfLastAppliedOp;
22+
private final long timestampStarted;
23+
private final long totalOpsProcessed;
2224

2325
// TODO: add the builder for this class
2426
@InterfaceAudience.Private
25-
public ReplicationLoadSink(long age, long timestamp) {
27+
public ReplicationLoadSink(long age, long timestamp, long timestampStarted,
28+
long totalOpsProcessed) {
2629
this.ageOfLastAppliedOp = age;
2730
this.timestampsOfLastAppliedOp = timestamp;
31+
this.timestampStarted = timestampStarted;
32+
this.totalOpsProcessed = totalOpsProcessed;
2833
}
2934

3035
public long getAgeOfLastAppliedOp() {
@@ -43,4 +48,12 @@ public long getTimeStampsOfLastAppliedOp() {
4348
public long getTimestampsOfLastAppliedOp() {
4449
return this.timestampsOfLastAppliedOp;
4550
}
51+
52+
public long getTimestampStarted() {
53+
return timestampStarted;
54+
}
55+
56+
public long getTotalOpsProcessed() {
57+
return totalOpsProcessed;
58+
}
4659
}

hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2801,7 +2801,10 @@ public static void mergeFrom(Message.Builder builder, CodedInputStream codedInpu
28012801

28022802
public static ReplicationLoadSink toReplicationLoadSink(
28032803
ClusterStatusProtos.ReplicationLoadSink rls) {
2804-
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(), rls.getTimeStampsOfLastAppliedOp());
2804+
return new ReplicationLoadSink(rls.getAgeOfLastAppliedOp(),
2805+
rls.getTimeStampsOfLastAppliedOp(),
2806+
rls.getTimestampStarted(),
2807+
rls.getTotalOpsProcessed());
28052808
}
28062809

28072810
public static ReplicationLoadSource toReplicationLoadSource(
@@ -3394,6 +3397,8 @@ public static ClusterStatusProtos.ReplicationLoadSink toReplicationLoadSink(
33943397
return ClusterStatusProtos.ReplicationLoadSink.newBuilder()
33953398
.setAgeOfLastAppliedOp(rls.getAgeOfLastAppliedOp())
33963399
.setTimeStampsOfLastAppliedOp(rls.getTimestampsOfLastAppliedOp())
3400+
.setTimestampStarted(rls.getTimestampStarted())
3401+
.setTotalOpsProcessed(rls.getTotalOpsProcessed())
33973402
.build();
33983403
}
33993404

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ public interface MetricsReplicationSinkSource {
3232
void incrAppliedOps(long batchsize);
3333
long getLastAppliedOpAge();
3434
void incrAppliedHFiles(long hfileSize);
35+
long getSinkAppliedOps();
3536
}

hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,8 @@ public long getLastAppliedOpAge() {
5858
public void incrAppliedHFiles(long hfiles) {
5959
hfilesCounter.incr(hfiles);
6060
}
61+
62+
@Override public long getSinkAppliedOps() {
63+
return opsCounter.value();
64+
}
6165
}

hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ message ClientMetrics {
185185
message ReplicationLoadSink {
186186
required uint64 ageOfLastAppliedOp = 1;
187187
required uint64 timeStampsOfLastAppliedOp = 2;
188+
required uint64 timestampStarted = 3;
189+
required uint64 totalOpsProcessed = 4;
188190
}
189191

190192
message ReplicationLoadSource {

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
public class MetricsSink {
3030

3131
private long lastTimestampForAge = System.currentTimeMillis();
32+
private long startTimestamp = System.currentTimeMillis();
3233
private final MetricsReplicationSinkSource mss;
3334

3435
public MetricsSink() {
@@ -110,4 +111,21 @@ public long getTimeStampOfLastAppliedOp() {
110111
public long getTimestampOfLastAppliedOp() {
111112
return this.lastTimestampForAge;
112113
}
114+
115+
/**
116+
* Gets the time stamp from when the Sink was initialized.
117+
* @return startTimestamp
118+
*/
119+
public long getStartTimestamp() {
120+
return this.startTimestamp;
121+
}
122+
123+
/**
124+
* Gets the total number of OPs delivered to this sink.
125+
* @return totalAppliedOps
126+
*/
127+
public long getAppliedOps() {
128+
return this.mss.getSinkAppliedOps();
129+
}
130+
113131
}

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
6161
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
6262
rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
6363
rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
64+
rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
65+
rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
6466
this.replicationLoadSink = rLoadSinkBuild.build();
6567

6668
this.replicationLoadSourceEntries = new ArrayList<>();

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ public class TestReplicationStatus extends TestReplicationBase {
5050
public static final HBaseClassTestRule CLASS_RULE =
5151
HBaseClassTestRule.forClass(TestReplicationStatus.class);
5252

53+
private void insertRowsOnSource() throws IOException {
54+
final byte[] qualName = Bytes.toBytes("q");
55+
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
56+
Put p = new Put(Bytes.toBytes("row" + i));
57+
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
58+
htable1.put(p);
59+
}
60+
}
61+
5362
/**
5463
* Test for HBASE-9531.
5564
* <p/>
@@ -70,12 +79,7 @@ public void testReplicationStatus() throws Exception {
7079
Admin hbaseAdmin = UTIL1.getAdmin();
7180
// disable peer <= WHY? I DON'T GET THIS DISABLE BUT TEST FAILS W/O IT.
7281
hbaseAdmin.disableReplicationPeer(PEER_ID2);
73-
final byte[] qualName = Bytes.toBytes("q");
74-
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
75-
Put p = new Put(Bytes.toBytes("row" + i));
76-
p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
77-
htable1.put(p);
78-
}
82+
insertRowsOnSource();
7983
LOG.info("AFTER PUTS");
8084
// TODO: Change this wait to a barrier. I tried waiting on replication stats to
8185
// change but sleeping in main thread seems to mess up background replication.
@@ -120,6 +124,35 @@ public void testReplicationStatus() throws Exception {
120124
assertEquals(PEER_ID2, rLoadSourceList.get(0).getPeerID());
121125
}
122126

127+
@Test
128+
public void testReplicationStatusSink() throws Exception {
129+
try (Admin hbaseAdmin = UTIL2.getConnection().getAdmin()) {
130+
ServerName server = UTIL2.getHBaseCluster().getRegionServer(0).getServerName();
131+
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
132+
//First checks if status of timestamp of last applied op is same as RS start, since no edits
133+
//were replicated yet
134+
assertEquals(loadSink.getTimestampStarted(), loadSink.getTimestampsOfLastAppliedOp());
135+
//now insert some rows on source, so that it gets delivered to target
136+
insertRowsOnSource();
137+
long wait = Waiter.waitFor(UTIL2.getConfiguration(),
138+
10000, new Waiter.Predicate<Exception>() {
139+
@Override
140+
public boolean evaluate() throws Exception {
141+
ReplicationLoadSink loadSink = getLatestSinkMetric(hbaseAdmin, server);
142+
return loadSink.getTimestampsOfLastAppliedOp()>loadSink.getTimestampStarted();
143+
}
144+
});
145+
//If wait is -1, we know predicate condition was never true
146+
assertTrue(wait>=0);
147+
}
148+
}
149+
150+
private ReplicationLoadSink getLatestSinkMetric(Admin admin, ServerName server)
151+
throws IOException {
152+
ClusterMetrics metrics = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
153+
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
154+
return sm.getReplicationLoadSink();
155+
}
123156
/**
124157
* Wait until Master shows metrics counts for ReplicationLoadSourceList that are
125158
* greater than <code>greaterThan</code> for <code>serverName</code> before

hbase-shell/src/main/ruby/hbase/admin.rb

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -812,12 +812,18 @@ def status(format, type)
812812
r_source_string = ' SOURCE:'
813813
r_load_sink = sl.getReplicationLoadSink
814814
next if r_load_sink.nil?
815+
if r_load_sink.getTimestampsOfLastAppliedOp() == r_load_sink.getTimestampStarted()
816+
# If we have applied no operations since we've started replication,
817+
# assume that we're not acting as a sink and don't print the normal information
818+
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
819+
r_sink_string << ", Waiting for OPs... "
820+
else
821+
r_sink_string << " TimeStampStarted=" + r_load_sink.getTimestampStarted().to_s
822+
r_sink_string << ", AgeOfLastAppliedOp=" + r_load_sink.getAgeOfLastAppliedOp().to_s
823+
r_sink_string << ", TimeStampsOfLastAppliedOp=" +
824+
(java.util.Date.new(r_load_sink.getTimestampsOfLastAppliedOp())).toString()
825+
end
815826

816-
r_sink_string << ' AgeOfLastAppliedOp=' +
817-
r_load_sink.getAgeOfLastAppliedOp.to_s
818-
r_sink_string << ', TimeStampsOfLastAppliedOp=' +
819-
java.util.Date.new(r_load_sink
820-
.getTimeStampsOfLastAppliedOp).toString
821827
r_load_source_map = sl.getReplicationLoadSourceMap
822828
build_source_string(r_load_source_map, r_source_string)
823829
puts(format(' %<host>s:', host: server_status.getHostname))
@@ -888,15 +894,15 @@ def build_running_source_stats(source_load, r_source_string)
888894
end
889895

890896
def build_shipped_stats(source_load, r_source_string)
891-
r_source_string << if source_load.getTimeStampOfLastShippedOp.zero?
897+
r_source_string << if source_load.getTimestampOfLastShippedOp.zero?
892898
"\n " \
893899
'No Ops shipped since last restart'
894900
else
895901
"\n AgeOfLastShippedOp=" +
896902
source_load.getAgeOfLastShippedOp.to_s +
897903
', TimeStampOfLastShippedOp=' +
898904
java.util.Date.new(source_load
899-
.getTimeStampOfLastShippedOp).toString
905+
.getTimestampOfLastShippedOp).toString
900906
end
901907
end
902908

0 commit comments

Comments
 (0)