Skip to content

HBASE-23604: Clarify AsyncRegistry usage in the code. #957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class AsyncConnectionImpl implements AsyncConnection {

private final User user;

final AsyncRegistry registry;
final ConnectionRegistry registry;

private final int rpcTimeout;

Expand Down Expand Up @@ -122,7 +122,7 @@ class AsyncConnectionImpl implements AsyncConnection {

private volatile ConnectionOverAsyncConnection conn;

public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
this.conf = conf;
this.user = user;
Expand All @@ -136,7 +136,8 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
} else {
this.metrics = Optional.empty();
}
this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, metrics.orElse(null));
this.rpcClient = RpcClientFactory.createClient(
conf, clusterId, localAddress, metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
this.rpcTimeout =
Expand Down Expand Up @@ -257,7 +258,7 @@ AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
CompletableFuture<MasterService.Interface> getMasterStub() {
return ConnectionUtils.getOrFetch(masterStub, masterStubMakeFuture, false, () -> {
CompletableFuture<MasterService.Interface> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (addr, error) -> {
addListener(registry.getActiveMaster(), (addr, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (addr == null) {
Expand Down Expand Up @@ -368,7 +369,7 @@ public Connection toConnection() {
@Override
public CompletableFuture<Hbck> getHbck() {
CompletableFuture<Hbck> future = new CompletableFuture<>();
addListener(registry.getMasterAddress(), (sn, error) -> {
addListener(registry.getActiveMaster(), (sn, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
@InterfaceAudience.Private
class AsyncMetaRegionLocator {

private final AsyncRegistry registry;
private final ConnectionRegistry registry;

private final AtomicReference<RegionLocations> metaRegionLocations = new AtomicReference<>();

private final AtomicReference<CompletableFuture<RegionLocations>> metaRelocateFuture =
new AtomicReference<>();

AsyncMetaRegionLocator(AsyncRegistry registry) {
AsyncMetaRegionLocator(ConnectionRegistry registry) {
this.registry = registry;
}

Expand All @@ -60,7 +60,7 @@ class AsyncMetaRegionLocator {
*/
CompletableFuture<RegionLocations> getRegionLocations(int replicaId, boolean reload) {
return ConnectionUtils.getOrFetch(metaRegionLocations, metaRelocateFuture, reload,
registry::getMetaRegionLocation, locs -> isGood(locs, replicaId), "meta region location");
registry::getMetaRegionLocations, locs -> isGood(locs, replicaId), "meta region location");
}

private HRegionLocation getCacheLocation(HRegionLocation loc) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public CompletableFuture<HRegionLocation> getRegionLocation(byte[] row, int repl
@Override
public CompletableFuture<List<HRegionLocation>> getAllRegionLocations() {
if (TableName.isMetaTableName(tableName)) {
return conn.registry.getMetaRegionLocation()
return conn.registry.getMetaRegionLocations()
.thenApply(locs -> Arrays.asList(locs.getRegionLocations()));
}
return AsyncMetaTableAccessor.getTableHRegionLocations(conn.getTable(TableName.META_TABLE_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura
public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
final User user) {
CompletableFuture<AsyncConnection> future = new CompletableFuture<>();
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
addListener(registry.getClusterId(), (clusterId, error) -> {
if (error != null) {
registry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@
import org.apache.yetus.audience.InterfaceAudience;

/**
* Implementations hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Registry for meta information needed for connection setup to a HBase cluster. Implementations
* hold cluster information such as this cluster's id, location of hbase:meta, etc..
* Internal use only.
*/
@InterfaceAudience.Private
interface AsyncRegistry extends Closeable {
interface ConnectionRegistry extends Closeable {

/**
* Get the location of meta region.
* Get the location of meta region(s).
*/
CompletableFuture<RegionLocations> getMetaRegionLocation();
CompletableFuture<RegionLocations> getMetaRegionLocations();

/**
* Should only be called once.
Expand All @@ -43,9 +44,9 @@ interface AsyncRegistry extends Closeable {
CompletableFuture<String> getClusterId();

/**
* Get the address of HMaster.
* Get the address of active HMaster.
*/
CompletableFuture<ServerName> getMasterAddress();
CompletableFuture<ServerName> getActiveMaster();

/**
* Closes this instance and releases any system resources associated with it
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,26 +18,28 @@
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Get instance of configured Registry.
* Factory class to get the instance of configured connection registry.
*/
@InterfaceAudience.Private
final class AsyncRegistryFactory {
final class ConnectionRegistryFactory {

static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
static final String CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY =
"hbase.client.connection.registry.impl";

private AsyncRegistryFactory() {
private ConnectionRegistryFactory() {
}

/**
* @return The cluster registry implementation to use.
* @return The connection registry implementation to use.
*/
static AsyncRegistry getRegistry(Configuration conf) {
Class<? extends AsyncRegistry> clazz =
conf.getClass(REGISTRY_IMPL_CONF_KEY, ZKAsyncRegistry.class, AsyncRegistry.class);
static ConnectionRegistry getRegistry(Configuration conf) {
Class<? extends ConnectionRegistry> clazz = conf.getClass(
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class,
ConnectionRegistry.class);
return ReflectionUtils.newInstance(clazz, conf);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ public CompletableFuture<Boolean> isTableDisabled(TableName tableName) {
@Override
public CompletableFuture<Boolean> isTableAvailable(TableName tableName) {
if (TableName.isMetaTableName(tableName)) {
return connection.registry.getMetaRegionLocation().thenApply(locs -> Stream
return connection.registry.getMetaRegionLocations().thenApply(locs -> Stream
.of(locs.getRegionLocations()).allMatch(loc -> loc != null && loc.getServerName() != null));
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
Expand Down Expand Up @@ -847,7 +847,7 @@ public CompletableFuture<List<RegionInfo>> getRegions(ServerName serverName) {
@Override
public CompletableFuture<List<RegionInfo>> getRegions(TableName tableName) {
if (tableName.equals(META_TABLE_NAME)) {
return connection.registry.getMetaRegionLocation()
return connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations()).map(HRegionLocation::getRegion)
.collect(Collectors.toList()));
} else {
Expand Down Expand Up @@ -1075,8 +1075,9 @@ private CompletableFuture<List<HRegionLocation>> getTableHRegionLocations(TableN
if (TableName.META_TABLE_NAME.equals(tableName)) {
CompletableFuture<List<HRegionLocation>> future = new CompletableFuture<>();
// For meta table, we use zk to fetch all locations.
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(connection.getConfiguration());
addListener(registry.getMetaRegionLocation(), (metaRegions, err) -> {
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(
connection.getConfiguration());
addListener(registry.getMetaRegionLocations(), (metaRegions, err) -> {
if (err != null) {
future.completeExceptionally(err);
} else if (metaRegions == null || metaRegions.isEmpty() ||
Expand Down Expand Up @@ -1104,7 +1105,7 @@ private CompletableFuture<Void> compact(TableName tableName, byte[] columnFamily

switch (compactType) {
case MOB:
addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
Expand Down Expand Up @@ -2343,7 +2344,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedR
String encodedName = Bytes.toString(regionNameOrEncodedRegionName);
if (encodedName.length() < RegionInfo.MD5_HEX_LENGTH) {
// old format encodedName, should be meta region
future = connection.registry.getMetaRegionLocation()
future = connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations())
.filter(loc -> loc.getRegion().getEncodedName().equals(encodedName)).findFirst());
} else {
Expand All @@ -2354,7 +2355,7 @@ CompletableFuture<HRegionLocation> getRegionLocation(byte[] regionNameOrEncodedR
RegionInfo regionInfo =
MetaTableAccessor.parseRegionInfoFromRegionName(regionNameOrEncodedRegionName);
if (regionInfo.isMetaRegion()) {
future = connection.registry.getMetaRegionLocation()
future = connection.registry.getMetaRegionLocations()
.thenApply(locs -> Stream.of(locs.getRegionLocations())
.filter(loc -> loc.getRegion().getReplicaId() == regionInfo.getReplicaId())
.findFirst());
Expand Down Expand Up @@ -2927,7 +2928,7 @@ public CompletableFuture<CompactionState> getCompactionState(TableName tableName

switch (compactType) {
case MOB:
addListener(connection.registry.getMasterAddress(), (serverName, err) -> {
addListener(connection.registry.getActiveMaster(), (serverName, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@
* Zookeeper based registry implementation.
*/
@InterfaceAudience.Private
class ZKAsyncRegistry implements AsyncRegistry {
class ZKConnectionRegistry implements ConnectionRegistry {

private static final Logger LOG = LoggerFactory.getLogger(ZKAsyncRegistry.class);
private static final Logger LOG = LoggerFactory.getLogger(ZKConnectionRegistry.class);

private final ReadOnlyZKClient zk;

private final ZNodePaths znodePaths;

ZKAsyncRegistry(Configuration conf) {
ZKConnectionRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf);
this.zk = new ReadOnlyZKClient(conf);
}
Expand Down Expand Up @@ -93,7 +93,7 @@ private static String getClusterId(byte[] data) throws DeserializationException

@Override
public CompletableFuture<String> getClusterId() {
return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
return getAndConvert(znodePaths.clusterIdZNode, ZKConnectionRegistry::getClusterId);
}

@VisibleForTesting
Expand Down Expand Up @@ -144,7 +144,7 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
int replicaId = znodePaths.getMetaReplicaIdFromZnode(metaReplicaZNode);
String path = ZNodePaths.joinZNode(znodePaths.baseZNode, metaReplicaZNode);
if (replicaId == DEFAULT_REPLICA_ID) {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
Expand All @@ -162,7 +162,7 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
tryComplete(remaining, locs, future);
});
} else {
addListener(getAndConvert(path, ZKAsyncRegistry::getMetaProto), (proto, error) -> {
addListener(getAndConvert(path, ZKConnectionRegistry::getMetaProto), (proto, error) -> {
if (future.isDone()) {
return;
}
Expand Down Expand Up @@ -191,7 +191,7 @@ private void getMetaRegionLocation(CompletableFuture<RegionLocations> future,
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
addListener(
zk.list(znodePaths.baseZNode)
Expand All @@ -217,8 +217,8 @@ private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOExcep
}

@Override
public CompletableFuture<ServerName> getMasterAddress() {
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
public CompletableFuture<ServerName> getActiveMaster() {
return getAndConvert(znodePaths.masterAddressZNode, ZKConnectionRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
* Registry that does nothing. Otherwise, default Registry wants zookeeper up and running.
*/
@InterfaceAudience.Private
class DoNothingAsyncRegistry implements AsyncRegistry {
class DoNothingConnectionRegistry implements ConnectionRegistry {

public DoNothingAsyncRegistry(Configuration conf) {
public DoNothingConnectionRegistry(Configuration conf) {
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return CompletableFuture.completedFuture(null);
}

Expand All @@ -43,7 +43,7 @@ public CompletableFuture<String> getClusterId() {
}

@Override
public CompletableFuture<ServerName> getMasterAddress() {
public CompletableFuture<ServerName> getActiveMaster() {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class),
any());

conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", null,
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null,
UserProvider.instantiate(CONF).getCurrent()) {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -43,21 +43,21 @@ public class TestAsyncMetaRegionLocatorFailFast {

private static AsyncMetaRegionLocator LOCATOR;

private static final class FaultyAsyncRegistry extends DoNothingAsyncRegistry {
private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry {

public FaultyAsyncRegistry(Configuration conf) {
public FaultyConnectionRegistry(Configuration conf) {
super(conf);
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocation() {
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return FutureUtils.failedFuture(new DoNotRetryRegionException("inject error"));
}
}

@BeforeClass
public static void setUp() {
LOCATOR = new AsyncMetaRegionLocator(new FaultyAsyncRegistry(CONF));
LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF));
}

@Test(expected = DoNotRetryIOException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
return null;
}
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", null,
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null,
UserProvider.instantiate(CONF).getCurrent()) {

@Override
Expand Down
Loading