@@ -419,7 +419,8 @@ public synchronized Writable getRpcResponse() {
419419 * socket: responses may be delivered out of order. */
420420 private class Connection extends Thread {
421421 private InetSocketAddress server ; // server ip:port
422- private final ConnectionId remoteId ; // connection id
422+ // This remoteId needs to be mutable in order to handle updated addresses
423+ private ConnectionId remoteId ; // connection id
423424 private AuthMethod authMethod ; // authentication method
424425 private AuthProtocol authProtocol ;
425426 private int serviceClass ;
@@ -645,9 +646,11 @@ private synchronized boolean updateAddress() throws IOException {
645646 LOG .warn ("Address change detected. Old: " + server .toString () +
646647 " New: " + currentAddr .toString ());
647648 server = currentAddr ;
648- // Update the remote address so that reconnections are with the updated address. This
649- // avoids thrashing.
650- remoteId .setAddress (currentAddr );
649+ // Update the remote address so that reconnections are with the updated address.
650+ // This avoids thrashing. We remove the old connection and then replace the remoteId
651+ // because it is an immutable class that is used as a key in the connections map.
652+ removeMethod .accept (this );
653+ remoteId = ConnectionId .updateAddress (remoteId , currentAddr );
651654 UserGroupInformation ticket = remoteId .getTicket ();
652655 this .setName ("IPC Client (" + socketFactory .hashCode ()
653656 + ") connection to " + server .toString () + " from "
@@ -1720,7 +1723,21 @@ public static class ConnectionId {
17201723 private final int pingInterval ; // how often sends ping to the server in msecs
17211724 private String saslQop ; // here for testing
17221725 private final Configuration conf ; // used to get the expected kerberos principal name
1723-
1726+
1727+ /**
1728+ * Creates a new identifier with an updated address, maintaining all other settings. This is
1729+ * used to update the remote address when an address change is detected.
1730+ *
1731+ * @param original the identifier that will be replaced
1732+ * @param address the updated address
1733+ * @return a replacement identifier
1734+ * @see Connection#updateAddress()
1735+ */
1736+ private static ConnectionId updateAddress (ConnectionId original , InetSocketAddress address ) {
1737+ return new ConnectionId (address , original .protocol , original .ticket , original .rpcTimeout ,
1738+ original .connectionRetryPolicy , original .conf );
1739+ }
1740+
17241741 public ConnectionId (InetSocketAddress address , Class <?> protocol ,
17251742 UserGroupInformation ticket , int rpcTimeout ,
17261743 RetryPolicy connectionRetryPolicy , Configuration conf ) {
@@ -1757,15 +1774,6 @@ InetSocketAddress getAddress() {
17571774 return address ;
17581775 }
17591776
1760- /**
1761- * Used to update the remote address when an address change is detected.
1762- *
1763- * @param address the new address
1764- */
1765- private void setAddress (InetSocketAddress address ) {
1766- this .address = address ;
1767- }
1768-
17691777 Class <?> getProtocol () {
17701778 return protocol ;
17711779 }
0 commit comments