HDFS-13522. IPC changes to support observer reads through routers. #4441

Closed · wants to merge 1 commit
@@ -46,7 +46,7 @@ public interface AlignmentContext {
void updateResponseState(RpcResponseHeaderProto.Builder header);

/**
* This is the intended client method call to implement to recieve state info
* This is the intended client method call to implement to receive state info
* during RPC response processing.
*
* @param header The RPC response header.
@@ -73,6 +73,8 @@ public class ConnectionManager {

/** Queue for creating new connections. */
private final BlockingQueue<ConnectionPool> creatorQueue;
private final Map<String, RouterGSIContext> alignmentContexts;
private volatile boolean enableObserverRead;
/** Max size of queue for creating new connections. */
private final int creatorQueueMaxSize;

@@ -125,6 +127,12 @@ public ConnectionManager(Configuration config) {
RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT);
LOG.info("Cleaning connections every {} seconds",
TimeUnit.MILLISECONDS.toSeconds(this.connectionCleanupPeriodMs));

this.alignmentContexts = new HashMap<>();

this.enableObserverRead = this.conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_KEY,
RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_DEFAULT);
}

/**
@@ -172,11 +180,13 @@ public void close() {
* @param ugi User group information.
* @param nnAddress Namenode address for the connection.
* @param protocol Protocol for the connection.
* @param nsId Nameservice identifier.
* @return Proxy client to connect to nnId as UGI.
* @throws IOException If the connection cannot be obtained.
*/
public ConnectionContext getConnection(UserGroupInformation ugi,
String nnAddress, Class<?> protocol) throws IOException {
String nnAddress, Class<?> protocol, String nsId)
throws IOException {

// Check if the manager is shutdown
if (!this.running) {
@@ -203,9 +213,14 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
try {
pool = this.pools.get(connectionId);
if (pool == null) {
RouterGSIContext gsiContext = this.alignmentContexts.get(nsId);
if (gsiContext == null) {
gsiContext = new RouterGSIContext(this.enableObserverRead);
this.alignmentContexts.put(nsId, gsiContext);
}
pool = new ConnectionPool(
this.conf, nnAddress, ugi, this.minSize, this.maxSize,
this.minActiveRatio, protocol);
this.minActiveRatio, protocol, gsiContext);
this.pools.put(connectionId, pool);
}
} finally {
@@ -231,6 +246,18 @@ public ConnectionContext getConnection(UserGroupInformation ugi,
return conn;
}

/**
 * Dynamically reconfigure whether observer reads are enabled.
 *
 * @param enableObserverRead New value of the observer read flag.
 */
public void reconfEnableObserverRead(boolean enableObserverRead) {
readLock.lock();
this.enableObserverRead = enableObserverRead;
for (RouterGSIContext routerGSIContext : alignmentContexts.values()) {
routerGSIContext.setEnableObserverRead(enableObserverRead);
}
readLock.unlock();
}

Comment on lines +250 to +260

Member

Why do we need to reconfigure observer reads dynamically?
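For reference, a minimal usage sketch of the new method, assuming ConnectionManager's existing start/close lifecycle methods; the wrapper class ObserverReadToggleSketch is hypothetical and not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

public class ObserverReadToggleSketch {
  public static void main(String[] args) {
    // Start a connection manager with observer reads enabled from configuration.
    Configuration conf = new Configuration();
    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_KEY, true);
    ConnectionManager connectionManager = new ConnectionManager(conf);
    connectionManager.start();

    // Disable observer reads at runtime: every cached per-nameservice
    // RouterGSIContext stops attaching its last seen state ID to RPC headers.
    connectionManager.reconfEnableObserverRead(false);

    // Re-enable later without recreating the cached connection pools.
    connectionManager.reconfEnableObserverRead(true);

    connectionManager.close();
  }
}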

/**
* Get the number of connection pools.
*
@@ -47,6 +47,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
@@ -105,6 +106,8 @@ public class ConnectionPool {
/** The last time a connection was active. */
private volatile long lastActiveTime = 0;

private final AlignmentContext alignmentContext;

/** Map for the protocols and their protobuf implementations. */
private final static Map<Class<?>, ProtoImpl> PROTO_MAP = new HashMap<>();
static {
@@ -134,7 +137,8 @@ private static class ProtoImpl {

protected ConnectionPool(Configuration config, String address,
UserGroupInformation user, int minPoolSize, int maxPoolSize,
float minActiveRatio, Class<?> proto) throws IOException {
float minActiveRatio, Class<?> proto,
AlignmentContext alignmentContext) throws IOException {

this.conf = config;

@@ -150,6 +154,8 @@ protected ConnectionPool(Configuration config, String address,
this.maxSize = maxPoolSize;
this.minActiveRatio = minActiveRatio;

this.alignmentContext = alignmentContext;

// Add minimum connections to the pool
for (int i=0; i<this.minSize; i++) {
ConnectionContext newConnection = newConnection();
@@ -394,7 +400,8 @@ public String getJSON() {
*/
public ConnectionContext newConnection() throws IOException {
return newConnection(
this.conf, this.namenodeAddress, this.ugi, this.protocol);
this.conf, this.namenodeAddress, this.ugi, this.protocol,
this.alignmentContext);
}

/**
@@ -413,8 +420,8 @@ public ConnectionContext newConnection() throws IOException {
* @throws IOException If it cannot be created.
*/
protected static <T> ConnectionContext newConnection(Configuration conf,
String nnAddress, UserGroupInformation ugi, Class<T> proto)
throws IOException {
String nnAddress, UserGroupInformation ugi, Class<T> proto,
AlignmentContext alignmentContext) throws IOException {
if (!PROTO_MAP.containsKey(proto)) {
String msg = "Unsupported protocol for connection to NameNode: "
+ ((proto != null) ? proto.getName() : "null");
@@ -438,14 +445,14 @@ protected static <T> ConnectionContext newConnection(Configuration conf,
InetSocketAddress socket = NetUtils.createSocketAddr(nnAddress);
final long version = RPC.getProtocolVersion(classes.protoPb);
Object proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi,
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy();
conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null,
alignmentContext).getProxy();
T client = newProtoClient(proto, classes, proxy);
Text dtService = SecurityUtil.buildTokenService(socket);

ProxyAndInfo<T> clientProxy =
new ProxyAndInfo<T>(client, dtService, socket);
ConnectionContext connection = new ConnectionContext(clientProxy);
return connection;
return new ConnectionContext(clientProxy);
}

private static <T> T newProtoClient(Class<T> proto, ProtoImpl classes,
@@ -136,6 +136,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final long DFS_ROUTER_NAMENODE_CONNECTION_CLEAN_MS_DEFAULT =
TimeUnit.SECONDS.toMillis(10);

public static final String DFS_ROUTER_ENABLE_OBSERVER_READ_KEY =
FEDERATION_ROUTER_PREFIX + "enable.observer.read";
public static final boolean DFS_ROUTER_ENABLE_OBSERVER_READ_DEFAULT = false;

// HDFS Router RPC client
public static final String DFS_ROUTER_CLIENT_THREADS_SIZE =
FEDERATION_ROUTER_PREFIX + "client.thread-size";
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdfs.server.federation.router;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;

/**
* Global State ID context for the router.
* <p>
* This is the router side implementation responsible for receiving
* state alignment info from server(s).
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RouterGSIContext extends ClientGSIContext {
Member

I feel the alignment between routers and namenode can be taken care of with just ClientGSIContext.
How does the router not updating the lastSeenStateID when communicating with the namenode help?

private volatile boolean enableObserverRead = false;

public RouterGSIContext(boolean enableObserverRead) {
super();
setEnableObserverRead(enableObserverRead);
}

public void setEnableObserverRead(boolean enableObserverRead) {
this.enableObserverRead = enableObserverRead;
}

/**
* Router side implementation for providing state alignment info in requests.
*/
@Override
public void updateRequestState(RpcRequestHeaderProto.Builder header) {
if (enableObserverRead) {
super.updateRequestState(header);
}
}
}
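A minimal sketch of what the override above changes, assuming ClientGSIContext#updateRequestState stamps the optional stateId field of the RPC request header; the wrapper class RouterGSIContextSketch is hypothetical and not part of this patch.

import org.apache.hadoop.hdfs.server.federation.router.RouterGSIContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;

public class RouterGSIContextSketch {
  public static void main(String[] args) {
    // With observer reads disabled, the override is a no-op, so the request
    // header carries no state ID and downstream proxy providers keep routing
    // the call to the active namenode.
    RpcRequestHeaderProto.Builder header = RpcRequestHeaderProto.newBuilder();
    new RouterGSIContext(false).updateRequestState(header);
    System.out.println("stateId attached: " + header.hasStateId()); // expected: false

    // With observer reads enabled, the parent ClientGSIContext behaviour applies
    // and the router's last seen state ID is stamped onto the header.
    header = RpcRequestHeaderProto.newBuilder();
    new RouterGSIContext(true).updateRequestState(header);
    System.out.println("stateId attached: " + header.hasStateId()); // expected: true
  }
}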
@@ -368,7 +368,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId,
ugi.getUserName(), routerUser);
}
connection = this.connectionManager.getConnection(
connUGI, rpcAddress, proto);
connUGI, rpcAddress, proto, nsId);
LOG.debug("User {} NN {} is using connection {}",
ugi.getUserName(), rpcAddress, connection);
} catch (Exception ex) {
@@ -151,6 +151,13 @@
</description>
</property>

<property>
<name>dfs.federation.router.enable.observer.read</name>
<value>false</value>
<description>
Enables observer reads for clients accessing HDFS through the router.
</description>
</property>
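A minimal sketch of how this property maps to the new RBFConfigKeys constants, assuming a plain Configuration object; the class ObserverReadConfigSketch is hypothetical and not part of this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;

public class ObserverReadConfigSketch {
  public static void main(String[] args) {
    // The property defaults to false, so routers keep sending every call to
    // the active namenode unless it is switched on explicitly.
    Configuration conf = new Configuration(false);
    boolean enabled = conf.getBoolean(
        RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_KEY,
        RBFConfigKeys.DFS_ROUTER_ENABLE_OBSERVER_READ_DEFAULT);
    System.out.println("observer reads enabled by default: " + enabled); // false

    // The string key matches the property documented above; setting it (for
    // example in hdfs-rbf-site.xml) makes new ConnectionManagers create
    // RouterGSIContexts that attach state IDs to namenode RPCs.
    conf.setBoolean("dfs.federation.router.enable.observer.read", true);
  }
}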

<property>
<name>dfs.federation.router.dn-report.time-out</name>
@@ -398,7 +398,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("Simulate connectionManager throw IOException");
}
}).when(spyConnectionManager).getConnection(
any(UserGroupInformation.class), any(String.class), any(Class.class));
any(UserGroupInformation.class), any(String.class), any(Class.class),
any(String.class));

Whitebox.setInternalState(rpcClient, "connectionManager",
spyConnectionManager);
@@ -81,14 +81,14 @@ public void testCleanup() throws Exception {
Map<ConnectionPoolId, ConnectionPool> poolMap = connManager.getPools();

ConnectionPool pool1 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, null);
addConnectionsToPool(pool1, 9, 4);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
pool1);

ConnectionPool pool2 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class);
conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class, null);
addConnectionsToPool(pool2, 10, 10);
poolMap.put(
new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -111,7 +111,7 @@ public void testCleanup() throws Exception {

// Make sure the number of connections doesn't go below minSize
ConnectionPool pool3 = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class);
conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class, null);
addConnectionsToPool(pool3, 8, 0);
poolMap.put(
new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -136,7 +136,7 @@ public void testConnectionCreatorWithException() throws Exception {
// Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool(
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f,
ClientProtocol.class);
ClientProtocol.class, null);
BlockingQueue<ConnectionPool> queue = new ArrayBlockingQueue<>(1);
queue.add(badPool);
ConnectionManager.ConnectionCreator connectionCreator =
@@ -162,7 +162,7 @@ public void testGetConnectionWithException() throws Exception {
// Create a bad connection pool pointing to unresolvable namenode address.
ConnectionPool badPool = new ConnectionPool(
conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f,
ClientProtocol.class);
ClientProtocol.class, null);
}

@Test
@@ -172,7 +172,7 @@ public void testGetConnection() throws Exception {
int activeConns = 5;

ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class);
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class, null);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class),
@@ -197,7 +197,7 @@ public void testGetConnection() throws Exception {
@Test
public void testValidClientIndex() throws Exception {
ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class);
conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class, null);
for(int i = -3; i <= 3; i++) {
pool.getClientIndex().set(i);
ConnectionContext conn = pool.getConnection();
@@ -213,7 +213,7 @@ public void getGetConnectionNamenodeProtocol() throws Exception {
int activeConns = 5;

ConnectionPool pool = new ConnectionPool(
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class);
conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class, null);
addConnectionsToPool(pool, totalConns, activeConns);
poolMap.put(
new ConnectionPoolId(
@@ -286,7 +286,7 @@ private void testConnectionCleanup(float ratio, int totalConns,

// Create one new connection pool
tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS,
NamenodeProtocol.class);
NamenodeProtocol.class, "mockNS");

Map<ConnectionPoolId, ConnectionPool> poolMap = tmpConnManager.getPools();
ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1,
@@ -317,6 +317,6 @@ public void testUnsupportedProtoExceptionMsg() throws Exception {
"Unsupported protocol for connection to NameNode: "
+ TestConnectionManager.class.getName(),
() -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1,
TestConnectionManager.class));
TestConnectionManager.class, null));
}
}