diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index 80d358bc71a4..93084437a427 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -27,9 +28,6 @@ @InterfaceAudience.Private final class ConnectionRegistryFactory { - static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY = - "hbase.client.connection.registry.impl"; - private ConnectionRegistryFactory() { } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java new file mode 100644 index 000000000000..5680847ec37a --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY; +import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT; +import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Predicate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; + +/** + * Master based registry implementation. Makes RPCs to the configured master addresses from config + * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}. + * + * It supports hedged reads, which can be enabled by setting + * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to true. The + * fan out size of each hedged request batch is controlled by + * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}. + * + * TODO: Handle changes to the configuration dynamically without having to restart the client. + */ +@InterfaceAudience.Private +public class MasterRegistry implements ConnectionRegistry { + private static final String MASTER_ADDRS_CONF_SEPARATOR = ","; + + // Configured list of masters to probe the meta information from. + private final Set<ServerName> masterServers; + + // RPC client used to talk to the masters.
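+ // Created once in the constructor, shared by all registry RPCs, and released in close().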
+ private final RpcClient rpcClient; + private final RpcControllerFactory rpcControllerFactory; + private final int rpcTimeoutMs; + + MasterRegistry(Configuration conf) { + boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, + MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT); + Configuration finalConf; + if (!hedgedReadsEnabled) { + // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy of + // the configuration so that other places reusing this reference are not affected. + finalConf = new Configuration(conf); + finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1); + } else { + finalConf = conf; + } + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + masterServers = new HashSet<>(); + parseMasterAddrs(finalConf); + rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT); + rpcControllerFactory = RpcControllerFactory.instantiate(finalConf); + } + + /** + * @return Stub needed to make RPC using a hedged channel to the master end points. + */ + private ClientMetaService.Interface getMasterStub() throws IOException { + return ClientMetaService.newStub( + rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs)); + } + + /** + * Parses the list of master addresses from the provided configuration. Supported format is + * comma separated host[:port] values. If no port number is specified, the default master port is + * assumed. + * @param conf Configuration to parse from. + */ + private void parseMasterAddrs(Configuration conf) { + String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT); + for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) { + HostAndPort masterHostPort = + HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT); + masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE)); + } + Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed"); + } + + @VisibleForTesting + public Set<ServerName> getParsedMasterServers() { + return Collections.unmodifiableSet(masterServers); + } + + /** + * Returns a callback that can be passed along to the non-blocking rpc call. It is invoked once + * the rpc finishes and the response is propagated to the passed future. + * @param future Result future to which the rpc response is propagated. + * @param isValidResp Checks if the rpc response has a valid result. + * @param transformResult Transforms the result to a different form as expected by callers. + * @param hrc RpcController instance for this rpc. + * @param debug Debug message passed along to the caller in case of exceptions. + * @param <T> RPC result type. + * @param <R> Transformed type of the result. + * @return A callback that can be embedded in the non-blocking rpc call. + */ + private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future, + Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc, + final String debug) { + return rpcResult -> { + if (rpcResult == null) { + future.completeExceptionally( + new MasterRegistryFetchException(masterServers, hrc.getFailed())); + return; + } + if (!isValidResp.test(rpcResult)) { + // Rpc returned ok, but result was malformed. + future.completeExceptionally(new IOException( + String.format("Invalid result for request %s.
Will be retried", debug))); + + } + future.complete(transformResult.apply(rpcResult)); + }; + } + + /** + * Simple helper to transform the result of getMetaRegionLocations() rpc. + */ + private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { + List regionLocations = new ArrayList<>(); + resp.getMetaLocationsList().forEach( + location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); + return new RegionLocations(regionLocations); + } + + @Override + public CompletableFuture getMetaRegionLocations() { + CompletableFuture result = new CompletableFuture<>(); + HBaseRpcController hrc = rpcControllerFactory.newController(); + RpcCallback callback = getRpcCallBack(result, + (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc, + "getMetaRegionLocations()"); + try { + getMasterStub().getMetaRegionLocations( + hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback); + } catch (IOException e) { + result.completeExceptionally(e); + } + return result; + } + + @Override + public CompletableFuture getClusterId() { + CompletableFuture result = new CompletableFuture<>(); + HBaseRpcController hrc = rpcControllerFactory.newController(); + RpcCallback callback = getRpcCallBack(result, + GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc, + "getClusterId()"); + try { + getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback); + } catch (IOException e) { + result.completeExceptionally(e); + } + return result; + } + + private ServerName transformServerName(GetActiveMasterResponse resp) { + return ProtobufUtil.toServerName(resp.getServerName()); + } + + @Override + public CompletableFuture getActiveMaster() { + CompletableFuture result = new CompletableFuture<>(); + HBaseRpcController hrc = rpcControllerFactory.newController(); + RpcCallback callback = getRpcCallBack(result, + GetActiveMasterResponse::hasServerName, this::transformServerName, hrc, + "getActiveMaster()"); + try { + getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback); + } catch (IOException e) { + result.completeExceptionally(e); + } + return result; + } + + @Override + public void close() { + if (rpcClient != null) { + rpcClient.close(); + } + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java new file mode 100644 index 000000000000..18871befef87 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/MasterRegistryFetchException.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.exceptions; + +import java.util.Set; +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.PrettyPrinter; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Exception thrown when a master registry RPC fails in the client. The exception includes the list + * of masters to which the RPC was attempted and the last exception encountered. Prior exceptions + * are included in the logs. + */ +@InterfaceAudience.Private +public class MasterRegistryFetchException extends HBaseIOException { + public MasterRegistryFetchException(Set<ServerName> masters, Throwable failure) { + super(String.format("Exception making rpc to masters %s", PrettyPrinter.toString(masters)), + failure); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 629efe6c203c..72b9f83b9d83 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -20,22 +20,6 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; - -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; -import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; -import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; -import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; -import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; - -import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -43,18 +27,15 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.KeyValueCodec; @@ -69,7 +50,22 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; - +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader; +import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; +import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; /** @@ -106,7 +102,8 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_MUTABLE_COLLECTION_PKGPROTECT", justification="the rest of the system which live in the different package can use") - protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = new HashMap<>(); + protected final static Map<Kind, TokenSelector<? extends TokenIdentifier>> TOKEN_HANDLERS = + new HashMap<>(); static { TOKEN_HANDLERS.put(Kind.HBASE_AUTH_TOKEN, new AuthenticationTokenSelector()); @@ -217,7 +214,9 @@ private void cleanupIdleConnections() { // have some pending calls on connection so we should not shutdown the connection outside. // The connection itself will disconnect if there is no pending call for maxIdleTime. if (conn.getLastTouched() < closeBeforeTime && !conn.isActive()) { - if (LOG.isTraceEnabled()) LOG.trace("Cleanup idle connection to " + conn.remoteId().address); + if (LOG.isTraceEnabled()) { + LOG.trace("Cleanup idle connection to {}", conn.remoteId().address); + } connections.removeValue(conn.remoteId(), conn); conn.cleanupConnection(); } @@ -398,7 +397,7 @@ private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress } } - private void callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, + Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc, final Message param, Message returnType, final User ticket, final InetSocketAddress addr, final RpcCallback<Message> callback) { final MetricsConnection.CallStats cs = MetricsConnection.newCallStats(); @@ -435,9 +434,10 @@ public void run(Call call) { } catch (Exception e) { call.setException(toIOE(e)); } + return call; } - private InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { + InetSocketAddress createAddr(ServerName sn) throws UnknownHostException { InetSocketAddress addr = new InetSocketAddress(sn.getHostname(), sn.getPort()); if (addr.isUnresolved()) { throw new UnknownHostException("can not resolve " + sn.getServerName()); @@ -527,6 +527,13 @@ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout) return new RpcChannelImplementation(this, createAddr(sn), user, rpcTimeout); } + @Override + public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout) + throws UnknownHostException { + // Check HedgedRpcChannel implementation for detailed comments.
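+ // Subclasses that support hedging override this; see NettyRpcClient#createHedgedRpcChannel below.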
+ throw new UnsupportedOperationException("Hedging not supported for this implementation."); + } + private static class AbstractRpcChannel { protected final InetSocketAddress addr; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java index f84c308715b2..22eca535e958 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -17,18 +17,15 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.net.SocketAddress; - import javax.net.SocketFactory; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.net.NetUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** * Does RPC against a cluster. Manages connections per regionserver in the cluster. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java new file mode 100644 index 000000000000..7b681e079bdf --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HedgedRpcChannel.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.PrettyPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; + +/** + * A non-blocking implementation of RpcChannel that hedges requests to multiple service end points. 
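+ * (Illustrative: the MasterRegistry above uses this channel to fan a GetClusterIdRequest out to several masters at once, completing with the first valid response.)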
+ * First received response is returned to the caller. This abstracts out the logic needed to batch + * requests to multiple end points underneath and presents itself as a single logical RpcChannel to + * the client. + * + * Hedging Details: + * --------------- + * - Hedging of RPCs happens in multiple batches. In each iteration, we select a 'batch' of address + * end points to make the call to. We do multiple iterations until we get a proper response to the + * rpc call or all the service addresses are exhausted, whichever happens first. The size of each + * batch is configurable and is also known as 'fanOutSize'. + * + * - We randomize the addresses up front so that the batch order per client is non deterministic. + * This avoids hot spots on the service side. A higher fanOutSize implies we make more rpc calls in + * a single batch. One needs to be mindful of the load on the client and server side when + * configuring the fan out. + * + * - In a happy case, once we receive a response from one end point, we cancel all the + * other inflight rpcs in the same batch and return the response to the caller. If we do not get a + * valid response from any address end point, we propagate the error back to the caller. + * + * - Rpc timeouts are applied to every hedged rpc. + * + * - Callers need to be careful about what rpcs they are trying to hedge. Not every kind of call can + * be hedged (for example: cluster state changing rpcs). + * + * (TODO) Retries and Adaptive hedging policy: + * ------------------------------------------ + * + * - No retries are handled at the channel level. Retries can be built in upper layers. However the + * question is, do we even need retries? Hedging in fact is a substitute for retries. + * + * - Clearly hedging puts more load on the service side. To mitigate this, we can make the hedging + * policy more adaptive. In most happy cases, the rpcs from the first few end points should return + * right away (especially short lived rpcs, that do not take up much time). In such cases, hedging + * is not needed. So, the idea is to make this request pattern pluggable so that the requests are + * hedged only when needed. + */ +class HedgedRpcChannel implements RpcChannel { + private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class); + + /** + * Currently hedging is only supported for non-blocking connection implementation types because + * the channel implementation inherently relies on the connection implementation being async. + * Refer to the comments in doCallMethod(). + */ + private final NettyRpcClient rpcClient; + // List of service addresses to hedge the requests to. + private final List<InetSocketAddress> addrs; + private final User ticket; + private final int rpcTimeout; + // Controls the size of request fan out (number of rpcs per a single batch). + private final int fanOutSize; + + /** + * A simple rpc callback implementation to notify the batch context if any rpc is successful. + */ + private static class BatchRpcCtxCallBack implements RpcCallback<Message> { + private final BatchRpcCtx batchRpcCtx; + private final HBaseRpcController rpcController; + BatchRpcCtxCallBack(BatchRpcCtx batchRpcCtx, HBaseRpcController rpcController) { + this.batchRpcCtx = batchRpcCtx; + this.rpcController = rpcController; + } + @Override + public void run(Message result) { + batchRpcCtx.setResultIfNotSet(result, rpcController); + } + } + + /** + * A shared RPC context between a batch of hedged RPCs.
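+ * (Illustrative: in a batch of two in-flight calls, the first valid response sets the shared result and cancels the other; if both fail, the failed rpc count reaches the batch size and the waiting caller is released with the last failure.)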
Tracks the state and helpers needed to + * synchronize on multiple RPCs to different end points fetching the result. All the methods are + * thread-safe. + */ + private static class BatchRpcCtx { + // Result set by the thread finishing first. Set only once. + private final AtomicReference<Message> result = new AtomicReference<>(); + // Caller waits on this latch being set. + // We set this to 1, so that the first successful RPC result is returned to the client. + private CountDownLatch resultsReady = new CountDownLatch(1); + // Failed rpc book-keeping. + private AtomicInteger failedRpcCount = new AtomicInteger(); + // All the call handles for this batch. + private final List<Call> callsInFlight = Collections.synchronizedList(new ArrayList<>()); + + // Target addresses. + private final List<InetSocketAddress> addresses; + // Called when the result is ready. + private final RpcCallback<Message> callBack; + // Last failed rpc's exception. Used to propagate the reason to the controller. + private IOException lastFailedRpcReason; + + BatchRpcCtx(List<InetSocketAddress> addresses, RpcCallback<Message> callBack) { + this.addresses = addresses; + this.callBack = Preconditions.checkNotNull(callBack); + } + + /** + * Sets the result only if it is not already set by another thread. Thread that successfully + * sets the result also counts down the latch. + * @param result Result to be set. + */ + public void setResultIfNotSet(Message result, HBaseRpcController rpcController) { + if (rpcController.failed()) { + incrementFailedRpcs(rpcController.getFailed()); + return; + } + if (this.result.compareAndSet(null, result)) { + resultsReady.countDown(); + // Cancel all pending in flight calls. + for (Call call: callsInFlight) { + // It is ok to do it for all calls as it is a no-op if the call is already done. + final String exceptionMsg = String.format("%s canceled because another hedged attempt " + + "for the same rpc already succeeded. This is not needed anymore.", call); + call.setException(new CallCancelledException(exceptionMsg)); + } + } + } + + /** + * Waits until the results are populated and calls the callback if the call is successful. + * @return true for successful rpc and false otherwise. + */ + public boolean waitForResults() { + try { + // We do not set a timeout on await() because we rely on the underlying RPCs to timeout if + // something on the remote is broken. Worst case we should wait for rpc time out to kick in. + resultsReady.await(); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for batched master RPC results. Aborting wait.", e); + } + Message message = result.get(); + if (message != null) { + callBack.run(message); + return true; + } + return false; + } + + public void addCallInFlight(Call c) { + callsInFlight.add(c); + } + + public void incrementFailedRpcs(IOException reason) { + if (failedRpcCount.incrementAndGet() == addresses.size()) { + lastFailedRpcReason = reason; + // All the rpcs in this batch have failed. Invoke the waiting threads. + resultsReady.countDown(); + } + } + + public IOException getLastFailedRpcReason() { + return lastFailedRpcReason; + } + + @Override + public String toString() { + return String.format("Batched rpc for target(s) %s", PrettyPrinter.toString(addresses)); + } + } + + public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs, + User ticket, int rpcTimeout, int fanOutSize) { + this.rpcClient = rpcClient; + this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs)); + Preconditions.checkArgument(this.addrs.size() >= 1); + // For non-deterministic client query pattern.
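+ // (Illustrative: with masters {m1, m2, m3}, one client may probe {m2, m3} first while another probes {m3, m1}, spreading registry load.)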
Not all clients want to hedge RPCs in the same + // order, creating hot spots on the service end points. + Collections.shuffle(this.addrs); + this.ticket = ticket; + this.rpcTimeout = rpcTimeout; + // fanOutSize controls the number of hedged RPCs per batch. + this.fanOutSize = fanOutSize; + } + + private HBaseRpcController applyRpcTimeout(RpcController controller) { + HBaseRpcController hBaseRpcController = (HBaseRpcController) controller; + int rpcTimeoutToSet = + hBaseRpcController.hasCallTimeout() ? hBaseRpcController.getCallTimeout() : rpcTimeout; + HBaseRpcController response = new HBaseRpcControllerImpl(); + response.setCallTimeout(rpcTimeoutToSet); + return response; + } + + private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController controller, + Message request, Message responsePrototype, RpcCallback<Message> done) { + int i = 0; + BatchRpcCtx lastBatchCtx = null; + while (i < addrs.size()) { + // Each iteration picks fanOutSize addresses to run as batch. + int batchEnd = Math.min(addrs.size(), i + fanOutSize); + List<InetSocketAddress> addrSubList = addrs.subList(i, batchEnd); + BatchRpcCtx batchRpcCtx = new BatchRpcCtx(addrSubList, done); + lastBatchCtx = batchRpcCtx; + LOG.debug("Attempting request {}, {}", method.getName(), batchRpcCtx); + for (InetSocketAddress address : addrSubList) { + HBaseRpcController rpcController = applyRpcTimeout(controller); + // ** WARN ** This is a blocking call if the underlying connection for the rpc client is + // a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all + // the write calls. Handling blocking connection means that this should be run in a separate + // thread and hence more code complexity. Is it ok to handle only non-blocking connections? + batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request, + responsePrototype, ticket, address, + new BatchRpcCtxCallBack(batchRpcCtx, rpcController))); + } + if (batchRpcCtx.waitForResults()) { + return; + } + // Entire batch has failed, let's try the next batch. + LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx); + i = batchEnd; + } + Preconditions.checkNotNull(lastBatchCtx); + // All the batches failed, mark it a failed rpc. + // Propagate the failure reason. We propagate the last batch's last failing rpc reason. + // Can we do something better? + controller.setFailed(lastBatchCtx.getLastFailedRpcReason()); + done.run(null); + } + + @Override + public void callMethod(Descriptors.MethodDescriptor method, RpcController controller, + Message request, Message responsePrototype, RpcCallback<Message> done) { + // There is no reason to use any other implementation of RpcController. + Preconditions.checkState(controller instanceof HBaseRpcController); + // To make the channel non-blocking, we run the actual doCallMethod() async. The callback is + // called once the hedging finishes.
+ CompletableFuture.runAsync( + () -> doCallMethod(method, (HBaseRpcController)controller, request, responsePrototype, done)); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 61dedbb5c124..c4f70b05aaa1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -17,21 +17,26 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hbase.thirdparty.io.netty.channel.Channel; -import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; -import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; -import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; -import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; - import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; - +import java.net.UnknownHostException; +import java.util.HashSet; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.MetricsConnection; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; +import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; /** * Netty client for the requests and responses. 
@@ -74,6 +79,19 @@ protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOEx return new NettyRpcConnection(this, remoteId); } + @Override + public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout) + throws UnknownHostException { + final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, + HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT); + Set<InetSocketAddress> addresses = new HashSet<>(); + for (ServerName sn: sns) { + addresses.add(createAddr(sn)); + } + return new HedgedRpcChannel(this, addresses, user, rpcTimeout, + hedgedRpcFanOut); + } + @Override protected void closeInternal() { if (shutdownGroupWhenClose) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 0e006956d249..558fceeb78bb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -17,15 +17,14 @@ */ package org.apache.hadoop.hbase.ipc; -import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; - import java.io.Closeable; import java.io.IOException; - +import java.util.Set; import org.apache.hadoop.hbase.ServerName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.security.User; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; /** * Interface for RpcClient implementations so ConnectionManager can handle it. @@ -83,6 +82,16 @@ BlockingRpcChannel createBlockingRpcChannel(ServerName sn, User user, int rpcTim RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) throws IOException; + /** + * Creates a channel that can hedge requests to multiple underlying channels. + * @param sns Set of servers for underlying channels. + * @param user user for the connection. + * @param rpcTimeout rpc timeout to use. + * @return A hedging rpc channel for this rpc client instance. + */ + RpcChannel createHedgedRpcChannel(final Set<ServerName> sns, final User user, int rpcTimeout) + throws IOException; + /** * Interrupt the connections to the given server. This should be called if the server * is known as actually dead.
This will not prevent current operation to be retried, and, diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java index f02ec422a922..561b1f5715fd 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.FutureUtils; @@ -70,7 +71,7 @@ public void close() { @BeforeClass public static void setUp() { - CONF.setClass(ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + CONF.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ConnectionRegistryForTest.class, ConnectionRegistry.class); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 132d3e03be60..ef525d7b9b38 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -180,10 +180,17 @@ public enum OperationStatusCode { public static final String MASTER_INFO_PORT = "hbase.master.info.port"; /** Configuration key for the list of master host:ports **/ - public static final String MASTER_ADDRS_KEY = "hbase.master.addrs"; + public static final String MASTER_ADDRS_KEY = "hbase.masters"; public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT; + /** Configuration to enable hedged reads on master registry **/ + public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY = + "hbase.client.master_registry.enable_hedged_reads"; + + /** Default value for enabling hedged reads on master registry **/ + public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false; + /** Parameter name for the master type being backup (waits for primary to go inactive). */ public static final String MASTER_TYPE_BACKUP = "hbase.master.backup"; @@ -909,6 +916,12 @@ public enum OperationStatusCode { */ public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; + /** Configuration key that controls the fan out of requests in hedged channel implementation. **/ + public static final String HBASE_RPCS_HEDGED_REQS_FANOUT_KEY = "hbase.rpc.hedged.fanout"; + + /** Default value for the fan out of hedged requests. **/ + public static final int HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT = 2; + /** * timeout for each read RPC */ @@ -940,6 +953,11 @@ public enum OperationStatusCode { */ public static final long NO_SEQNUM = -1; + /** + * Registry implementation to be used on the client side. + */ + public static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY = + "hbase.client.registry.impl"; /* * cluster replication constants.
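Taken together, the HConstants keys added above are everything a client needs to opt in to the master based registry. The following is a minimal, illustrative sketch (the example class and host names are hypothetical; the configuration keys and the MasterRegistry class name come from this patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MasterRegistryClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Comma separated host[:port] values; the default master port is assumed when omitted.
    conf.set(HConstants.MASTER_ADDRS_KEY, "master1.example.com:16000,master2.example.com");
    // Opt in to the master based registry introduced by this patch.
    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
        "org.apache.hadoop.hbase.client.MasterRegistry");
    // Enable hedged registry reads with a fan out of two RPCs per batch.
    conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 2);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Cluster id, active master and meta locations are now fetched from the
      // configured masters over hedged RPCs instead of ZooKeeper.
    }
  }
}
```

With hedged reads enabled and a fan out of 2, registry RPCs go to two masters at a time and the first valid response wins.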
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java index 147e9160f910..ff7064b11430 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/PrettyPrinter.java @@ -19,9 +19,12 @@ package org.apache.hadoop.hbase.util; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; - import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.exceptions.HBaseException; import org.apache.yetus.audience.InterfaceAudience; @@ -29,7 +32,7 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private -public class PrettyPrinter { +public final class PrettyPrinter { private static final Logger LOG = LoggerFactory.getLogger(PrettyPrinter.class); @@ -117,7 +120,7 @@ private static String humanReadableTTL(final long interval){ sb.append(" DAY").append(days == 1 ? "" : "S"); } - if (hours > 0 ) { + if (hours > 0) { sb.append(days > 0 ? " " : ""); sb.append(hours); sb.append(" HOUR").append(hours == 1 ? "" : "S"); @@ -188,4 +191,18 @@ private static long humanReadableIntervalToSec(final String humanReadableInterva return ttl; } + /** + * Pretty prints a collection of any type to a string. Relies on toString() implementation of the + * object type. + * @param collection collection to pretty print. + * @return Pretty printed string for the collection. + */ + public static String toString(Collection<?> collection) { + List<String> stringList = new ArrayList<>(); + for (Object o: collection) { + stringList.add(Objects.toString(o)); + } + return "[" + String.join(",", stringList) + "]"; + } + } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java index 43a384ade931..7e443993ddb3 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TestTableName.java @@ -51,7 +51,21 @@ public class TestTableName extends TestWatcher { */ @Override protected void starting(Description description) { - tableName = TableName.valueOf(description.getMethodName()); + tableName = TableName.valueOf(cleanUpTestName(description.getMethodName())); + } + + /** + * Helper to handle parameterized method names. Unlike regular test methods, parameterized method + * names look like 'foo[x]'. This is problematic for tests that use this name for HBase tables. + * This helper strips out the parameter suffixes. + * @return current test method name without parameterized suffixes.
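+ * For example, a parameterized run named 'testSuperSimple[1]' maps to the table name 'testSuperSimple'.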
+ */ + private static String cleanUpTestName(String methodName) { + int index = methodName.indexOf('['); + if (index == -1) { + return methodName; + } + return methodName.substring(0, index); } public TableName getTableName() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java index 5c6ad955fe26..cfa6f75faea0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java @@ -26,12 +26,14 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import org.apache.hadoop.hbase.HConstants; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** * Utility used running a cluster all in the one JVM. @@ -136,6 +138,11 @@ public static JVMClusterUtil.MasterThread createMasterThread(final Configuration } catch (Exception e) { throw new IOException(e); } + // Needed if a master based registry is configured for internal cluster connections. Here, we + // just add the current master host port since we do not know other master addresses up front + // in mini cluster tests. + c.set(HConstants.MASTER_ADDRS_KEY, + Preconditions.checkNotNull(server.getServerName().getAddress()).toString()); return new JVMClusterUtil.MasterThread(server, index); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index f1e91debafd0..d1f2f1f13b95 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1128,6 +1128,9 @@ public MiniHBaseCluster startMiniHBaseCluster(StartMiniClusterOption option) this.hbaseCluster = new MiniHBaseCluster(c, option.getNumMasters(), option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), option.getRsClass()); + // Populate the master address configuration from mini cluster configuration. 
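+ // Without it, a MasterRegistry based client built from this configuration would have no master addresses to probe.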
+ conf.set(HConstants.MASTER_ADDRS_KEY, + c.get(HConstants.MASTER_ADDRS_KEY, HConstants.MASTER_ADDRS_DEFAULT)); // Don't leave here till we've done a successful scan of the hbase:meta Table t = getConnection().getTable(TableName.META_TABLE_NAME); ResultScanner s = t.getScanner(new Scan()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java index a669362d4d4b..c9d67f48023b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.client; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; @@ -28,7 +29,7 @@ public class DummyConnectionRegistry implements ConnectionRegistry { public static final String REGISTRY_IMPL_CONF_KEY = - ConnectionRegistryFactory.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; + HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; @Override public CompletableFuture getMetaRegionLocations() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 61d78ce6430d..4f4fe79f7d8c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,10 +28,10 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -64,7 +64,9 @@ import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TestTableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; @@ -106,23 +108,28 @@ import org.apache.hadoop.hbase.util.NonRepeatedEnvironmentEdge; import org.apache.hadoop.hbase.util.TableDescriptorChecker; import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.Assume; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** * Run tests that use the HBase clients; {@link Table}. * Sets up the HBase mini cluster once at start and runs through all client tests. * Each creates a table named for the method and does its stuff against that. 
+ * + * Parameterized to run with different registry implementations. */ @Category({LargeTests.class, ClientTests.class}) @SuppressWarnings ("deprecation") +@RunWith(Parameterized.class) public class TestFromClientSide { @ClassRule @@ -131,7 +138,7 @@ public class TestFromClientSide { // NOTE: Increment tests were moved to their own class, TestIncrementsFromClientSide. private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class); - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + protected static HBaseTestingUtility TEST_UTIL; private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] FAMILY = Bytes.toBytes("testFamily"); private static final byte[] INVALID_FAMILY = Bytes.toBytes("invalidTestFamily"); @@ -139,10 +146,54 @@ public class TestFromClientSide { private static byte [] VALUE = Bytes.toBytes("testValue"); protected static int SLAVES = 3; - @Rule - public TestName name = new TestName(); + @Rule public TestTableName name = new TestTableName(); + + // To keep the child classes happy. + TestFromClientSide() {} + + public TestFromClientSide(Class<? extends ConnectionRegistry> registry, int numHedgedReqs) + throws Exception { + initialize(registry, numHedgedReqs, MultiRowMutationEndpoint.class); + } + + @Parameterized.Parameters + public static Collection<Object[]> parameters() { + return Arrays.asList(new Object[][] { + { MasterRegistry.class, 1}, + { MasterRegistry.class, 2}, + { ZKConnectionRegistry.class, 1} + }); + } + + /** + * JUnit does not provide an easy way to run a hook after each parameterized run. Without that + * there is no easy way to restart the test cluster after each parameterized run. The annotation + * BeforeParam does not work either because it runs before parameterization and hence does not + * have access to the test parameters (which is weird). + * + * This *hack* checks if the current instance of the test cluster configuration has the passed + * parameterized configs. In such a case, we can just reuse the cluster for the test and do not + * need to initialize from scratch. While this is a hack, it saves a ton of time for the full + * test and de-flakes it. + */ + private static boolean isSameParameterizedCluster(Class<?> registryImpl, int numHedgedReqs) { + if (TEST_UTIL == null) { + return false; + } + Configuration conf = TEST_UTIL.getConfiguration(); + Class<?> confClass = conf.getClass( + HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); + int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, + HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT); + return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; + } - protected static final void initialize(Class<?>... cps) throws Exception { + protected static final void initialize(Class<? extends ConnectionRegistry> registryImpl, + int numHedgedReqs, Class<?>... cps) throws Exception { + // initialize() is called for every unit test, however we only want to reset the cluster state + // at the end of every parameterized run. + if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { + return; + } // Uncomment the following lines if more verbosity is needed for // debugging (see HBASE-12285 for details). // ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL); @@ -150,22 +201,35 @@ protected static final void initialize(Class<?>... cps) throws Exception { // ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL); // make sure that we do not get the same ts twice, see HBASE-19731 for more details.
EnvironmentEdgeManager.injectEdge(new NonRepeatedEnvironmentEdge()); + if (TEST_UTIL != null) { + // We reached end of a parameterized run, clean up. + TEST_UTIL.shutdownMiniCluster(); + } + TEST_UTIL = new HBaseTestingUtility(); Configuration conf = TEST_UTIL.getConfiguration(); conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, Arrays.stream(cps).map(Class::getName).toArray(String[]::new)); conf.setBoolean(TableDescriptorChecker.TABLE_SANITY_CHECKS, true); // enable for below tests - // We need more than one region server in this test - TEST_UTIL.startMiniCluster(SLAVES); - } - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - initialize(MultiRowMutationEndpoint.class); + conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl, + ConnectionRegistry.class); + if (numHedgedReqs == 1) { + conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false); + } else { + Preconditions.checkArgument(numHedgedReqs > 1); + conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true); + } + conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); + StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); + // Multiple masters needed only when hedged reads for master registry are enabled. + builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(SLAVES); + TEST_UTIL.startMiniCluster(builder.build()); } @AfterClass public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); + if (TEST_UTIL != null) { + TEST_UTIL.shutdownMiniCluster(); + } } /** @@ -173,7 +237,7 @@ public static void tearDownAfterClass() throws Exception { */ @Test public void testDuplicateAppend() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getMethodName()); + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor(name.getTableName()); Map<String, String> kvs = new HashMap<>(); kvs.put(SleepAtFirstRpcCall.SLEEP_TIME_CONF_KEY, "2000"); hdt.addCoprocessor(SleepAtFirstRpcCall.class.getName(), null, 1, kvs); @@ -181,11 +245,11 @@ public void testDuplicateAppend() throws Exception { Configuration c = new Configuration(TEST_UTIL.getConfiguration()); c.setInt(HConstants.HBASE_CLIENT_PAUSE, 50); - // Client will retry beacuse rpc timeout is small than the sleep time of first rpc call + // Client will retry because rpc timeout is smaller than the sleep time of the first rpc call c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 1500); try (Connection connection = ConnectionFactory.createConnection(c); - Table table = connection.getTableBuilder(TableName.valueOf(name.getMethodName()), null) + Table table = connection.getTableBuilder(name.getTableName(), null) .setOperationTimeout(3 * 1000).build()) { Append append = new Append(ROW); append.addColumn(HBaseTestingUtility.fam1, QUALIFIER, VALUE); @@ -209,7 +273,7 @@ public void testDuplicateAppend() throws Exception { */ @Test public void testKeepDeletedCells() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); final byte[] FAMILY = Bytes.toBytes("family"); final byte[] C0 = Bytes.toBytes("c0"); @@ -275,7 +339,7 @@ public void testKeepDeletedCells() throws Exception { */ @Test public void testPurgeFutureDeletes() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); final byte[] ROW = Bytes.toBytes("row"); final byte[] FAMILY = Bytes.toBytes("family"); final
byte[] COLUMN = Bytes.toBytes("column"); @@ -328,7 +392,7 @@ public boolean evaluate() throws IOException { */ @Test public void testGetConfiguration() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") }; Configuration conf = TEST_UTIL.getConfiguration(); try (Table table = TEST_UTIL.createTable(tableName, FAMILIES)) { @@ -342,7 +406,7 @@ public void testGetConfiguration() throws Exception { */ @Test public void testWeirdCacheBehaviour() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] FAMILIES = new byte[][] { Bytes.toBytes("trans-blob"), Bytes.toBytes("trans-type"), Bytes.toBytes("trans-date"), Bytes.toBytes("trans-tags"), Bytes.toBytes("trans-group") }; @@ -383,7 +447,7 @@ public void testWeirdCacheBehaviour() throws Exception { } private void deleteColumns(Table ht, String value, String keyPrefix) - throws IOException { + throws IOException { ResultScanner scanner = buildScanner(keyPrefix, value, ht); Iterator<Result> it = scanner.iterator(); int count = 0; @@ -468,8 +532,8 @@ private void putRows(Table ht, int numRows, String value, String key) */ @Test public void testFilterAcrossMultipleRegions() - throws IOException, InterruptedException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + throws IOException, InterruptedException { + final TableName tableName = name.getTableName(); try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); assertRowCount(t, rowCount); @@ -555,8 +619,7 @@ private void assertRowCount(final Table t, final int expected) throws IOExceptio * @param t Table to split. * @return Map of regions to servers. */ - private List<HRegionLocation> splitTable(final Table t) - throws IOException, InterruptedException { + private List<HRegionLocation> splitTable(final Table t) throws IOException { // Split this table in two. Admin admin = TEST_UTIL.getAdmin(); admin.split(t.getName()); @@ -572,8 +635,7 @@ private List<HRegionLocation> splitTable(final Table t) * @param t * @return Map of table regions; caller needs to check table actually split.
*/ - private List waitOnSplit(final Table t) - throws IOException { + private List waitOnSplit(final Table t) throws IOException { try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(t.getName())) { List regions = locator.getAllRegionLocations(); int originalCount = regions.size(); @@ -585,8 +647,9 @@ private List waitOnSplit(final Table t) e.printStackTrace(); } regions = locator.getAllRegionLocations(); - if (regions.size() > originalCount) + if (regions.size() > originalCount) { break; + } } return regions; } @@ -594,7 +657,7 @@ private List waitOnSplit(final Table t) @Test public void testSuperSimple() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, VALUE); @@ -610,7 +673,7 @@ public void testSuperSimple() throws Exception { @Test public void testMaxKeyValueSize() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); Configuration conf = TEST_UTIL.getConfiguration(); String oldMaxSize = conf.get(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { @@ -640,7 +703,7 @@ public void testMaxKeyValueSize() throws Exception { @Test public void testFilters() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 10); byte[][] QUALIFIERS = { @@ -677,7 +740,7 @@ public void testFilters() throws Exception { @Test public void testFilterWithLongCompartor() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 10); byte[][] values = new byte[10][]; @@ -709,7 +772,7 @@ public void testFilterWithLongCompartor() throws Exception { @Test public void testKeyOnlyFilter() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 10); byte[][] QUALIFIERS = { @@ -747,7 +810,7 @@ public void testKeyOnlyFilter() throws Exception { */ @Test public void testSimpleMissing() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 4); @@ -858,7 +921,7 @@ public void testSimpleMissing() throws Exception { */ @Test public void testSingleRowMultipleFamily() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] ROWS = makeN(ROW, 3); byte [][] FAMILIES = makeNAscii(FAMILY, 10); byte [][] QUALIFIERS = makeN(QUALIFIER, 10); @@ -1166,7 +1229,7 @@ public void testNullTableName() throws IOException { @Test(expected = IllegalArgumentException.class) public void testNullFamilyName() throws IOException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); // Null family (should NOT work) TEST_UTIL.createTable(tableName, new 
byte[][]{null}); @@ -1175,7 +1238,7 @@ public void testNullFamilyName() throws IOException { @Test public void testNullRowAndQualifier() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { @@ -1211,7 +1274,7 @@ public void testNullRowAndQualifier() throws Exception { @Test public void testNullEmptyQualifier() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { @@ -1249,7 +1312,7 @@ public void testNullEmptyQualifier() throws Exception { @Test public void testNullValue() throws IOException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { // Null value @@ -1284,7 +1347,7 @@ public void testNullValue() throws IOException { @Test public void testNullQualifier() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { // Work for Put @@ -1342,7 +1405,7 @@ public void testNullQualifier() throws Exception { @Test public void testVersions() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); long [] STAMPS = makeStamps(20); byte [][] VALUES = makeNAscii(VALUE, 20); @@ -1569,7 +1632,7 @@ public void testVersions() throws Exception { @Test @SuppressWarnings("checkstyle:MethodLength") public void testVersionLimits() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] FAMILIES = makeNAscii(FAMILY, 3); int [] LIMITS = {1,3,5}; long [] STAMPS = makeStamps(10); @@ -1764,7 +1827,7 @@ public void testVersionLimits() throws Exception { @Test public void testDeleteFamilyVersion() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[][] QUALIFIERS = makeNAscii(QUALIFIER, 1); byte[][] VALUES = makeN(VALUE, 5); @@ -1804,7 +1867,7 @@ public void testDeleteFamilyVersion() throws Exception { @Test public void testDeleteFamilyVersionWithOtherDeletes() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] QUALIFIERS = makeNAscii(QUALIFIER, 5); byte [][] VALUES = makeN(VALUE, 5); @@ -1922,7 +1985,7 @@ public void testDeleteFamilyVersionWithOtherDeletes() throws Exception { @Test public void testDeleteWithFailed() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] FAMILIES = makeNAscii(FAMILY, 3); byte [][] VALUES = makeN(VALUE, 5); @@ -1948,7 +2011,7 @@ public void testDeleteWithFailed() throws Exception { @Test public void testDeletes() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] ROWS = makeNAscii(ROW, 6); byte [][] FAMILIES = makeNAscii(FAMILY, 3); @@ -2254,7 +2317,7 @@ public void testDeletes() throws Exception { */ @Test public 
void testBatchOperationsWithErrors() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table foo = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 10)) { int NUM_OPS = 100; @@ -2380,7 +2443,7 @@ public void testJiraTest867() throws Exception { int numRows = 10; int numColsPerRow = 2000; - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] ROWS = makeN(ROW, numRows); byte [][] QUALIFIERS = makeN(QUALIFIER, numColsPerRow); @@ -2463,7 +2526,7 @@ public void testJiraTest867() throws Exception { */ @Test public void testJiraTest861() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] VALUES = makeNAscii(VALUE, 7); long [] STAMPS = makeStamps(7); @@ -2526,7 +2589,7 @@ public void testJiraTest861() throws Exception { */ @Test public void testJiraTest33() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] VALUES = makeNAscii(VALUE, 7); long [] STAMPS = makeStamps(7); @@ -2574,7 +2637,7 @@ public void testJiraTest33() throws Exception { */ @Test public void testJiraTest1014() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY, 10)) { @@ -2598,7 +2661,7 @@ public void testJiraTest1014() throws Exception { */ @Test public void testJiraTest1182() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] VALUES = makeNAscii(VALUE, 7); long [] STAMPS = makeStamps(7); @@ -2642,7 +2705,7 @@ public void testJiraTest1182() throws Exception { */ @Test public void testJiraTest52() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [][] VALUES = makeNAscii(VALUE, 7); long [] STAMPS = makeStamps(7); @@ -2678,8 +2741,7 @@ public void testJiraTest52() throws Exception { private void getVersionRangeAndVerifyGreaterThan(Table ht, byte [] row, byte [] family, byte [] qualifier, long [] stamps, byte [][] values, - int start, int end) - throws IOException { + int start, int end) throws IOException { Get get = new Get(row); get.addColumn(family, qualifier); get.readVersions(Integer.MAX_VALUE); @@ -2689,8 +2751,7 @@ private void getVersionRangeAndVerifyGreaterThan(Table ht, byte [] row, } private void getVersionRangeAndVerify(Table ht, byte [] row, byte [] family, - byte [] qualifier, long [] stamps, byte [][] values, int start, int end) - throws IOException { + byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException { Get get = new Get(row); get.addColumn(family, qualifier); get.readVersions(Integer.MAX_VALUE); @@ -2700,8 +2761,7 @@ private void getVersionRangeAndVerify(Table ht, byte [] row, byte [] family, } private void getAllVersionsAndVerify(Table ht, byte [] row, byte [] family, - byte [] qualifier, long [] stamps, byte [][] values, int start, int end) - throws IOException { + byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException { Get get = new Get(row); get.addColumn(family, qualifier); get.readVersions(Integer.MAX_VALUE); @@ -2711,8 
+2771,7 @@ private void getAllVersionsAndVerify(Table ht, byte [] row, byte [] family, private void scanVersionRangeAndVerifyGreaterThan(Table ht, byte [] row, byte [] family, byte [] qualifier, long [] stamps, byte [][] values, - int start, int end) - throws IOException { + int start, int end) throws IOException { Scan scan = new Scan(row); scan.addColumn(family, qualifier); scan.setMaxVersions(Integer.MAX_VALUE); @@ -2722,8 +2781,7 @@ private void scanVersionRangeAndVerifyGreaterThan(Table ht, byte [] row, } private void scanVersionRangeAndVerify(Table ht, byte [] row, byte [] family, - byte [] qualifier, long [] stamps, byte [][] values, int start, int end) - throws IOException { + byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException { Scan scan = new Scan(row); scan.addColumn(family, qualifier); scan.setMaxVersions(Integer.MAX_VALUE); @@ -2733,8 +2791,7 @@ private void scanVersionRangeAndVerify(Table ht, byte [] row, byte [] family, } private void scanAllVersionsAndVerify(Table ht, byte [] row, byte [] family, - byte [] qualifier, long [] stamps, byte [][] values, int start, int end) - throws IOException { + byte [] qualifier, long [] stamps, byte [][] values, int start, int end) throws IOException { Scan scan = new Scan(row); scan.addColumn(family, qualifier); scan.setMaxVersions(Integer.MAX_VALUE); @@ -2743,8 +2800,7 @@ private void scanAllVersionsAndVerify(Table ht, byte [] row, byte [] family, } private void getVersionAndVerify(Table ht, byte [] row, byte [] family, - byte [] qualifier, long stamp, byte [] value) - throws Exception { + byte [] qualifier, long stamp, byte [] value) throws Exception { Get get = new Get(row); get.addColumn(family, qualifier); get.setTimestamp(stamp); @@ -2754,8 +2810,7 @@ private void getVersionAndVerify(Table ht, byte [] row, byte [] family, } private void getVersionAndVerifyMissing(Table ht, byte [] row, byte [] family, - byte [] qualifier, long stamp) - throws Exception { + byte [] qualifier, long stamp) throws Exception { Get get = new Get(row); get.addColumn(family, qualifier); get.setTimestamp(stamp); @@ -2765,8 +2820,7 @@ private void getVersionAndVerifyMissing(Table ht, byte [] row, byte [] family, } private void scanVersionAndVerify(Table ht, byte [] row, byte [] family, - byte [] qualifier, long stamp, byte [] value) - throws Exception { + byte [] qualifier, long stamp, byte [] value) throws Exception { Scan scan = new Scan(row); scan.addColumn(family, qualifier); scan.setTimestamp(stamp); @@ -2776,8 +2830,7 @@ private void scanVersionAndVerify(Table ht, byte [] row, byte [] family, } private void scanVersionAndVerifyMissing(Table ht, byte [] row, - byte [] family, byte [] qualifier, long stamp) - throws Exception { + byte [] family, byte [] qualifier, long stamp) throws Exception { Scan scan = new Scan(row); scan.addColumn(family, qualifier); scan.setTimestamp(stamp); @@ -2786,10 +2839,7 @@ private void scanVersionAndVerifyMissing(Table ht, byte [] row, assertNullResult(result); } - private void getTestNull(Table ht, byte [] row, byte [] family, - byte [] value) - throws Exception { - + private void getTestNull(Table ht, byte [] row, byte [] family, byte [] value) throws Exception { Get get = new Get(row); get.addColumn(family, null); Result result = ht.get(get); @@ -2866,9 +2916,7 @@ private void scanTestNull(Table ht, byte[] row, byte[] family, byte[] value, } private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, - byte [][] QUALIFIERS, byte [][] VALUES) - throws Exception { - + 
byte [][] QUALIFIERS, byte [][] VALUES) throws Exception { // Single column from memstore Get get = new Get(ROWS[0]); get.addColumn(FAMILIES[4], QUALIFIERS[0]); @@ -2923,7 +2971,7 @@ private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES, new int [][] { {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7} - }); + }); // Multiple columns from everywhere storefile, many family, wildcard get = new Get(ROWS[0]); @@ -2939,7 +2987,7 @@ private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES, new int [][] { {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7} - }); + }); // Everything get = new Get(ROWS[0]); @@ -2947,7 +2995,7 @@ private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES, new int [][] { {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0} - }); + }); // Get around inserted columns @@ -2964,9 +3012,7 @@ private void singleRowGetTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, } private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, - byte [][] QUALIFIERS, byte [][] VALUES) - throws Exception { - + byte [][] QUALIFIERS, byte [][] VALUES) throws Exception { // Single column from memstore Scan scan = new Scan(); scan.addColumn(FAMILIES[4], QUALIFIERS[0]); @@ -3021,7 +3067,7 @@ private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES, new int [][] { {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7} - }); + }); // Multiple columns from everywhere storefile, many family, wildcard scan = new Scan(); @@ -3037,7 +3083,7 @@ private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES, new int [][] { {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7} - }); + }); // Everything scan = new Scan(); @@ -3045,7 +3091,7 @@ private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, assertNResult(result, ROWS[0], FAMILIES, QUALIFIERS, VALUES, new int [][] { {2, 2, 2}, {2, 4, 4}, {4, 0, 0}, {4, 4, 4}, {6, 6, 6}, {6, 7, 7}, {7, 7, 7}, {9, 0, 0} - }); + }); // Scan around inserted columns @@ -3065,13 +3111,9 @@ private void singleRowScanTest(Table ht, byte [][] ROWS, byte [][] FAMILIES, * Expects family and qualifier arrays to be valid for at least * the range: idx-2 < idx < idx+2 */ - private void getVerifySingleColumn(Table ht, - byte [][] ROWS, int ROWIDX, - byte [][] FAMILIES, int FAMILYIDX, - byte [][] QUALIFIERS, int QUALIFIERIDX, - byte [][] VALUES, int VALUEIDX) - throws Exception { - + private void getVerifySingleColumn(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES, + int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX, byte [][] VALUES, int VALUEIDX) + throws Exception { Get get = new Get(ROWS[ROWIDX]); Result result = ht.get(get); assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], @@ -3123,13 +3165,9 @@ private void getVerifySingleColumn(Table ht, * the range: idx-2 to idx+2 * Expects row array to be valid for at least idx to idx+2 */ - private void scanVerifySingleColumn(Table ht, - byte [][] ROWS, int ROWIDX, - byte [][] FAMILIES, int FAMILYIDX, - byte [][] QUALIFIERS, int QUALIFIERIDX, - byte [][] VALUES, int 
VALUEIDX) - throws Exception { - + private void scanVerifySingleColumn(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES, + int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX, byte [][] VALUES, int VALUEIDX) + throws Exception { Scan scan = new Scan(); Result result = getSingleScanResult(ht, scan); assertSingleResult(result, ROWS[ROWIDX], FAMILIES[FAMILYIDX], @@ -3183,12 +3221,8 @@ private void scanVerifySingleColumn(Table ht, * Verify we do not read any values by accident around a single column * Same requirements as getVerifySingleColumn */ - private void getVerifySingleEmpty(Table ht, - byte [][] ROWS, int ROWIDX, - byte [][] FAMILIES, int FAMILYIDX, - byte [][] QUALIFIERS, int QUALIFIERIDX) - throws Exception { - + private void getVerifySingleEmpty(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES, + int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) throws Exception { Get get = new Get(ROWS[ROWIDX]); get.addFamily(FAMILIES[4]); get.addColumn(FAMILIES[4], QUALIFIERS[1]); @@ -3214,12 +3248,8 @@ private void getVerifySingleEmpty(Table ht, } - private void scanVerifySingleEmpty(Table ht, - byte [][] ROWS, int ROWIDX, - byte [][] FAMILIES, int FAMILYIDX, - byte [][] QUALIFIERS, int QUALIFIERIDX) - throws Exception { - + private void scanVerifySingleEmpty(Table ht, byte [][] ROWS, int ROWIDX, byte [][] FAMILIES, + int FAMILYIDX, byte [][] QUALIFIERS, int QUALIFIERIDX) throws Exception { Scan scan = new Scan(ROWS[ROWIDX+1]); Result result = getSingleScanResult(ht, scan); assertNullResult(result); @@ -3244,9 +3274,7 @@ private void scanVerifySingleEmpty(Table ht, // Verifiers // - private void assertKey(Cell key, byte [] row, byte [] family, - byte [] qualifier, byte [] value) - throws Exception { + private void assertKey(Cell key, byte [] row, byte [] family, byte [] qualifier, byte [] value) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]", equals(row, CellUtil.cloneRow(key))); @@ -3262,8 +3290,7 @@ private void assertKey(Cell key, byte [] row, byte [] family, } static void assertIncrementKey(Cell key, byte [] row, byte [] family, - byte [] qualifier, long value) - throws Exception { + byte [] qualifier, long value) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(CellUtil.cloneRow(key)) +"]", equals(row, CellUtil.cloneRow(key))); @@ -3284,9 +3311,7 @@ private void assertNumKeys(Result result, int n) throws Exception { } private void assertNResult(Result result, byte [] row, - byte [][] families, byte [][] qualifiers, byte [][] values, - int [][] idxs) - throws Exception { + byte [][] families, byte [][] qualifiers, byte [][] values, int [][] idxs) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); @@ -3318,8 +3343,7 @@ private void assertNResult(Result result, byte [] row, private void assertNResult(Result result, byte [] row, byte [] family, byte [] qualifier, long [] stamps, byte [][] values, - int start, int end) - throws IOException { + int start, int end) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); @@ -3353,8 +3377,7 @@ private void assertNResult(Result result, byte [] row, */ private void assertDoubleResult(Result result, byte [] row, byte [] familyA, byte [] qualifierA, byte [] valueA, - byte [] familyB, byte [] qualifierB, byte [] valueB) - throws Exception { + 
byte [] familyB, byte [] qualifierB, byte [] valueB) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); @@ -3384,8 +3407,7 @@ private void assertDoubleResult(Result result, byte [] row, } private void assertSingleResult(Result result, byte [] row, byte [] family, - byte [] qualifier, byte [] value) - throws Exception { + byte [] qualifier, byte [] value) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); @@ -3423,8 +3445,7 @@ private void assertSingleResult(Result result, byte[] row, byte[] family, byte[] } private void assertSingleResult(Result result, byte [] row, byte [] family, - byte [] qualifier, long ts, byte [] value) - throws Exception { + byte [] qualifier, long ts, byte [] value) { assertTrue("Expected row [" + Bytes.toString(row) + "] " + "Got row [" + Bytes.toString(result.getRow()) +"]", equals(row, result.getRow())); @@ -3515,7 +3536,7 @@ static boolean equals(byte [] left, byte [] right) { @Test public void testDuplicateVersions() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); long [] STAMPS = makeStamps(20); byte [][] VALUES = makeNAscii(VALUE, 20); @@ -3738,7 +3759,7 @@ public void testDuplicateVersions() throws Exception { @Test public void testUpdates() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10)) { // Write a column with values at timestamp 1, 2 and 3 @@ -3788,7 +3809,7 @@ public void testUpdates() throws Exception { @Test public void testUpdatesWithMajorCompaction() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10); Admin admin = TEST_UTIL.getAdmin()) { @@ -3849,7 +3870,7 @@ public void testUpdatesWithMajorCompaction() throws Exception { @Test public void testMajorCompactionBetweenTwoUpdates() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table hTable = TEST_UTIL.createTable(tableName, FAMILY, 10); Admin admin = TEST_UTIL.getAdmin()) { @@ -3916,7 +3937,7 @@ public void testMajorCompactionBetweenTwoUpdates() throws Exception { @Test public void testGet_EmptyTable() throws IOException { - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Get get = new Get(ROW); get.addFamily(FAMILY); Result r = table.get(get); @@ -3926,7 +3947,7 @@ public void testGet_EmptyTable() throws IOException { @Test public void testGet_NullQualifier() throws IOException { - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, VALUE); table.put(put); @@ -3950,7 +3971,7 @@ public void testGet_NullQualifier() throws IOException { @Test public void testGet_NonExistentRow() throws IOException { - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = 
TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, VALUE); table.put(put); @@ -3978,7 +3999,7 @@ public void testPut() throws IOException { final byte [] row1 = Bytes.toBytes("row1"); final byte [] row2 = Bytes.toBytes("row2"); final byte [] value = Bytes.toBytes("abcd"); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + try (Table table = TEST_UTIL.createTable(name.getTableName(), new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY })) { Put put = new Put(row1); put.addColumn(CONTENTS_FAMILY, null, value); @@ -4017,7 +4038,7 @@ public void testPut() throws IOException { public void testPutNoCF() throws IOException { final byte[] BAD_FAM = Bytes.toBytes("BAD_CF"); final byte[] VAL = Bytes.toBytes(100); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { boolean caughtNSCFE = false; try { @@ -4037,7 +4058,7 @@ public void testRowsPut() throws IOException { final byte[] SMALL_FAMILY = Bytes.toBytes("smallfam"); final int NB_BATCH_ROWS = 10; final byte[] value = Bytes.toBytes("abcd"); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + try (Table table = TEST_UTIL.createTable(name.getTableName(), new byte[][] {CONTENTS_FAMILY, SMALL_FAMILY })) { ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS; i++) { @@ -4067,7 +4088,7 @@ public void testRowsPutBufferedManyManyFlushes() throws IOException { final byte[] SMALL_FAMILY = Bytes.toBytes("smallfam"); final byte[] value = Bytes.toBytes("abcd"); final int NB_BATCH_ROWS = 10; - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + try (Table table = TEST_UTIL.createTable(name.getTableName(), new byte[][] { CONTENTS_FAMILY, SMALL_FAMILY })) { ArrayList rowsUpdate = new ArrayList(); for (int i = 0; i < NB_BATCH_ROWS * 10; i++) { @@ -4130,7 +4151,7 @@ public void testHBase737 () throws IOException { final byte [] FAM1 = Bytes.toBytes("fam1"); final byte [] FAM2 = Bytes.toBytes("fam2"); // Open table - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + try (Table table = TEST_UTIL.createTable(name.getTableName(), new byte [][] {FAM1, FAM2})) { // Insert some values Put put = new Put(ROW); @@ -4213,9 +4234,10 @@ public void testHBase737 () throws IOException { @Test public void testListTables() throws IOException, InterruptedException { - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); + final String testTableName = name.getTableName().toString(); + final TableName tableName1 = TableName.valueOf(testTableName + "1"); + final TableName tableName2 = TableName.valueOf(testTableName + "2"); + final TableName tableName3 = TableName.valueOf(testTableName + "3"); TableName [] tables = new TableName[] { tableName1, tableName2, tableName3 }; for (int i = 0; i < tables.length; i++) { TEST_UTIL.createTable(tables[i], FAMILY); @@ -4244,7 +4266,7 @@ public void testListTables() throws IOException, InterruptedException { */ @Test public void testUnmanagedHConnection() throws IOException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); TEST_UTIL.createTable(tableName, 
HConstants.CATALOG_FAMILY); try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration()); Table t = conn.getTable(tableName); @@ -4260,7 +4282,13 @@ public void testUnmanagedHConnection() throws IOException { */ @Test public void testUnmanagedHConnectionReconnect() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + Configuration conf = TEST_UTIL.getConfiguration(); + Class registryImpl = conf.getClass( + HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); + // This test does not make sense for MasterRegistry since it stops the only master in the + // cluster and starts a new master without populating the underlying config for the connection. + Assume.assumeFalse(registryImpl.equals(MasterRegistry.class)); + final TableName tableName = name.getTableName(); TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY); try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { try (Table t = conn.getTable(tableName); Admin admin = conn.getAdmin()) { @@ -4290,8 +4318,9 @@ public void testUnmanagedHConnectionReconnect() throws Exception { @Test public void testMiscHTableStuff() throws IOException { - final TableName tableAname = TableName.valueOf(name.getMethodName() + "A"); - final TableName tableBname = TableName.valueOf(name.getMethodName() + "B"); + final String testTableName = name.getTableName().toString(); + final TableName tableAname = TableName.valueOf(testTableName + "A"); + final TableName tableBname = TableName.valueOf(testTableName + "B"); final byte[] attrName = Bytes.toBytes("TESTATTR"); final byte[] attrValue = Bytes.toBytes("somevalue"); byte[] value = Bytes.toBytes("value"); @@ -4340,8 +4369,9 @@ public void testMiscHTableStuff() throws IOException { // add a user attribute to HTD desc.setValue(attrName, attrValue); // add a user attribute to HCD - for (HColumnDescriptor c : desc.getFamilies()) + for (HColumnDescriptor c : desc.getFamilies()) { c.setValue(attrName, attrValue); + } // update metadata for all regions of this table admin.modifyTable(desc); // enable the table @@ -4365,7 +4395,7 @@ public void testMiscHTableStuff() throws IOException { @Test public void testGetClosestRowBefore() throws IOException, InterruptedException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); final byte[] firstRow = Bytes.toBytes("row111"); final byte[] secondRow = Bytes.toBytes("row222"); final byte[] thirdRow = Bytes.toBytes("row333"); @@ -4492,7 +4522,7 @@ public void testScanVariableReuse() throws Exception { @Test public void testMultiRowMutation() throws Exception { LOG.info("Starting testMultiRowMutation"); - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); final byte [] ROW1 = Bytes.toBytes("testRow1"); try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { @@ -4524,7 +4554,7 @@ public void testMultiRowMutation() throws Exception { @Test public void testRowMutation() throws Exception { LOG.info("Starting testRowMutation"); - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] QUALIFIERS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b") }; RowMutations arm = new RowMutations(ROW); @@ -4574,7 +4604,7 @@ public void testRowMutation() throws Exception { @Test public void 
testBatchAppendWithReturnResultFalse() throws Exception { LOG.info("Starting testBatchAppendWithReturnResultFalse"); - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { Append append1 = new Append(Bytes.toBytes("row1")); append1.setReturnResults(false); @@ -4598,7 +4628,7 @@ public void testBatchAppendWithReturnResultFalse() throws Exception { @Test public void testAppend() throws Exception { LOG.info("Starting testAppend"); - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { byte[] v1 = Bytes.toBytes("42"); byte[] v2 = Bytes.toBytes("23"); @@ -4690,7 +4720,7 @@ public void testAppendWithoutWAL() throws Exception { @Test public void testClientPoolRoundRobin() throws IOException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); int poolSize = 3; int numVersions = poolSize * 2; @@ -4728,7 +4758,7 @@ public void testClientPoolRoundRobin() throws IOException { @Ignore ("Flakey: HBASE-8989") @Test public void testClientPoolThreadLocal() throws IOException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); int poolSize = Integer.MAX_VALUE; int numVersions = 3; @@ -4814,7 +4844,7 @@ public void testCheckAndPut() throws IOException { final byte [] anotherrow = Bytes.toBytes("anotherrow"); final byte [] value2 = Bytes.toBytes("abcd"); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put1 = new Put(ROW); put1.addColumn(FAMILY, QUALIFIER, VALUE); @@ -4852,7 +4882,7 @@ public void testCheckAndPut() throws IOException { @Test public void testCheckAndMutateWithTimeRange() throws IOException { - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { final long ts = System.currentTimeMillis() / 2; Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, ts, VALUE); @@ -4948,7 +4978,7 @@ public void testCheckAndPutWithCompareOp() throws IOException { final byte [] value3 = Bytes.toBytes("cccc"); final byte [] value4 = Bytes.toBytes("dddd"); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put2 = new Put(ROW); put2.addColumn(FAMILY, QUALIFIER, value2); @@ -5030,7 +5060,7 @@ public void testCheckAndPutWithCompareOp() throws IOException { public void testCheckAndDelete() throws IOException { final byte [] value1 = Bytes.toBytes("aaaa"); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put = new Put(ROW); @@ -5053,7 +5083,7 @@ public void testCheckAndDeleteWithCompareOp() throws IOException { final byte [] value3 = Bytes.toBytes("cccc"); final byte [] value4 = Bytes.toBytes("dddd"); - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put2 = new Put(ROW); @@ -5143,9 +5173,9 @@ public void testCheckAndDeleteWithCompareOp() throws 
IOException { * Test ScanMetrics */ @Test - @SuppressWarnings ("unused") + @SuppressWarnings({"unused", "checkstyle:EmptyBlock"}) public void testScanMetrics() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); // Set up test table: // Create table: @@ -5198,7 +5228,6 @@ public void testScanMetrics() throws Exception { // the end of the scanner. So this is asking for 2 of the 3 rows we inserted. for (Result result : scanner.next(numRecords - 1)) { } - ScanMetrics scanMetrics = scanner.getScanMetrics(); assertEquals("Did not access all the regions in the table", numOfRegions, scanMetrics.countOfRegions.get()); @@ -5279,7 +5308,7 @@ public void testScanMetrics() throws Exception { */ @Test public void testCacheOnWriteEvictOnClose() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [] data = Bytes.toBytes("data"); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) { @@ -5403,7 +5432,7 @@ private void waitForStoreFileCount(HStore store, int count, int timeout) */ public void testNonCachedGetRegionLocation() throws Exception { // Test Initialization. - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte [] family1 = Bytes.toBytes("f1"); byte [] family2 = Bytes.toBytes("f2"); try (Table table = TEST_UTIL.createTable(tableName, new byte[][] {family1, family2}, 10); @@ -5452,7 +5481,7 @@ public void testGetRegionsInRange() throws Exception { // Test Initialization. byte [] startKey = Bytes.toBytes("ddc"); byte [] endKey = Bytes.toBytes("mmm"); - TableName tableName = TableName.valueOf(name.getMethodName()); + TableName tableName = name.getTableName(); TEST_UTIL.createMultiRegionTable(tableName, new byte[][] { FAMILY }, 10); int numOfRegions = -1; @@ -5522,7 +5551,7 @@ private List getRegionsInRange(TableName tableName, byte[] star @Test public void testJira6912() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table foo = TEST_UTIL.createTable(tableName, new byte[][] {FAMILY}, 10)) { List puts = new ArrayList(); @@ -5551,7 +5580,7 @@ public void testJira6912() throws Exception { @Test public void testScan_NullQualifier() throws IOException { - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { Put put = new Put(ROW); put.addColumn(FAMILY, QUALIFIER, VALUE); table.put(put); @@ -5581,7 +5610,7 @@ public void testScan_NullQualifier() throws IOException { @Test public void testNegativeTimestamp() throws IOException { - try (Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY)) { + try (Table table = TEST_UTIL.createTable(name.getTableName(), FAMILY)) { try { Put put = new Put(ROW, -1); @@ -5642,7 +5671,7 @@ public void testNegativeTimestamp() throws IOException { @Test public void testRawScanRespectsVersions() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { byte[] row = Bytes.toBytes("row"); @@ -5716,7 +5745,7 @@ public void testRawScanRespectsVersions() 
throws Exception { @Test public void testEmptyFilterList() throws Exception { // Test Initialization. - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { // Insert one row each region @@ -5756,7 +5785,7 @@ public void testEmptyFilterList() throws Exception { @Test public void testSmallScan() throws Exception { // Test Initialization. - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { // Insert one row each region @@ -5794,7 +5823,7 @@ public void testSmallScan() throws Exception { @Test public void testSuperSimpleWithReverseScan() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { Put put = new Put(Bytes.toBytes("0-b11111-0000000000000000000")); put.addColumn(FAMILY, QUALIFIER, VALUE); @@ -5839,7 +5868,7 @@ public void testSuperSimpleWithReverseScan() throws Exception { @Test public void testFiltersWithReverseScan() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 10); byte[][] QUALIFIERS = {Bytes.toBytes("col0--"), @@ -5882,7 +5911,7 @@ public void testFiltersWithReverseScan() throws Exception { @Test public void testKeyOnlyFilterWithReverseScan() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 10); byte[][] QUALIFIERS = {Bytes.toBytes("col0--"), @@ -5923,7 +5952,7 @@ public void testKeyOnlyFilterWithReverseScan() throws Exception { */ @Test public void testSimpleMissingWithReverseScan() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { byte[][] ROWS = makeN(ROW, 4); @@ -5988,7 +6017,7 @@ public void testSimpleMissingWithReverseScan() throws Exception { @Test public void testNullWithReverseScan() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table ht = TEST_UTIL.createTable(tableName, FAMILY)) { // Null qualifier (should work) Put put = new Put(ROW); @@ -6001,7 +6030,8 @@ public void testNullWithReverseScan() throws Exception { } // Use a new table - try (Table ht = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName() + "2"), FAMILY)) { + try (Table ht = + TEST_UTIL.createTable(TableName.valueOf(name.getTableName().toString() + "2"), FAMILY)) { // Empty qualifier, byte[0] instead of null (should work) Put put = new Put(ROW); put.addColumn(FAMILY, HConstants.EMPTY_BYTE_ARRAY, VALUE); @@ -6027,7 +6057,7 @@ public void testNullWithReverseScan() throws Exception { @Test @SuppressWarnings("checkstyle:MethodLength") public void testDeletesWithReverseScan() throws Exception { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[][] ROWS = makeNAscii(ROW, 6); byte[][] FAMILIES = makeNAscii(FAMILY, 3); 
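Aside: this stretch of the file, testSuperSimpleWithReverseScan through testDeletesWithReverseScan, exercises scans with Scan.setReversed(true). A brief hedged sketch of the pattern being tested; the table variable and row key below are illustrative, not taken from the patch:

// Sketch: a reversed scan walks rows in descending row-key order, so the
// start row is the largest key of interest rather than the smallest.
Scan scan = new Scan();
scan.setReversed(true);
scan.withStartRow(Bytes.toBytes("005"), true); // inclusive upper bound
try (ResultScanner scanner = table.getScanner(scan)) {
  for (Result result : scanner) {
    // Results arrive from "005" downward toward the start of the table.
  }
}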
byte[][] VALUES = makeN(VALUE, 5); @@ -6213,7 +6243,7 @@ public void testDeletesWithReverseScan() throws Exception { @Test public void testReversedScanUnderMultiRegions() throws Exception { // Test Initialization. - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[] maxByteArray = ConnectionUtils.MAX_BYTE_ARRAY; byte[][] splitRows = new byte[][] { Bytes.toBytes("005"), Bytes.add(Bytes.toBytes("005"), Bytes.multiple(maxByteArray, 16)), @@ -6274,7 +6304,7 @@ public void testReversedScanUnderMultiRegions() throws Exception { @Test public void testSmallReversedScanUnderMultiRegions() throws Exception { // Test Initialization. - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[][] splitRows = new byte[][]{ Bytes.toBytes("000"), Bytes.toBytes("002"), Bytes.toBytes("004"), Bytes.toBytes("006"), Bytes.toBytes("008"), Bytes.toBytes("010")}; @@ -6494,7 +6524,7 @@ public void testCellSizeLimit() throws IOException { @Test public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[][] VALUES = makeN(VALUE, 5); long[] ts = {1000, 2000, 3000, 4000, 5000}; @@ -6540,7 +6570,7 @@ public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception { @Test public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); byte[][] VALUES = makeN(VALUE, 5); long[] ts = {1000, 2000, 3000, 4000, 5000}; @@ -6603,7 +6633,7 @@ public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception { @Test public void testReadWithFilter() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 3)) { byte[] VALUEA = Bytes.toBytes("value-a"); @@ -6690,7 +6720,7 @@ public void testReadWithFilter() throws Exception { @Test public void testCellUtilTypeMethods() throws IOException { - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = name.getTableName(); try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) { final byte[] row = Bytes.toBytes("p"); @@ -6744,7 +6774,7 @@ public void testCellUtilTypeMethods() throws IOException { @Test(expected = DoNotRetryIOException.class) public void testCreateTableWithZeroRegionReplicas() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); + TableName tableName = name.getTableName(); TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf"))) .setRegionReplication(0) @@ -6755,7 +6785,7 @@ public void testCreateTableWithZeroRegionReplicas() throws Exception { @Test(expected = DoNotRetryIOException.class) public void testModifyTableWithZeroRegionReplicas() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); + TableName tableName = name.getTableName(); TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf"))) .build(); 
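Note on testCreateTableWithZeroRegionReplicas and testModifyTableWithZeroRegionReplicas above: both expect DoNotRetryIOException because a region replication count of 0 is invalid, since the count includes the primary replica. A hedged sketch of a descriptor that passes validation, with a hypothetical table name:

// Sketch: region replication must stay >= 1. Setting it to 0, as the two
// tests above do, is rejected with DoNotRetryIOException.
TableDescriptor valid = TableDescriptorBuilder
    .newBuilder(TableName.valueOf("example_table")) // hypothetical name
    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(Bytes.toBytes("cf")))
    .setRegionReplication(2) // primary plus one read replica
    .build();
admin.createTable(valid);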
@@ -6770,13 +6800,13 @@ public void testModifyTableWithZeroRegionReplicas() throws Exception { @Test(timeout = 60000) public void testModifyTableWithMemstoreData() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); + TableName tableName = name.getTableName(); createTableAndValidateTableSchemaModification(tableName, true); } @Test(timeout = 60000) public void testDeleteCFWithMemstoreData() throws Exception { - TableName tableName = TableName.valueOf(name.getMethodName()); + TableName tableName = name.getTableName(); createTableAndValidateTableSchemaModification(tableName, false); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java index 37d0135fb76f..8845f9adacb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideWithCoprocessor.java @@ -17,14 +17,16 @@ */ package org.apache.hadoop.hbase.client; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; /** * Test all client operations with a coprocessor that just implements the default flush/compact/scan @@ -32,13 +34,24 @@ */ @Category({ LargeTests.class, ClientTests.class }) public class TestFromClientSideWithCoprocessor extends TestFromClientSide { - @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestFromClientSideWithCoprocessor.class); - @BeforeClass - public static void setUpBeforeClass() throws Exception { - initialize(MultiRowMutationEndpoint.class, NoOpScanPolicyObserver.class); + // Override the parameters from the parent class. We just want to run it for the default + // param combination. + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { ZKConnectionRegistry.class, 1} + }); + } + + public TestFromClientSideWithCoprocessor(Class registry, int numHedgedReqs) throws Exception { + if (TEST_UTIL == null) { + // It is ok to initialize once because the test is parameterized for a single dimension. + initialize(registry, numHedgedReqs, NoOpScanPolicyObserver.class, + MultiRowMutationEndpoint.class); + } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java new file mode 100644 index 000000000000..335f96814e0c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM; +import static org.junit.Assert.assertEquals; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.StartMiniClusterOption; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestMasterRegistry { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMasterRegistry.class); + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.getConfiguration().setInt(META_REPLICAS_NUM, 3); + StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder(); + builder.numMasters(3).numRegionServers(3); + TEST_UTIL.startMiniCluster(builder.build()); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Generates a string of dummy master addresses in host:port format. Every other hostname won't + * have a port number. + */ + private static String generateDummyMastersList(int size) { + List masters = new ArrayList<>(); + for (int i = 0; i < size; i++) { + masters.add(" localhost" + (i % 2 == 0 ? ":" + (1000 + i) : "")); + } + return String.join(",", masters); + } + + /** + * Makes sure the master registry parses the master end points in the configuration correctly. + */ + @Test public void testMasterAddressParsing() { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + int numMasters = 10; + conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters)); + try (MasterRegistry registry = new MasterRegistry(conf)) { + List parsedMasters = new ArrayList<>(registry.getParsedMasterServers()); + // Half of them would be without a port, duplicates are removed. + assertEquals(numMasters/2 + 1, parsedMasters.size()); + // Sort in the increasing order of port numbers. + Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort)); + for (int i = 0; i < parsedMasters.size(); i++) { + ServerName sn = parsedMasters.get(i); + assertEquals("localhost", sn.getHostname()); + if (i == parsedMasters.size() - 1) { + // Last entry should be the one with default port. 
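Note: testMasterAddressParsing above feeds MasterRegistry a list in which every other host omits its port and every entry carries leading whitespace, and it expects whitespace to be trimmed, duplicates to collapse, and portless entries to fall back to the default master port. A hedged sketch of that kind of parsing; the helper name is hypothetical and the real MasterRegistry code may differ in detail:

// Illustrative parse of a comma-separated "host[:port]" masters list.
private static Set<ServerName> parseMasters(String masterAddrs, int defaultPort) {
  Set<ServerName> servers = new HashSet<>(); // set semantics drop duplicates
  for (String addr : masterAddrs.split(",")) {
    HostAndPort hp = HostAndPort.fromString(addr.trim()); // tolerates a missing port
    servers.add(ServerName.valueOf(
        hp.getHost(), hp.getPortOrDefault(defaultPort), ServerName.NON_STARTCODE));
  }
  return servers;
}

The assertion that follows pins down exactly that default-port fallback.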
+ assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort()); + } else { + assertEquals(1000 + (2 * i), sn.getPort()); + } + } + } + } + + @Test public void testRegistryRPCs() throws Exception { + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster(); + for (int numHedgedReqs = 1; numHedgedReqs <=3; numHedgedReqs++) { + if (numHedgedReqs == 1) { + conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false); + } else { + conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true); + } + conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); + try (MasterRegistry registry = new MasterRegistry(conf)) { + assertEquals(registry.getClusterId().get(), activeMaster.getClusterId()); + assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName()); + List metaLocations = + Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations()); + List actualMetaLocations = activeMaster.getMetaRegionLocationCache() + .getMetaRegionLocations().get(); + Collections.sort(metaLocations); + Collections.sort(actualMetaLocations); + assertEquals(actualMetaLocations, metaLocations); + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index c3da5875b370..796ebb31dc65 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -29,9 +29,10 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -44,35 +45,38 @@ import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.StartMiniClusterOption; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.TestTableName; +import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.junit.After; import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; /** - * A client-side test, mostly testing scanners with various parameters. + * A client-side test, mostly testing scanners with various parameters. Parameterized on different + * registry implementations. */ @Category({MediumTests.class, ClientTests.class}) +@RunWith(Parameterized.class) public class TestScannersFromClientSide { @ClassRule @@ -81,38 +85,80 @@ public class TestScannersFromClientSide { private static final Logger LOG = LoggerFactory.getLogger(TestScannersFromClientSide.class); - private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static HBaseTestingUtility TEST_UTIL; private static byte [] ROW = Bytes.toBytes("testRow"); private static byte [] FAMILY = Bytes.toBytes("testFamily"); private static byte [] QUALIFIER = Bytes.toBytes("testQualifier"); private static byte [] VALUE = Bytes.toBytes("testValue"); @Rule - public TestName name = new TestName(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - Configuration conf = TEST_UTIL.getConfiguration(); - conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024); - TEST_UTIL.startMiniCluster(3); - } + public TestTableName name = new TestTableName(); @AfterClass public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); + if (TEST_UTIL != null) { + TEST_UTIL.shutdownMiniCluster(); + } } - @Before - public void setUp() throws Exception { - // Nothing to do. + @Parameterized.Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { MasterRegistry.class, 1}, + { MasterRegistry.class, 2}, + { ZKConnectionRegistry.class, 1} + }); } /** - * @throws java.lang.Exception + * JUnit does not provide an easy way to run a hook after each parameterized run. Without that + * there is no easy way to restart the test cluster after each parameterized run. Annotation + * BeforeParam does not work either because it runs before parameterization and hence does not + * have access to the test parameters (which is weird). + * + * This *hack* checks if the current instance of test cluster configuration has the passed + * parameterized configs. In such a case, we can just reuse the cluster for test and do not need + * to initialize from scratch. While this is a hack, it saves a ton of time for the full + * test and de-flakes it. */ - @After - public void tearDown() throws Exception { - // Nothing to do. + private static boolean isSameParameterizedCluster(Class registryImpl, int numHedgedReqs) { + // initialize() is called for every unit test, however we only want to reset the cluster state + // at the end of every parameterized run. + if (TEST_UTIL == null) { + return false; + } + Configuration conf = TEST_UTIL.getConfiguration(); + Class confClass = conf.getClass( + HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); + int hedgedReqConfig = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, + HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT); + return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; + } + + public TestScannersFromClientSide(Class registryImpl, int numHedgedReqs) throws Exception { + if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) { + return; + } + if (TEST_UTIL != null) { + // We reached the end of a parameterized run, clean up the cluster. 
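Aside, expanding on the javadoc above: JUnit4's Parameterized runner re-instantiates the test class for every test method of every parameter row, which is why the patch does its guarded, expensive setup in the constructor. A minimal hedged sketch of that lifecycle; the class below is hypothetical and exists only for illustration:

import java.util.Arrays;
import java.util.Collection;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Sketch of the JUnit4 Parameterized lifecycle the hack above leans on.
@RunWith(Parameterized.class)
public class ExampleParameterizedTest {
  private static String lastParam; // statics survive across instances, like TEST_UTIL

  @Parameterized.Parameters
  public static Collection<Object[]> parameters() {
    return Arrays.asList(new Object[][] { { "a" }, { "b" } });
  }

  public ExampleParameterizedTest(String param) {
    // Runs before every test method; guard expensive setup here, exactly as
    // isSameParameterizedCluster() does above.
    if (param.equals(lastParam)) {
      return;
    }
    lastParam = param;
    // ... tear down state from the previous parameter and rebuild ...
  }

  @Test
  public void test() {
  }
}

With that context, the constructor continues: the stale cluster from the previous parameterized run is shut down first.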
+
+  public TestScannersFromClientSide(Class<?> registryImpl, int numHedgedReqs) throws Exception {
+    if (isSameParameterizedCluster(registryImpl, numHedgedReqs)) {
+      return;
+    }
+    if (TEST_UTIL != null) {
+      // We reached the end of a parameterized run, clean up the cluster.
+      TEST_UTIL.shutdownMiniCluster();
+    }
+    TEST_UTIL = new HBaseTestingUtility();
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, 10 * 1024 * 1024);
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, registryImpl,
+        ConnectionRegistry.class);
+    if (numHedgedReqs == 1) {
+      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, false);
+    } else {
+      Preconditions.checkArgument(numHedgedReqs > 1);
+      conf.setBoolean(HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY, true);
+    }
+    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, numHedgedReqs);
+    StartMiniClusterOption.Builder builder = StartMiniClusterOption.builder();
+    // Multiple masters needed only when hedged reads for master registry are enabled.
+    builder.numMasters(numHedgedReqs > 1 ? 3 : 1).numRegionServers(3);
+    TEST_UTIL.startMiniCluster(builder.build());
   }

   /**
@@ -120,7 +166,7 @@ public void tearDown() throws Exception {
    */
   @Test
   public void testScanBatch() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 8);
     Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@@ -190,7 +236,7 @@ public void testScanBatch() throws Exception {

   @Test
   public void testMaxResultSizeIsSetToDefault() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     Table ht = TEST_UTIL.createTable(tableName, FAMILY);

     // The max result size we expect the scan to use by default.
@@ -259,7 +305,7 @@ public void testScannerForNotExistingTable() {

   @Test
   public void testSmallScan() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     int numRows = 10;
     byte[][] ROWS = HTestConst.makeNAscii(ROW, numRows);
@@ -292,7 +338,8 @@ public void testSmallScan() throws Exception {
   /**
    * Run through a variety of test configurations with a small scan
    */
-  private void testSmallScan(Table table, boolean reversed, int rows, int columns) throws Exception {
+  private void testSmallScan(
+      Table table, boolean reversed, int rows, int columns) throws Exception {
     Scan baseScan = new Scan();
     baseScan.setReversed(reversed);
     baseScan.setSmall(true);
@@ -334,7 +381,7 @@ private void verifyExpectedCounts(Table table, Scan scan, int expectedRowCount,
    */
   @Test
   public void testGetMaxResults() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@@ -408,8 +455,8 @@ public void testGetMaxResults() throws Exception {
       kvListExp.add(new KeyValue(ROW, FAMILIES[1], QUALIFIERS[i], 1, VALUE));
     }
     for (int i=0; i < 2; i++) {
-      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
-    }
+      kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
+    }
     for (int i=10; i < 20; i++) {
       kvListExp.add(new KeyValue(ROW, FAMILIES[2], QUALIFIERS[i], 1, VALUE));
     }
@@ -452,7 +499,7 @@ public void testGetMaxResults() throws Exception {
    */
   @Test
   public void testScanMaxResults() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] ROWS = HTestConst.makeNAscii(ROW, 2);
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 10);
@@ -500,7 +547,7 @@ public void testScanMaxResults() throws Exception {
    */
   @Test
   public void testGetRowOffset() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] FAMILIES = HTestConst.makeNAscii(FAMILY, 3);
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 20);
@@ -519,7 +566,9 @@ public void testGetRowOffset() throws Exception {
       KeyValue kv = new KeyValue(ROW, FAMILIES[0], QUALIFIERS[i], 1, VALUE);
       put.add(kv);
       // skipping first two kvs
-      if (i < 2) continue;
+      if (i < 2) {
+        continue;
+      }
       kvListExp.add(kv);
     }
     ht.put(put);
@@ -590,7 +639,7 @@ public void testGetRowOffset() throws Exception {

   @Test
   public void testScanRawDeleteFamilyVersion() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     TEST_UTIL.createTable(tableName, FAMILY);
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set(RPC_CODEC_CONF_KEY, "");
@@ -618,7 +667,7 @@ public void testScanRawDeleteFamilyVersion() throws Exception {
    */
   @Test
   public void testScanOnReopenedRegion() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final TableName tableName = name.getTableName();
     byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
     Table ht = TEST_UTIL.createTable(tableName, FAMILY);
@@ -693,8 +742,9 @@ static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
     LOG.info(msg);
     LOG.info("Expected count: " + expKvList.size());
     LOG.info("Actual count: " + result.size());
-    if (expKvList.isEmpty())
+    if (expKvList.isEmpty()) {
       return;
+    }

     int i = 0;
     for (Cell kv : result.rawCells()) {
@@ -715,7 +765,7 @@ static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,

   @Test
   public void testReadExpiredDataForRawScan() throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     long ts = System.currentTimeMillis() - 10000;
     byte[] value = Bytes.toBytes("expired");
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
@@ -735,7 +785,7 @@ public void testReadExpiredDataForRawScan() throws IOException {

   @Test
   public void testScanWithColumnsAndFilterAndVersion() throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY, 4)) {
       for (int i = 0; i < 4; i++) {
         Put put = new Put(ROW);
@@ -757,7 +807,7 @@ public void testScanWithColumnsAndFilterAndVersion() throws IOException {

   @Test
   public void testScanWithSameStartRowStopRow() throws IOException {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     try (Table table = TEST_UTIL.createTable(tableName, FAMILY)) {
       table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
@@ -794,7 +844,7 @@ public void testScanWithSameStartRowStopRow() throws IOException {

   @Test
   public void testReverseScanWithFlush() throws Exception {
-    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableName tableName = name.getTableName();
     final int BATCH_SIZE = 10;
     final int ROWS_TO_INSERT = 100;
     final byte[] LARGE_VALUE = generateHugeValue(128 * 1024);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index ac0d35613dca..2797df34b81c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -30,22 +30,31 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -54,14 +63,6 @@
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.util.StringUtils;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

 /**
  * Some basic ipc tests.
@@ -232,7 +233,6 @@ public void testRpcMaxRequestSize() throws IOException, ServiceException {
   /**
    * Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
    * remoteAddress set to its Call Object
-   * @throws ServiceException
    */
   @Test
   public void testRpcServerForNotNullRemoteAddressInCallObject()
@@ -363,6 +363,104 @@ public void testAsyncEcho() throws IOException {
     }
   }
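
The two tests added below exercise the hedged code path: the client fans the same request out to up to HBASE_RPCS_HEDGED_REQS_FANOUT_KEY targets and completes with the first successful response, so a call survives a minority of dead or failing servers. A rough sketch of that completion rule follows (illustrative only; this is not the patch's hedged RPC channel implementation, and the class name is made up):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    final class HedgeSketch {
      // Complete with the first success; fail only after every hedged call has
      // failed. Assumes 'calls' is non-empty.
      static <T> CompletableFuture<T> hedge(List<CompletableFuture<T>> calls) {
        CompletableFuture<T> result = new CompletableFuture<>();
        AtomicInteger failures = new AtomicInteger();
        for (CompletableFuture<T> call : calls) {
          call.whenComplete((value, error) -> {
            if (error == null) {
              result.complete(value); // first success wins; later completions are no-ops
            } else if (failures.incrementAndGet() == calls.size()) {
              result.completeExceptionally(error); // every hedged attempt failed
            }
          });
        }
        return result;
      }
    }

This is why the first test below can mix running and failing servers: as long as the fan-out batch reaches at least one live server, the echo succeeds.
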
+  /**
+   * Tests the various request fan out values using a simple RPC hedged across a mix of running and
+   * failing servers.
+   */
+  @Test
+  public void testHedgedAsyncEcho() throws Exception {
+    // Hedging is not supported for blocking connection types.
+    Assume.assumeFalse(this instanceof TestBlockingIPC);
+    List<RpcServer> rpcServers = new ArrayList<>();
+    List<InetSocketAddress> addresses = new ArrayList<>();
+    // Create a mix of running and failing servers.
+    final int numRunningServers = 5;
+    final int numFailingServers = 3;
+    final int numServers = numRunningServers + numFailingServers;
+    for (int i = 0; i < numRunningServers; i++) {
+      RpcServer rpcServer = createRpcServer(null, "testRpcServer" + i,
+          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+              SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+          new FifoRpcScheduler(CONF, 1));
+      rpcServer.start();
+      addresses.add(rpcServer.getListenerAddress());
+      rpcServers.add(rpcServer);
+    }
+    for (int i = 0; i < numFailingServers; i++) {
+      RpcServer rpcServer = createTestFailingRpcServer(null, "testFailingRpcServer" + i,
+          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+              SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+          new FifoRpcScheduler(CONF, 1));
+      rpcServer.start();
+      addresses.add(rpcServer.getListenerAddress());
+      rpcServers.add(rpcServer);
+    }
+    Configuration conf = HBaseConfiguration.create();
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      // Try out various fan out values starting from 1 -> numServers.
+      for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
+        // Update the client's underlying conf, should be ok for the test.
+        LOG.debug("Testing with request fan out: " + reqFanOut);
+        conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
+        Interface stub = newStub(client, addresses);
+        BlockingRpcCallback<EchoResponseProto> done = new BlockingRpcCallback<>();
+        stub.echo(new HBaseRpcControllerImpl(),
+            EchoRequestProto.newBuilder().setMessage("hello").build(), done);
+        TestProtos.EchoResponseProto responseProto = done.get();
+        assertNotNull(responseProto);
+        assertEquals("hello", responseProto.getMessage());
+        LOG.debug("Ended test with request fan out: " + reqFanOut);
+      }
+    } finally {
+      for (RpcServer rpcServer: rpcServers) {
+        rpcServer.stop();
+      }
+    }
+  }
+
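The next test pins down the timeout behavior: when every fanned-out target is slower than the controller's call timeout, the hedged call must fail promptly with a CallTimeoutException surfaced through the controller (HBaseRpcController#getFailed) rather than hang. In terms of the sketch above, that is a whole-call deadline layered over the hedge; one illustrative way to express it is Java 9+ CompletableFuture.orTimeout (again a sketch, not the patch's mechanism):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    final class HedgeTimeoutSketch {
      // One deadline for the hedged call as a whole: if no target answers in time,
      // the future fails with java.util.concurrent.TimeoutException. The patch maps
      // the equivalent condition to CallTimeoutException on the RPC controller.
      static <T> CompletableFuture<T> hedgeWithDeadline(List<CompletableFuture<T>> calls,
          long timeoutMs) {
        return HedgeSketch.hedge(calls).orTimeout(timeoutMs, TimeUnit.MILLISECONDS);
      }
    }

Note the deadline is per hedged call, not per target: racing three slow servers does not extend the budget, which is exactly what the test asserts by pausing every server longer than the configured timeout.
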
+  @Test
+  public void testHedgedAsyncTimeouts() throws Exception {
+    // Hedging is not supported for blocking connection types.
+    Assume.assumeFalse(this instanceof TestBlockingIPC);
+    List<RpcServer> rpcServers = new ArrayList<>();
+    List<InetSocketAddress> addresses = new ArrayList<>();
+    final int numServers = 3;
+    for (int i = 0; i < numServers; i++) {
+      RpcServer rpcServer = createRpcServer(null, "testTimeoutRpcServer" + i,
+          Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
+              SERVICE, null)), new InetSocketAddress("localhost", 0), CONF,
+          new FifoRpcScheduler(CONF, 1));
+      rpcServer.start();
+      addresses.add(rpcServer.getListenerAddress());
+      rpcServers.add(rpcServer);
+    }
+    Configuration conf = HBaseConfiguration.create();
+    int timeout = 100;
+    int pauseTime = 1000;
+    try (AbstractRpcClient<?> client = createRpcClient(conf)) {
+      // Try out various fan out values starting from 1 -> numServers.
+      for (int reqFanOut = 1; reqFanOut <= numServers; reqFanOut++) {
+        // Update the client's underlying conf, should be ok for the test.
+        LOG.debug("Testing with request fan out: " + reqFanOut);
+        conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, reqFanOut);
+        Interface stub = newStub(client, addresses);
+        HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+        pcrc.setCallTimeout(timeout);
+        BlockingRpcCallback<EmptyResponseProto> callback = new BlockingRpcCallback<>();
+        stub.pause(pcrc, PauseRequestProto.newBuilder().setMs(pauseTime).build(), callback);
+        assertNull(callback.get());
+        // Make sure the controller has the right exception propagated.
+        assertTrue(pcrc.getFailed() instanceof CallTimeoutException);
+        LOG.debug("Ended test with request fan out: " + reqFanOut);
+      }
+    } finally {
+      for (RpcServer rpcServer: rpcServers) {
+        rpcServer.stop();
+      }
+    }
+  }
+
   @Test
   public void testAsyncRemoteError() throws IOException {
     AbstractRpcClient<?> client = createRpcClient(CONF);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
index d8a2d348ff9c..6adfa4602ee4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtobufRpcServiceImpl.java
@@ -17,21 +17,23 @@
  */
 package org.apache.hadoop.hbase.ipc;

-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-
+import java.util.Set;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -41,8 +43,6 @@
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Threads;

 @InterfaceAudience.Private
 public class TestProtobufRpcServiceImpl implements BlockingInterface {
@@ -67,6 +67,17 @@ public static Interface newStub(RpcClient client, InetSocketAddress addr) throws
         User.getCurrent(), 0));
   }

+  public static Interface newStub(RpcClient client, List<InetSocketAddress> addrs)
+      throws IOException {
+    Set<ServerName> serverNames = new HashSet<>();
+    for (InetSocketAddress addr: addrs) {
+      serverNames.add(ServerName.valueOf(
+          addr.getHostName(), addr.getPort(), System.currentTimeMillis()));
+    }
+    return TestProtobufRpcProto.newStub(client.createHedgedRpcChannel(
+        serverNames, User.getCurrent(), 0));
+  }
+
   @Override
   public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request)
       throws ServiceException {