Skip to content

Commit

Permalink
HBASE-23305: Master based registry implementation
Browse files Browse the repository at this point in the history
Implements a master based registry for clients.

- Supports hedged RPCs (fan out configured via configs).
- Parameterized existing client tests to run with multiple
  registry combinations.
- Added unit-test coverage for the new registry implenenation.
  • Loading branch information
bharathv committed Dec 20, 2019
1 parent e41b46c commit 31a04be
Show file tree
Hide file tree
Showing 13 changed files with 814 additions and 248 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.REGISTRY_IMPL_CONF_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand All @@ -27,8 +28,6 @@
@InterfaceAudience.Private
final class AsyncRegistryFactory {

static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";

private AsyncRegistryFactory() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,364 @@
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_NUM_HEDGED_REQS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_NUM_HEDGED_REQS_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;

/**
* Master based registry implementation. Makes RPCs to the configured master addresses from config
* {@value HConstants#MASTER_ADDRS_KEY}.
*
* It has the ability to burst the same RPC to multiple masters as a batch and returns whatever
* comes back first (a.k.a hedged RPCs). Number of target masters in a single batch is controlled
* via {@value HConstants#MASTER_REGISTRY_NUM_HEDGED_REQS_KEY}. If it is set to 1 (default), it is
* equivalent to picking a random master from the configured list.
*
* TODO: Handle changes to the configuration dynamically without having to restart the client.
*/
@InterfaceAudience.Private
public class MasterRegistry implements AsyncRegistry {
private static final Logger LOG = LoggerFactory.getLogger(MasterRegistry.class);

// Configured list of masters to probe the meta information from.
private final List<ServerName> masterServers;
// Controls the fan out of the hedged requests. Requests are made in batches of this number until
// all the servers are exhausted. The first returned result is passed back to the client.
private final int requestFanOut;
private ExecutorService masterRpcPool;

// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutNs;

// A simple interface that callers can implement to make an RPC to master. This is used to
// abstract out the logic needed to hedge the requests to multiple masters. For more details, look
// at doRPCs().
@VisibleForTesting
@FunctionalInterface
public interface RpcCall<RESP> {
RESP doRpc(ClientMetaService.BlockingInterface stub) throws ServiceException;
}

/**
* A shared RPC context between a batch of hedged RPCs. Tracks the state and helpers needed to
* synchronize on multiple RPCs to different masters fetching the result. All the methods are
* thread-safe.
* @param <RESP> Return response type for the RPCs.
*/
private class BatchRpcCtx<RESP> {
// Result set by the thread finishing first. Set only once.
private AtomicReference<RESP> result;
// Caller waits on this latch being set.
private CountDownLatch resultsReady;
// Book-keeping for number of failed RPCs.
private AtomicInteger failedRPCs;

BatchRpcCtx() {
result = new AtomicReference<>();
// We set this to 1, so that the first successful RPC result is returned to the client.
resultsReady = new CountDownLatch(1);
failedRPCs = new AtomicInteger(0);
}

/**
* Sets the result only if it is not already set by another thread. Thread that successfully
* sets the result also count downs the latch.
* @param result Result to be set.
*/
public void setResultIfNotSet(RESP result) {
if (this.result.compareAndSet(null, result)) {
resultsReady.countDown();
}
}

/**
* Caller can use this method to wait for results to be fetched.
* @param timeoutNs Waits until this timeout hits or the results are set. Whatever happens
* first.
* @return True if the results are ready. False otherwise.
*/
public boolean waitForResults(int timeoutNs) {
try {
return resultsReady.await(timeoutNs, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e);
return false;
}
}

/**
* Helper to increment the number of failed RPCs.
*/
public void incrementFailedRPCs() {
failedRPCs.incrementAndGet();
}

/**
* Onus is on the caller to wait for the results and call this.
* @return the current result.
*/
public RESP getResult() {
return result.get();
}
}

/**
* A runnable implementation of an RPC call to a given master. Updates the results in a shared
* rpc context.
* @param <RESP> Response type of the RPC.
*/
private class MasterRpc<RESP> implements Runnable {
private final BatchRpcCtx<RESP> rpcCtx;
private final ServerName master;
private final RpcCall<RESP> rpcCall;
private final Function<RESP, Boolean> isValidResp;
private final String debugStr;

MasterRpc(BatchRpcCtx<RESP> rpcCtx, ServerName master, RpcCall<RESP> rpcCall,
Function<RESP, Boolean> isValidResp, String debugStr) {
this.rpcCtx = rpcCtx;
this.master = master;
this.rpcCall = rpcCall;
this.isValidResp = isValidResp;
this.debugStr = debugStr;
}

@Override
public void run() {
try {
RESP resp = rpcCall.doRpc(getMasterStub(master));
if (isValidResp.apply(resp)) {
// Valid result, set if not set by other threads.
rpcCtx.setResultIfNotSet(resp);
return;
}
} catch (Exception e) {
LOG.warn("Error calling {} on master {}. Trying other masters.", debugStr, master, e);
}
rpcCtx.incrementFailedRPCs();
}
}

MasterRegistry(Configuration conf) {
masterServers = new ArrayList<>();
requestFanOut =
conf.getInt(MASTER_REGISTRY_NUM_HEDGED_REQS_KEY, MASTER_REGISTRY_NUM_HEDGED_REQS_DEFAULT);
Preconditions.checkArgument(requestFanOut >= 1);
if (requestFanOut > 1) {
masterRpcPool = Executors.newFixedThreadPool(requestFanOut,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MasterRegistryRPC-%d").build());
}
parseMasterAddrs(conf);
rpcTimeoutNs = (int) Math.min(Integer.MAX_VALUE,
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY,
DEFAULT_HBASE_RPC_TIMEOUT)));
// TODO(HBASE-23330): Fix clients using cluster ID based token auth.
rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
}

/**
* Parses the list of master addresses from the provided configuration.
* @param conf Configuration to parse from.
*/
private void parseMasterAddrs(Configuration conf) {
String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
for (String masterAddr: configuredMasters.split(",")) {
masterServers.add(ServerName.valueOf(masterAddr, ServerName.NON_STARTCODE));
}
// (Pseudo) Randomized so that not all clients hot spot the same set of masters.
Collections.shuffle(masterServers);
Preconditions.checkArgument(!masterServers.isEmpty());
}

/**
* Makes a given RPC to master servers.
* @param rpcCall Call to make.
* @param debug String used for debug logging the RPC details.
* @param <RESP> Response type for the RPC.
* @param isvalidResp Used to verify if the response returned from RPC is valid.
* @return Optional response from the RPCs to parsed masters.
*/
@VisibleForTesting
<RESP> Optional<RESP> doRPCs(RpcCall<RESP> rpcCall,
Function<RESP, Boolean> isvalidResp, String debug) {
if (requestFanOut == 1) {
// This is the most general (and default) case. We want to avoid the thread creation and
// synchronization overhead and hence a special optimization for this case that just loops
// through the available masters in that order.
return doSequentialRPCs(rpcCall, isvalidResp, debug);
}
return doHedgedRPCs(rpcCall, isvalidResp, debug);
}

/**
* Makes RPCs in batches of {HConstants#MASTER_REGISTRY_NUM_HEDGED_REQS_KEY}. Currently should
* only be used if the request fan out is > 1. For the default case, refer to doSequentialRPCs().
*/
private <RESP> Optional<RESP> doHedgedRPCs(RpcCall<RESP> rpcCall,
Function<RESP, Boolean> isvalidResp, String debug) {
Preconditions.checkState(requestFanOut > 1);
Preconditions.checkNotNull(masterRpcPool);
int i = 0;
while (i < masterServers.size()){
// Each iteration of loop picks requestFanOut masters
int subListSize = Math.min(masterServers.size(), i + requestFanOut);
List<ServerName> masterSubList = masterServers.subList(i, subListSize);
// Create a new RPC context for this batch of RPCs and submit the RPCs to the pool.
BatchRpcCtx<RESP> batchRpcCtx = new BatchRpcCtx<>();
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting to make rpc {} to batch {}.", debug,
masterSubList.stream().map(Objects::toString).collect(Collectors.toList()));
}
for (ServerName master: masterSubList) {
masterRpcPool.submit(new MasterRpc<>(batchRpcCtx, master, rpcCall, isvalidResp, debug));
}
if (batchRpcCtx.waitForResults(rpcTimeoutNs)) {
// Results set by some RPC, no point in doing rest of the calls.
return Optional.of(batchRpcCtx.getResult());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Failed to make rpc {} to batch {}. Trying others.", debug,
masterSubList.stream().map(Objects::toString).collect(Collectors.toList()));
}
i = subListSize;
}
return Optional.empty();
}


/**
* Sequentially calls the masters to make an RPC in random order. This removes the unnecessary
* thread overhead and synchronization when hedged RPCs are disabled (maxRequestFanOut == 1).
*/
private <RESP> Optional<RESP> doSequentialRPCs(RpcCall<RESP> rpcCall,
Function<RESP, Boolean> isvalidResp, String debug) {
Preconditions.checkState(requestFanOut == 1, "Invalid request fan out.");
for (ServerName master: masterServers) {
try {
RESP resp = rpcCall.doRpc(getMasterStub(master));
if (!isvalidResp.apply(resp)) {
continue;
}
return Optional.of(resp);
} catch (Exception e) {
LOG.warn("Error calling {} on master {}. Trying other masters.", debug, master, e);
}
}
// Failed on all the masters.
return Optional.empty();
}

/**
* Util that generates a master stub for a given ServerName.
*/
private ClientMetaService.BlockingInterface getMasterStub(ServerName server) throws IOException {
return ClientMetaService.newBlockingStub(
rpcClient.createBlockingRpcChannel(server, User.getCurrent(), rpcTimeoutNs));
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
CompletableFuture<RegionLocations> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
Optional<GetMetaRegionLocationsResponse> resp = doRPCs(stub -> stub.getMetaRegionLocations(
rpcControllerFactory.newController(), GetMetaRegionLocationsRequest.getDefaultInstance()),
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0,"GetMetaRegionLocations()");
if (!resp.isPresent()) {
result.completeExceptionally(new MasterRegistryFetchException(masterServers,
"GetMetaRegionLocations()"));
}
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.get().getMetaLocationsList().forEach(
location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
result.complete(new RegionLocations(regionLocations));
});
return result;
}

@Override
public CompletableFuture<String> getClusterId() {
CompletableFuture<String> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
Optional<GetClusterIdResponse> resp = doRPCs(stub -> stub.getClusterId(
rpcControllerFactory.newController(), GetClusterIdRequest.getDefaultInstance()),
GetClusterIdResponse::hasClusterId, "GetClusterId()");
if (!resp.isPresent()) {
result.completeExceptionally(new MasterRegistryFetchException(masterServers,
"GetClusterId()"));
}
result.complete(resp.get().getClusterId());
});
return result;
}

@Override
public CompletableFuture<ServerName> getMasterAddress() {
CompletableFuture<ServerName> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
Optional<GetActiveMasterResponse> resp = doRPCs(stub -> stub.getActiveMaster(
rpcControllerFactory.newController(), GetActiveMasterRequest.getDefaultInstance()),
GetActiveMasterResponse::hasServerName, "GetActiveMaster()");
if (!resp.isPresent()) {
result.completeExceptionally(new MasterRegistryFetchException(masterServers,
"GetMasterAddress()"));
}
result.complete(ProtobufUtil.toServerName(resp.get().getServerName()));
});
return result;
}

@Override
public void close() {
if (rpcClient != null) {
rpcClient.close();
}
if (masterRpcPool != null) {
masterRpcPool.shutdownNow();
}
}
}
Loading

0 comments on commit 31a04be

Please sign in to comment.