Skip to content

Commit cac2837

Browse files
author
Steve Vaughan Jr
committed
Use a stable hashCode to allow safe IP addr changes
1 parent 1f1fdcf commit cac2837

File tree

2 files changed

+97
-22
lines changed
  • hadoop-common-project/hadoop-common/src

2 files changed

+97
-22
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -419,8 +419,7 @@ public synchronized Writable getRpcResponse() {
419419
* socket: responses may be delivered out of order. */
420420
private class Connection extends Thread {
421421
private InetSocketAddress server; // server ip:port
422-
// This remoteId needs to be mutable in order to handle updated addresses
423-
private ConnectionId remoteId; // connection id
422+
private final ConnectionId remoteId; // connection id
424423
private AuthMethod authMethod; // authentication method
425424
private AuthProtocol authProtocol;
426425
private int serviceClass;
@@ -647,10 +646,8 @@ private synchronized boolean updateAddress() throws IOException {
647646
" New: " + currentAddr.toString());
648647
server = currentAddr;
649648
// Update the remote address so that reconnections are with the updated address.
650-
// This avoids thrashing. We remove the old connection and then replace the remoteId
651-
// because it is an immutable class that is used as a key in the connections map.
652-
removeMethod.accept(this);
653-
remoteId = ConnectionId.updateAddress(remoteId, currentAddr);
649+
// This avoids thrashing.
650+
remoteId.setAddress(currentAddr);
654651
UserGroupInformation ticket = remoteId.getTicket();
655652
this.setName("IPC Client (" + socketFactory.hashCode()
656653
+ ") connection to " + server.toString() + " from "
@@ -1706,7 +1703,7 @@ private Connection getConnection(ConnectionId remoteId,
17061703
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
17071704
@InterfaceStability.Evolving
17081705
public static class ConnectionId {
1709-
private final InetSocketAddress address;
1706+
private InetSocketAddress address;
17101707
private final UserGroupInformation ticket;
17111708
private final Class<?> protocol;
17121709
private static final int PRIME = 16777619;
@@ -1724,20 +1721,6 @@ public static class ConnectionId {
17241721
private String saslQop; // here for testing
17251722
private final Configuration conf; // used to get the expected kerberos principal name
17261723

1727-
/**
1728-
* Creates a new identifier with an updated address, maintaining all other settings. This is
1729-
* used to update the remote address when an address change is detected.
1730-
*
1731-
* @param original the identifier that will be replaced
1732-
* @param address the updated address
1733-
* @return a replacement identifier
1734-
* @see Connection#updateAddress()
1735-
*/
1736-
private static ConnectionId updateAddress(ConnectionId original, InetSocketAddress address) {
1737-
return new ConnectionId(address, original.protocol, original.ticket, original.rpcTimeout,
1738-
original.connectionRetryPolicy, original.conf);
1739-
}
1740-
17411724
public ConnectionId(InetSocketAddress address, Class<?> protocol,
17421725
UserGroupInformation ticket, int rpcTimeout,
17431726
RetryPolicy connectionRetryPolicy, Configuration conf) {
@@ -1774,6 +1757,27 @@ InetSocketAddress getAddress() {
17741757
return address;
17751758
}
17761759

1760+
/**
1761+
* This is used to update the remote address when an address change is detected. This method
1762+
* ensures that the {@link #hashCode()} won't change.
1763+
*
1764+
* @param address the updated address
1765+
* @throws IllegalArgumentException if the hostname or port doesn't match
1766+
* @see Connection#updateAddress()
1767+
*/
1768+
void setAddress(InetSocketAddress address) {
1769+
if (!Objects.equals(this.address.getHostName(), address.getHostName())) {
1770+
throw new IllegalArgumentException("Hostname must match: " + this.address + " vs "
1771+
+ address);
1772+
}
1773+
if (this.address.getPort() != address.getPort()) {
1774+
throw new IllegalArgumentException("Port must match: " + this.address + " vs " + address);
1775+
}
1776+
1777+
this.address = address;
1778+
}
1779+
1780+
17771781
Class<?> getProtocol() {
17781782
return protocol;
17791783
}
@@ -1884,7 +1888,11 @@ && isEqual(this.protocol, that.protocol)
18841888
@Override
18851889
public int hashCode() {
18861890
int result = connectionRetryPolicy.hashCode();
1887-
result = PRIME * result + ((address == null) ? 0 : address.hashCode());
1891+
// We calculate based on the host name and port without the IP address, since the hashCode
1892+
// must be stable even if the IP address is updated.
1893+
result = PRIME * result + ((address == null || address.getHostName() == null) ? 0 :
1894+
address.getHostName().hashCode());
1895+
result = PRIME * result + ((address == null) ? 0 : address.getPort());
18881896
result = PRIME * result + (doPing ? 1231 : 1237);
18891897
result = PRIME * result + maxIdleTime;
18901898
result = PRIME * result + pingInterval;

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.ipc;
2020

21+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertFalse;
2324
import static org.junit.Assert.assertNotNull;
@@ -93,6 +94,7 @@
9394
import org.apache.hadoop.test.LambdaTestUtils;
9495
import org.apache.hadoop.test.Whitebox;
9596
import org.apache.hadoop.util.StringUtils;
97+
import org.assertj.core.api.Condition;
9698
import org.junit.Assert;
9799
import org.junit.Assume;
98100
import org.junit.Before;
@@ -815,6 +817,71 @@ public Void call() throws IOException {
815817
}
816818
}
817819

820+
/**
821+
* The {@link ConnectionId#hashCode} has to be stable despite updates that occur as the the
822+
* address evolves over time. The {@link ConnectionId} is used as a primary key in maps, so
823+
* its hashCode can't change.
824+
*
825+
* @throws IOException if there is a client or server failure
826+
*/
827+
@Test
828+
public void testStableHashCode() throws IOException {
829+
Server server = new TestServer(5, false);
830+
try {
831+
server.start();
832+
833+
// Leave host unresolved to start. Use "localhost" as opposed
834+
// to local IP from NetUtils.getConnectAddress(server) to force
835+
// resolution later
836+
InetSocketAddress unresolvedAddr = InetSocketAddress.createUnresolved(
837+
"localhost", NetUtils.getConnectAddress(server).getPort());
838+
839+
// Setup: Create a ConnectionID using an unresolved address, and get it's hashCode to serve
840+
// as a point of comparison.
841+
int rpcTimeout = MIN_SLEEP_TIME * 2;
842+
final ConnectionId remoteId = getConnectionId(unresolvedAddr, rpcTimeout, conf);
843+
int expected = remoteId.hashCode();
844+
845+
// Start client
846+
Client.setConnectTimeout(conf, 100);
847+
Client client = new Client(LongWritable.class, conf);
848+
try {
849+
// Test: Call should re-resolve host and succeed
850+
LongWritable param = new LongWritable(RANDOM.nextLong());
851+
client.call(RPC.RpcKind.RPC_BUILTIN, param, remoteId,
852+
RPC.RPC_SERVICE_CLASS_DEFAULT, null);
853+
int actual = remoteId.hashCode();
854+
855+
// Verify: The hashCode should match, although the InetAddress is different since it has
856+
// now been resolved
857+
assertThat(remoteId.getAddress()).isNotEqualTo(unresolvedAddr);
858+
assertThat(remoteId.getAddress().getHostName()).isEqualTo(unresolvedAddr.getHostName());
859+
assertThat(remoteId.hashCode()).isEqualTo(expected);
860+
861+
// Verify: The hashCode is protected against updates to the host name
862+
String hostName = InetAddress.getLocalHost().getHostName();
863+
InetSocketAddress mismatchedHostName = NetUtils.createSocketAddr(
864+
InetAddress.getLocalHost().getHostName(),
865+
remoteId.getAddress().getPort());
866+
assertThatExceptionOfType(IllegalArgumentException.class)
867+
.isThrownBy(() -> remoteId.setAddress(mismatchedHostName))
868+
.withMessageStartingWith("Hostname must match");
869+
870+
// Verify: The hashCode is protected against updates to the port
871+
InetSocketAddress mismatchedPort = NetUtils.createSocketAddr(
872+
remoteId.getAddress().getHostName(),
873+
remoteId.getAddress().getPort() + 1);
874+
assertThatExceptionOfType(IllegalArgumentException.class)
875+
.isThrownBy(() -> remoteId.setAddress(mismatchedPort))
876+
.withMessageStartingWith("Port must match");
877+
} finally {
878+
client.stop();
879+
}
880+
} finally {
881+
server.stop();
882+
}
883+
}
884+
818885
@Test(timeout=60000)
819886
public void testIpcFlakyHostResolution() throws IOException {
820887
// start server

0 commit comments

Comments
 (0)