Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24824 Add more stats in PE for read replica #2205

Merged
merged 1 commit into from
Aug 7, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,22 @@ protected static class RunResult implements Comparable<RunResult> {
public RunResult(long duration, Histogram hist) {
this.duration = duration;
this.hist = hist;
numbOfReplyOverThreshold = 0;
numOfReplyFromReplica = 0;
}

public RunResult(long duration, long numbOfReplyOverThreshold, long numOfReplyFromReplica,
Histogram hist) {
this.duration = duration;
this.hist = hist;
this.numbOfReplyOverThreshold = numbOfReplyOverThreshold;
this.numOfReplyFromReplica = numOfReplyFromReplica;
}

public final long duration;
public final Histogram hist;
public final long numbOfReplyOverThreshold;
public final long numOfReplyFromReplica;

@Override
public String toString() {
Expand Down Expand Up @@ -492,6 +504,10 @@ public void setStatus(final String msg) throws IOException {
});
LOG.info("Finished " + Thread.currentThread().getName() + " in " + run.duration +
"ms over " + threadOpts.perClientRunRows + " rows");
if (opts.latencyThreshold > 0) {
LOG.info("Number of replies over latency threshold " + opts.latencyThreshold +
"(ms) is " + run.numbOfReplyOverThreshold);
}
return run;
}
});
Expand All @@ -512,10 +528,12 @@ public void setStatus(final String msg) throws IOException {
long total = 0;
float avgLatency = 0 ;
float avgTPS = 0;
long replicaWins = 0;
for (RunResult result : results) {
total += result.duration;
avgLatency += result.hist.getSnapshot().getMean();
avgTPS += opts.perClientRunRows * 1.0f / result.duration;
replicaWins += result.numOfReplyFromReplica;
}
avgTPS *= 1000; // ms to second
avgLatency = avgLatency / results.length;
Expand All @@ -525,12 +543,15 @@ public void setStatus(final String msg) throws IOException {
+ "\tAvg: " + (total / results.length) + "ms");
LOG.info("[ Avg latency (us)]\t" + Math.round(avgLatency));
LOG.info("[ Avg TPS/QPS]\t" + Math.round(avgTPS) + "\t row per second");
if (opts.replicas > 1) {
LOG.info("[results from replica regions] " + replicaWins);
}

for (int i = 0; i < opts.connCount; i++) {
cons[i].close();
asyncCons[i].close();
}


return results;
}

Expand Down Expand Up @@ -706,6 +727,7 @@ static class TestOptions {
int columns = 1;
int families = 1;
int caching = 30;
int latencyThreshold = 0; // in millsecond
boolean addColumns = true;
MemoryCompactionPolicy inMemoryCompaction =
MemoryCompactionPolicy.valueOf(
Expand Down Expand Up @@ -741,6 +763,7 @@ public TestOptions(TestOptions that) {
this.useTags = that.useTags;
this.noOfTags = that.noOfTags;
this.reportLatency = that.reportLatency;
this.latencyThreshold = that.latencyThreshold;
this.multiGet = that.multiGet;
this.multiPut = that.multiPut;
this.inMemoryCF = that.inMemoryCF;
Expand Down Expand Up @@ -1130,6 +1153,7 @@ private static long nextRandomSeed() {

private String testName;
private Histogram latencyHistogram;
private Histogram replicaLatencyHistogram;
private Histogram valueSizeHistogram;
private Histogram rpcCallsHistogram;
private Histogram remoteRpcCallsHistogram;
Expand All @@ -1138,6 +1162,8 @@ private static long nextRandomSeed() {
private Histogram bytesInResultsHistogram;
private Histogram bytesInRemoteResultsHistogram;
private RandomDistribution.Zipf zipf;
private long numOfReplyOverLatencyThreshold = 0;
private long numOfReplyFromReplica = 0;

/**
* Note that all subclasses of this class must provide a public constructor
Expand Down Expand Up @@ -1175,13 +1201,28 @@ int getValueLength(final Random r) {
}

void updateValueSize(final Result [] rs) throws IOException {
if (rs == null || !isRandomValueSize()) return;
for (Result r: rs) updateValueSize(r);
updateValueSize(rs, 0);
}

void updateValueSize(final Result [] rs, final long latency) throws IOException {
if (rs == null || (latency == 0)) return;
for (Result r: rs) updateValueSize(r, latency);
}

void updateValueSize(final Result r) throws IOException {
if (r == null || !isRandomValueSize()) return;
updateValueSize(r, 0);
}

void updateValueSize(final Result r, final long latency) throws IOException {
if (r == null || (latency == 0)) return;
int size = 0;
// update replicaHistogram
if (r.isStale()) {
replicaLatencyHistogram.update(latency / 1000);
numOfReplyFromReplica ++;
}
if (!isRandomValueSize()) return;

for (CellScanner scanner = r.cellScanner(); scanner.advance();) {
size += scanner.current().getValueLength();
}
Expand Down Expand Up @@ -1245,6 +1286,10 @@ public Histogram getLatencyHistogram() {
void testSetup() throws IOException {
// test metrics
latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// If it is a replica test, set up histogram for replica.
if (opts.replicas > 1) {
replicaLatencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
}
valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
// scan metrics
rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
Expand All @@ -1268,6 +1313,10 @@ void testTakedown() throws IOException {
status.setStatus("Test : " + testName + ", Thread : " + Thread.currentThread().getName());
status.setStatus("Latency (us) : " + YammerHistogramUtils.getHistogramReport(
latencyHistogram));
if (opts.replicas > 1) {
status.setStatus("Latency (us) from Replica Regions: " +
YammerHistogramUtils.getHistogramReport(replicaLatencyHistogram));
}
status.setStatus("Num measures (latency) : " + latencyHistogram.getCount());
status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(latencyHistogram));
if (valueSizeHistogram.getCount() > 0) {
Expand Down Expand Up @@ -1349,15 +1398,19 @@ void testTimed() throws IOException, InterruptedException {
long startTime = System.nanoTime();
boolean requestSent = false;
try (TraceScope scope = TraceUtil.createTrace("test row");){
requestSent = testRow(i);
requestSent = testRow(i, startTime);
}
if ( (i - startRow) > opts.measureAfter) {
// If multiget or multiput is enabled, say set to 10, testRow() returns immediately
// first 9 times and sends the actual get request in the 10th iteration.
// We should only set latency when actual request is sent because otherwise
// it turns out to be 0.
if (requestSent) {
latencyHistogram.update((System.nanoTime() - startTime) / 1000);
long latency = (System.nanoTime() - startTime) / 1000;
latencyHistogram.update(latency);
if ((opts.latencyThreshold > 0) && (latency / 1000 >= opts.latencyThreshold)) {
numOfReplyOverLatencyThreshold ++;
}
}
if (status != null && i > 0 && (i % getReportingPeriod()) == 0) {
status.setStatus(generateStatus(startRow, i, lastRow));
Expand Down Expand Up @@ -1389,7 +1442,7 @@ public String getShortValueSizeReport() {
* False if not, multiGet and multiPut e.g., the rows are sent
* to server only if enough gets/puts are gathered.
*/
abstract boolean testRow(final int i) throws IOException, InterruptedException;
abstract boolean testRow(final int i, final long startTime) throws IOException, InterruptedException;
}

static abstract class Test extends TestBase {
Expand Down Expand Up @@ -1460,7 +1513,7 @@ static class AsyncRandomReadTest extends AsyncTableTest {
}

@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
Expand Down Expand Up @@ -1569,7 +1622,7 @@ void testTakedown() throws IOException {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) {
Scan scan =
new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
Expand Down Expand Up @@ -1603,7 +1656,7 @@ static class AsyncSequentialReadTest extends AsyncTableTest {
}

@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
Expand Down Expand Up @@ -1645,7 +1698,7 @@ protected byte[] generateRow(final int i) {

@Override
@SuppressWarnings("ReturnValueIgnored")
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
Expand Down Expand Up @@ -1720,7 +1773,7 @@ static class RandomSeekScanTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
.setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
.setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
Expand Down Expand Up @@ -1768,7 +1821,7 @@ static abstract class RandomScanWithRangeTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Pair<byte[], byte[]> startAndStopRow = getStartAndStopRow();
Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
.withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
Expand Down Expand Up @@ -1871,6 +1924,7 @@ static class RandomReadTest extends TableTest {
private final Consistency consistency;
private ArrayList<Get> gets;
private Random rd = new Random();
private long numOfReplyFromReplica = 0;

RandomReadTest(Connection con, TestOptions options, Status status) {
super(con, options, status);
Expand All @@ -1882,7 +1936,7 @@ static class RandomReadTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException, InterruptedException {
boolean testRow(final int i, final long startTime) throws IOException, InterruptedException {
if (opts.randomSleep > 0) {
Thread.sleep(rd.nextInt(opts.randomSleep));
}
Expand All @@ -1907,13 +1961,24 @@ boolean testRow(final int i) throws IOException, InterruptedException {
this.gets.add(get);
if (this.gets.size() == opts.multiGet) {
Result [] rs = this.table.get(this.gets);
updateValueSize(rs);
if (opts.replicas > 1) {
long latency = System.nanoTime() - startTime;
updateValueSize(rs, latency);
} else {
updateValueSize(rs);
}
this.gets.clear();
} else {
return false;
}
} else {
updateValueSize(this.table.get(get));
if (opts.replicas > 1) {
Result r = this.table.get(get);
long latency = System.nanoTime() - startTime;
updateValueSize(r, latency);
} else {
updateValueSize(this.table.get(get));
}
}
return true;
}
Expand Down Expand Up @@ -1964,7 +2029,7 @@ void testTakedown() throws IOException {


@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
if (this.testScanner == null) {
Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
.setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
Expand Down Expand Up @@ -2027,7 +2092,7 @@ static class IncrementTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Increment increment = new Increment(format(i));
// unlike checkAndXXX tests, which make most sense to do on a single value,
// if multiple families are specified for an increment test we assume it is
Expand All @@ -2047,7 +2112,7 @@ static class AppendTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
byte [] bytes = format(i);
Append append = new Append(bytes);
// unlike checkAndXXX tests, which make most sense to do on a single value,
Expand All @@ -2068,7 +2133,7 @@ static class CheckAndMutateTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
Expand All @@ -2089,7 +2154,7 @@ static class CheckAndPutTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
Expand All @@ -2108,7 +2173,7 @@ static class CheckAndDeleteTest extends CASTableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
final byte [] bytes = format(i);
// checkAndXXX tests operate on only a single value
// Put a known value so when we go to check it, it is there.
Expand All @@ -2129,7 +2194,7 @@ static class SequentialReadTest extends TableTest {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
Get get = new Get(format(i));
for (int family = 0; family < opts.families; family++) {
byte[] familyName = Bytes.toBytes(FAMILY_NAME_BASE + family);
Expand Down Expand Up @@ -2167,7 +2232,7 @@ protected byte[] generateRow(final int i) {
}

@Override
boolean testRow(final int i) throws IOException {
boolean testRow(final int i, final long startTime) throws IOException {
byte[] row = generateRow(i);
Put put = new Put(row);
for (int family = 0; family < opts.families; family++) {
Expand Down Expand Up @@ -2224,7 +2289,7 @@ static class FilteredScanTest extends TableTest {
}

@Override
boolean testRow(int i) throws IOException {
boolean testRow(int i, final long startTime) throws IOException {
byte[] value = generateData(this.rand, getValueLength(this.rand));
Scan scan = constructScan(value);
ResultScanner scanner = null;
Expand Down Expand Up @@ -2368,7 +2433,8 @@ static RunResult runOneClient(final Class<? extends TestBase> cmd, Configuration
" (" + calculateMbps((int)(opts.perClientRunRows * opts.sampleRate), totalElapsedTime,
getAverageValueLength(opts), opts.families, opts.columns) + ")");

return new RunResult(totalElapsedTime, t.getLatencyHistogram());
return new RunResult(totalElapsedTime, t.numOfReplyOverLatencyThreshold,
t.numOfReplyFromReplica, t.getLatencyHistogram());
}

private static int getAverageValueLength(final TestOptions opts) {
Expand Down Expand Up @@ -2434,6 +2500,8 @@ protected static void printUsage(final String shortName, final String message) {
System.err.println(" traceRate Enable HTrace spans. Initiate tracing every N rows. " +
"Default: 0");
System.err.println(" latency Set to report operation latencies. Default: False");
System.err.println(" latencyThreshold Set to report number of operations with latency " +
"over lantencyThreshold, unit in millisecond, default 0");
System.err.println(" measureAfter Start to measure the latency once 'measureAfter'" +
" rows have been treated. Default: 0");
System.err.println(" valueSize Pass value size to use: Default: "
Expand Down Expand Up @@ -2631,6 +2699,12 @@ static TestOptions parseOpts(Queue<String> args) {
continue;
}

final String latencyThreshold = "--latencyThreshold=";
if (cmd.startsWith(latencyThreshold)) {
opts.latencyThreshold = Integer.parseInt(cmd.substring(latencyThreshold.length()));
continue;
}

final String latency = "--latency";
if (cmd.startsWith(latency)) {
opts.reportLatency = true;
Expand Down