Skip to content

Commit

Permalink
HBASE-24824 Add more stats in PE for read replica
Browse files Browse the repository at this point in the history
  • Loading branch information
Huaxiang Sun committed Aug 7, 2020
1 parent c39cad2 commit 5872668
Showing 1 changed file with 100 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,22 @@ protected static class RunResult implements Comparable<RunResult> {
public RunResult(long duration, Histogram hist) {
this.duration = duration;
this.hist = hist;
numbOfReplyOverThreshold = 0;
numOfReplyFromReplica = 0;
}

public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
Histogram hist) {
this.duration = duration;
this.hist = hist;
this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
this.numOfReplyFromReplica = numOfReplyFromReplica;
}

public final long duration;
public final Histogram hist;
public final long numbOfReplyOverThreshold;
public final long numOfReplyFromReplica;

@Override
public String toString() {
Expand Down Expand Up @@ -492,6 +504,10 @@ public void setStatus(final String msg) throws IOException {
});
LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
"ms over " + threadOpts.perClientRunRows + " rows");
if (opts.latencyThreshold > 0) {
LOG.info("Number of replies over latency threshold " + opts.latencyThreshold +
"(ms) is " + run.numbOfReplyOverThreshold);
}
return run;
}
});
Expand All @@ -512,10 +528,12 @@ public void setStatus(final String msg) throws IOException {
long total = 0;
float avgLatency = 0 ;
float avgTPS = 0;
long replicaWins = 0;
for (RunResult result : results) {
total += result.duration;
avgLatency += result.hist.getSnapshot().getMean();
avgTPS += opts.perClientRunRows * 1.0f / result.duration;
replicaWins += result.numOfReplyFromReplica;
}
avgTPS *= 1000; // ms to second
avgLatency = avgLatency / results.length;
Expand All @@ -525,12 +543,15 @@ public void setStatus(final String msg) throws IOException {
+ "\tAvg: " + (total / results.length) + "ms");
LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
if (opts.replicas > 1) {
LOG.info("[results from replica regions] " + replicaWins);
}

for (int i = 0; i < opts.connCount; i++) {
cons[i].close();
asyncCons[i].close();
}


return results;
}

Expand Down Expand Up @@ -706,6 +727,7 @@ static class TestOptions {
int columns = 1;
int families = 1;
int caching = 30;
int latencyThreshold = 0; // in millsecond
boolean addColumns = true;
MemoryCompactionPolicy inMemoryCompaction =
MemoryCompactionPolicy.valueOf(
Expand Down Expand Up @@ -741,6 +763,7 @@ public TestOptions(TestOptions that) {
this.useTags = that.useTags;
this.noOfTags = that.noOfTags;
this.reportLatency = that.reportLatency;
this.latencyThreshold = that.latencyThreshold;
this.multiGet = that.multiGet;
this.multiPut = that.multiPut;
this.inMemoryCF = that.inMemoryCF;
Expand Down Expand Up @@ -1130,6 +1153,7 @@ private static long nextRandomSeed() {

private String testName;
private Histogram latencyHistogram;
private Histogram replicaLatencyHistogram;
private Histogram valueSizeHistogram;
private Histogram rpcCallsHistogram;
private Histogram remoteRpcCallsHistogram;
Expand All @@ -1138,6 +1162,8 @@ private static long nextRandomSeed() {
private Histogram bytesInResultsHistogram;
private Histogram bytesInRemoteResultsHistogram;
private RandomDistribution.Zipf zipf;
private long numOfReplyOverLatencyThreshold = 0;
private long numOfReplyFromReplica = 0;

/**
* Note that all subclasses of this class must provide a public constructor
Expand Down Expand Up @@ -1175,13 +1201,28 @@ int getValueLength(final Random r) {
}

void updateValueSize(final Result [] rs) throws IOException {
if (rs == null || !isRandomValueSize()) return;
for (Result r: rs) updateValueSize(r);
updateValueSize(rs, 0);
}

void updateValueSize(final Result [] rs, final long latency) throws IOException {
if (rs == null || (latency == 0)) return;
for (Result r: rs) updateValueSize(r, latency);
}

void updateValueSize(final Result r) throws IOException {
if (r == null || !isRandomValueSize()) return;
updateValueSize(r, 0);
}

void updateValueSize(final Result r, final long latency) throws IOException {
if (r == null || (latency == 0)) return;
int size = 0;
// update replicaHistogram
if (r.isStale()) {
replicaLatencyHistogram.update(latency / 1000);
numOfReplyFromReplica ++;
}
if (!isRandomValueSize()) return;

for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
size += scanner.current().getValueLength();
}
Expand Down Expand Up @@ -1245,6 +1286,10 @@ public Histogram getLatencyHistogram() {
void testSetup() throws IOException {
// test metrics
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// If it is a replica test, set up histogram for replica.
if (opts.replicas > 1) {
replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
}
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// scan metrics
rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
Expand All @@ -1268,6 +1313,10 @@ void testTakedown() throws IOException {
status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
latencyHistogram));
if (opts.replicas > 1) {
status.setStatus("Latency (us) from Replica Regions: " +
YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
}
status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
if (valueSizeHistogram.getCount() > 0) {
Expand Down Expand Up @@ -1349,15 +1398,19 @@ void testTimed() throws IOException, InterruptedException {
long startTime = System.nanoTime();
boolean requestSent = false;
try (TraceScope scope = TraceUtil.createTrace("test row");){
requestSent = testRow(i);
requestSent = testRow(i, startTime);
}
if ( (i - startRow) > opts.measureAfter) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately
// first 9 times and sends the actual get request in the 10th iteration.
// We should only set latency when actual request is sent because otherwise
// it turns out to be 0.
if (requestSent) {
latencyHistogram.update((System.nanoTime() - startTime) / 1000);
long latency = (System.nanoTime() - startTime) / 1000;
latencyHistogram.update(latency);
if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
numOfReplyOverLatencyThreshold ++;
}
}
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
status.setStatus(generateStatus(startRow, i, lastRow));
Expand Down Expand Up @@ -1389,7 +1442,7 @@ public String getShortValueSizeReport() {
* False if not, multiGet and multiPut e.g., the rows are sent
* to server only if enough gets/puts are gathered.
*/
abstract boolean testRow(final int i) throws IOException, InterruptedException;
abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
}

static abstract class Test extends TestBase {
Expand Down Expand Up @@ -1460,7 +1513,7 @@ static class AsyncRandomReadTest extends AsyncTableTest {
}

@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
Expand Down Expand Up @@ -1569,7 +1622,7 @@ void testTakedown() throws IOException {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) {
Scan scan =
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
Expand Down Expand Up @@ -1603,7 +1656,7 @@ static class AsyncSequentialReadTest extends AsyncTableTest {
}

@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
Expand Down Expand Up @@ -1645,7 +1698,7 @@ protected byte[] generateRow(final int i) {

@Override
@SuppressWarnings("ReturnValueIgnored")
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
Expand Down Expand Up @@ -1720,7 +1773,7 @@ static class RandomSeekScanTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
.setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
Expand Down Expand Up @@ -1768,7 +1821,7 @@ static abstract class RandomScanWithRangeTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
.withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
Expand Down Expand Up @@ -1871,6 +1924,7 @@ static class RandomReadTest extends TableTest {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
private long numOfReplyFromReplica = 0;

RandomReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
Expand All @@ -1882,7 +1936,7 @@ static class RandomReadTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
Expand All @@ -1907,13 +1961,24 @@ boolean testRow(final int i) throws IOException, InterruptedException {
this.gets.add(get);
if (this.gets.size() == opts.multiGet) {
Result [] rs = this.table.get(this.gets);
updateValueSize(rs);
if (opts.replicas > 1) {
long latency = System.nanoTime() - startTime;
updateValueSize(rs, latency);
} else {
updateValueSize(rs);
}
this.gets.clear();
} else {
return false;
}
} else {
updateValueSize(this.table.get(get));
if (opts.replicas > 1) {
Result r = this.table.get(get);
long latency = System.nanoTime() - startTime;
updateValueSize(r, latency);
} else {
updateValueSize(this.table.get(get));
}
}
return true;
}
Expand Down Expand Up @@ -1964,7 +2029,7 @@ void testTakedown() throws IOException {


@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) {
Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
Expand Down Expand Up @@ -2027,7 +2092,7 @@ static class IncrementTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Increment increment = new Increment(format(i));
// unlike checkAndXXX tests, which make most sense to do on a single value,
// if multiple families are specified for an increment test we assume it is
Expand All @@ -2047,7 +2112,7 @@ static class AppendTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
byte [] bytes = format(i);
Append append = new Append(bytes);
// unlike checkAndXXX tests, which make most sense to do on a single value,
Expand All @@ -2068,7 +2133,7 @@ static class CheckAndMutateTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
Expand All @@ -2089,7 +2154,7 @@ static class CheckAndPutTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
Expand All @@ -2108,7 +2173,7 @@ static class CheckAndDeleteTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
Expand All @@ -2129,7 +2194,7 @@ static class SequentialReadTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
Expand Down Expand Up @@ -2167,7 +2232,7 @@ protected byte[] generateRow(final int i) {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
Expand Down Expand Up @@ -2224,7 +2289,7 @@ static class FilteredScanTest extends TableTest {
}

@Override
boolean testRow(int i) throws IOException {
boolean testRow(int i, final long startTime) throws IOException {
byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value);
ResultScanner scanner = null;
Expand Down Expand Up @@ -2368,7 +2433,8 @@ static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
getAverageValueLength(opts), opts.families, opts.columns) + ")");

return new RunResult(totalElapsedTime, t.getLatencyHistogram());
return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
t.numOfReplyFromReplica, t.getLatencyHistogram());
}

private static int getAverageValueLength(final TestOptions opts) {
Expand Down Expand Up @@ -2434,6 +2500,8 @@ protected static void printUsage(final String shortName, final String message) {
System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
"Default: 0");
System.err.println(" latency Set to report operation latencies. Default: False");
System.err.println(" latencyThreshold Set to report number of operations with latency " +
"over lantencyThreshold, unit in millisecond, default 0");
System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" +
" rows have been treated. Default: 0");
System.err.println(" valueSize Pass value size to use: Default: "
Expand Down Expand Up @@ -2631,6 +2699,12 @@ static TestOptions parseOpts(Queue<String> args) {
continue;
}

final String latencyThreshold = "--latencyThreshold=";
if (cmd.startsWith(latencyThreshold)) {
opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
continue;
}

final String latency = "--latency";
if (cmd.startsWith(latency)) {
opts.reportLatency = true;
Expand Down

0 comments on commit 5872668

Please sign in to comment.