Skip to content

HBASE-22346 scanner priorities/deadline units are invalid for non-huge scanners #210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2862,4 +2862,9 @@ private boolean shouldSubmitSCP(ServerName serverName) {
}
return true;
}

/**
 * Overrides the inherited {@code isMaster()} so that it always returns {@code true}.
 *
 * @return {@code true}, unconditionally
 */
@Override
public boolean isMaster() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,6 +43,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.security.User;
Expand All @@ -65,6 +69,7 @@
//All the argument classes declare a 'getRegion' method that returns a
//RegionSpecifier object. Methods can be invoked on the returned object
//to figure out whether it is a meta region or not.
// TODO: split the priority and deadline parts; they are currently completely unrelated
@InterfaceAudience.Private
public class AnnotationReadingPriorityFunction implements PriorityFunction {
private static final Logger LOG =
Expand All @@ -73,6 +78,18 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";

/**
 * Configuration key selecting which scan requests get an actual time-based deadline.
 * The value is matched against the {@link ScanDeadlineOnly} constant names after upper-casing.
 */
public static final String SCAN_DEADLINE_PRIORITY = "hbase.ipc.server.scan.deadline.only";
/** Mode used when the key is unset or holds a value that is not a {@link ScanDeadlineOnly} name. */
private static final ScanDeadlineOnly SCAN_DEADLINE_PRIORITY_DEFAULT = ScanDeadlineOnly.NONE;
/** Selects which scan requests use the scanner expiration delay as their deadline. */
enum ScanDeadlineOnly {
/** Every scan request uses the time-based deadline. */
ALL,
/**
 * Only scans recognized as meta scans use the time-based deadline; all other scans keep the
 * virtual-time value, offset by the default scanner expiration delay.
 */
META_ONLY,
/** No scan request uses the time-based deadline; the virtual-time value is used instead. */
NONE
}

/**
 * Bytes of the meta table name. A REGION_NAME specifier whose value starts with these bytes is
 * treated as addressing a meta region (see the shortcut in isMetaScan).
 */
private static final ByteString META_PREFIX = ByteString.copyFrom(
TableName.META_TABLE_NAME.toBytes());

protected final Map<String, Integer> annotatedQos;
//We need to mock the regionserver instance for some unit tests (set via
//the setRegionServer method).
Expand All @@ -94,6 +111,7 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
private final Map<String, Map<Class<? extends Message>, Method>> methodMap = new HashMap<>();

private final float scanVirtualTimeWeight;
private final ScanDeadlineOnly scanDeadlineOnly;

/**
* Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
Expand Down Expand Up @@ -148,6 +166,26 @@ public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,

Configuration conf = rpcServices.getConfiguration();
scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
scanDeadlineOnly = getScanDeadlineOnlyConf(rpcServices, conf);
}

/**
 * Resolves the scanner deadline mode from the configuration.
 *
 * <p>The value of {@link #SCAN_DEADLINE_PRIORITY} is trimmed and upper-cased with
 * {@link Locale#ROOT} before being matched against the {@link ScanDeadlineOnly} names, so the
 * result does not depend on the JVM default locale or on stray whitespace in the config file.
 * An unrecognized value is logged and replaced by the default mode.
 *
 * <p>{@code META_ONLY} is promoted to {@code ALL} when the configuration says tables are hosted
 * on the master, only system tables are hosted there, and {@code rpcServices} reports that it
 * is the master.
 *
 * @param rpcServices the RPC services instance, queried for {@code isMaster()}
 * @param conf the configuration to read the mode from
 * @return the effective deadline mode; never {@code null}
 */
private static ScanDeadlineOnly getScanDeadlineOnlyConf(
    RSRpcServices rpcServices, Configuration conf) {
  ScanDeadlineOnly result = SCAN_DEADLINE_PRIORITY_DEFAULT;
  String val = conf.get(SCAN_DEADLINE_PRIORITY, SCAN_DEADLINE_PRIORITY_DEFAULT.name());
  try {
    // Locale.ROOT keeps the case mapping stable regardless of the platform locale.
    result = ScanDeadlineOnly.valueOf(val.trim().toUpperCase(Locale.ROOT));
  } catch (IllegalArgumentException ex) {
    LOG.warn("Invalid value for {} ({}); using the default", SCAN_DEADLINE_PRIORITY, val);
  }
  if (result == ScanDeadlineOnly.META_ONLY && LoadBalancer.isTablesOnMaster(conf)
      && LoadBalancer.isSystemTablesOnlyOnMaster(conf) && rpcServices.isMaster()) {
    result = ScanDeadlineOnly.ALL;
  }
  if (result != ScanDeadlineOnly.NONE) {
    LOG.info("Using deadline-based scanner priority {}", result);
  }
  return result;
}

private String capitalize(final String s) {
Expand Down Expand Up @@ -262,21 +300,65 @@ protected int getBasePriority(RequestHeader header, Message param) {
public long getDeadline(RequestHeader header, Message param) {
if (param instanceof ScanRequest) {
ScanRequest request = (ScanRequest)param;
if (!request.hasScannerId()) {
return 0;
boolean useDeadline;
long baseTime = 0;
switch (scanDeadlineOnly) {
case ALL: useDeadline = true; break;
case NONE: useDeadline = false; break;
case META_ONLY: {
useDeadline = isMetaScan(request);
// If meta regions use real time and non-meta use vtime, make sure they are comparable
// and that non-meta scans are generally after meta scans; add the default scanner
// delay to map the former to real time.
if (!useDeadline) {
baseTime = rpcServices.getScannerExpirationDelayMs(null);
}
break;
}
default: throw new AssertionError(scanDeadlineOnly);
}
if (useDeadline) {
return rpcServices.getScannerExpirationDelayMs(
request.hasScannerId() ? request.getScannerId() : null);
} else if (!request.hasScannerId()) {
return baseTime;
} else {
// get the 'virtual time' of the scanner, and applies sqrt() to get a
// nice curve for the delay. The more a scanner is used the less priority it gets.
// The weight is used to have more control on the delay.
long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
return baseTime + Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
}

// get the 'virtual time' of the scanner, and applies sqrt() to get a
// nice curve for the delay. More a scanner is used the less priority it gets.
// The weight is used to have more control on the delay.
long vtime = rpcServices.getScannerVirtualTime(request.getScannerId());
return Math.round(Math.sqrt(vtime * scanVirtualTimeWeight));
}
return 0;
}

/**
 * Tells whether the given scan request targets a meta region.
 *
 * <p>For an already-open scanner the region of the registered scanner is inspected; an unknown
 * scanner id yields {@code false}. For a new scan the region specifier is used: a REGION_NAME
 * specifier is matched against the meta table name prefix, any other specifier is resolved
 * through {@code rpcServices}.
 *
 * @param request the scan request to classify
 * @return {@code true} if the scan is recognized as a meta scan, {@code false} otherwise
 *         (including when the region cannot be resolved)
 */
private boolean isMetaScan(ScanRequest request) {
  if (request.hasScannerId()) {
    RegionScanner scanner = rpcServices.getScanner(request.getScannerId());
    if (scanner == null) {
      return false;
    }
    return scanner.getRegionInfo().isMetaRegion();
  }
  if (!request.hasRegion()) {
    return false;
  }
  RegionSpecifier specifier = request.getRegion();
  boolean byRegionName = specifier.hasType()
      && specifier.getType() == RegionSpecifier.RegionSpecifierType.REGION_NAME;
  if (byRegionName) {
    // Common case shortcut: compare the name prefix without resolving the region.
    return specifier.getValue().startsWith(META_PREFIX);
  }
  try {
    return rpcServices.getRegion(specifier).getTableDescriptor().isMetaRegion();
  } catch (IOException ignored) {
    // Ignore NotServing; let's allow the scan to fail later, for consistency.
    return false;
  }
}

/**
 * Test hook: replaces the {@code rpcServices} reference with the one obtained from the given
 * region server.
 *
 * <p>Note that {@code scanDeadlineOnly} is a final field assigned in the constructor, so it is
 * not recomputed when the RPC services are swapped here.
 *
 * @param hrs the region server whose RPC services should be used from now on
 */
@VisibleForTesting
void setRegionServer(final HRegionServer hrs) {
this.rpcServices = hrs.getRSRpcServices();
}

/**
 * Test accessor for the scanner deadline mode resolved in the constructor.
 *
 * @return the effective {@code ScanDeadlineOnly} mode of this priority function
 */
@VisibleForTesting
ScanDeadlineOnly getScanDeadlineOnly() {
return scanDeadlineOnly;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -437,17 +437,23 @@ private static final class RegionScannerHolder {
private final HRegion r;
private final RpcCallback closeCallBack;
private final RpcCallback shippedCallback;
private final Lease lease;
private byte[] rowOfLastPartialResult;
private boolean needCursor;

/**
 * Creates a holder tying a region scanner to its region, callbacks and lease.
 *
 * @param scannerName the name under which the scanner is registered
 * @param s the region scanner being held
 * @param r the region the scanner reads from
 * @param closeCallBack callback stored for when the scanner is closed
 * @param shippedCallback callback stored for when results have been shipped
 * @param needCursor whether the scan asked for cursors
 * @param lease the lease tracking the scanner's expiration; may be {@code null}, in which
 *        case {@code getExpirationDelayMs()} reports {@code 0}
 */
public RegionScannerHolder(String scannerName, RegionScanner s, HRegion r,
    RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor, Lease lease) {
  this.scannerName = scannerName;
  this.s = s;
  this.r = r;
  this.closeCallBack = closeCallBack;
  this.shippedCallback = shippedCallback;
  this.needCursor = needCursor;
  this.lease = lease;
}

/**
 * Reports the delay of this scanner's lease in milliseconds.
 *
 * @return the lease's delay in milliseconds, or {@code 0} when the holder has no lease
 */
public long getExpirationDelayMs() {
  if (lease != null) {
    return lease.getDelay(TimeUnit.MILLISECONDS);
  }
  return 0;
}

public long getNextCallSeq() {
Expand Down Expand Up @@ -1332,8 +1338,7 @@ public int getScannersCount() {
return scanners.size();
}

public
RegionScanner getScanner(long scannerId) {
public RegionScanner getScanner(long scannerId) {
String scannerIdString = Long.toString(scannerId);
RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
if (scannerHolder != null) {
Expand All @@ -1342,6 +1347,20 @@ RegionScanner getScanner(long scannerId) {
return null;
}

/**
 * Reports how long the given scanner has until it expires.
 *
 * @param scannerId the scanner id, or {@code null} for a scanner that has not been opened yet
 * @return the configured scanner lease timeout for a new scanner ({@code null} id); the
 *         registered holder's expiration delay in milliseconds for a known scanner; or one day
 *         in milliseconds for a missing/expired scanner
 */
public long getScannerExpirationDelayMs(Long scannerId) {
  if (scannerId != null) {
    RegionScannerHolder holder = scanners.get(scannerId.toString());
    if (holder == null) {
      // This is a missing/expired scanner.
      // Return a large value; ideally it should be Long.MAX_VALUE but we don't want to rely on
      // the correct overflow handling by the caller when the code there changes.
      return TimeUnit.DAYS.toMillis(1);
    }
    return holder.getExpirationDelayMs();
  }
  // This is a new scanner.
  return this.scannerLeaseTimeoutPeriod;
}

public String getScanDetailsWithId(long scannerId) {
RegionScanner scanner = getScanner(scannerId);
if (scanner == null) {
Expand Down Expand Up @@ -1418,8 +1437,8 @@ private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Ship
} else {
closeCallback = new RegionScannerCloseCallBack(s);
}
RegionScannerHolder rsh =
new RegionScannerHolder(scannerName, s, r, closeCallback, shippedCallback, needCursor);
RegionScannerHolder rsh = new RegionScannerHolder(
scannerName, s, r, closeCallback, shippedCallback, needCursor, lease);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
scannerName;
Expand Down Expand Up @@ -3795,4 +3814,8 @@ protected AccessChecker getAccessChecker() {
/**
 * Accessor for the {@code zkPermissionWatcher} field.
 *
 * @return the permission watcher held by this instance (may be unset — TODO confirm nullability)
 */
protected ZKPermissionWatcher getZkPermissionWatcher() {
return zkPermissionWatcher;
}

/**
 * Base implementation of the master check: always returns {@code false}. The scanner deadline
 * configuration consults this when deciding whether to promote META_ONLY to ALL.
 *
 * @return {@code false}, unconditionally
 */
public boolean isMaster() {
return false;
}
}
Loading