Skip to content

Validate build hash in handshake #65732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions server/src/main/java/org/elasticsearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -68,10 +70,27 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

public class TransportService extends AbstractLifecycleComponent implements ReportingService<TransportInfo>, TransportMessageListener,
TransportConnectionListener {
public class TransportService extends AbstractLifecycleComponent
implements ReportingService<TransportInfo>, TransportMessageListener, TransportConnectionListener {

private static final Logger logger = LogManager.getLogger(TransportService.class);

private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds";
private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS;

static {
final String value = System.getProperty(PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY);
if (value == null) {
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = false;
} else if (Boolean.parseBoolean(value)) {
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = true;
} else {
throw new IllegalArgumentException("invalid value [" + value + "] for system property ["
+ PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "]");
}
}


public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

Expand Down Expand Up @@ -182,7 +201,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
false, false,
HandshakeRequest::new,
(request, channel, task) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));

if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning");
DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds",
"system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed");
}
}

public RemoteClusterService getRemoteClusterService() {
Expand Down Expand Up @@ -440,8 +466,8 @@ public void onResponse(HandshakeResponse response) {
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
, HandshakeResponse::new, ThreadPool.Names.GENERIC
},
HandshakeResponse::new, ThreadPool.Names.GENERIC
));
}

Expand All @@ -463,28 +489,89 @@ private HandshakeRequest() {
}

public static class HandshakeResponse extends TransportResponse {

private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version.V_8_0_0;

private final Version version;

@Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION
private final String buildHash;

private final DiscoveryNode discoveryNode;

private final ClusterName clusterName;
private final Version version;

public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) {
this.discoveryNode = discoveryNode;
this.version = version;
this.clusterName = clusterName;
public HandshakeResponse(Version version, String buildHash, DiscoveryNode discoveryNode, ClusterName clusterName) {
this.buildHash = Objects.requireNonNull(buildHash);
this.discoveryNode = Objects.requireNonNull(discoveryNode);
this.version = Objects.requireNonNull(version);
this.clusterName = Objects.requireNonNull(clusterName);
}

public HandshakeResponse(StreamInput in) throws IOException {
super(in);
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
if (in.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) {
// the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear
// on the wire as we expect them to even if this turns out to be an incompatible build
version = Version.readVersion(in);
buildHash = in.readString();

try {
// If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception
// message, but recognise that this may fail
discoveryNode = new DiscoveryNode(in);
} catch (Exception e) {
if (isIncompatibleBuild(version, buildHash)) {
throw new IllegalArgumentException("unidentifiable remote node is build [" + buildHash +
"] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() +
"] of version [" + Version.CURRENT + "] which has an incompatible wire format", e);
} else {
throw e;
}
}

if (isIncompatibleBuild(version, buildHash)) {
if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
logger.warn("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " +
"which may not be compatible; remove system property [{}] to resolve this warning",
discoveryNode, buildHash, version, Build.CURRENT.hash(), Version.CURRENT,
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY);
} else {
throw new IllegalArgumentException("remote node [" + discoveryNode + "] is build [" + buildHash +
"] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() +
"] of version [" + Version.CURRENT + "] which has an incompatible wire format");
}
}

clusterName = new ClusterName(in);
} else {
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
buildHash = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
if (out.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) {
Version.writeVersion(version, out);
out.writeString(buildHash);
discoveryNode.writeTo(out);
clusterName.writeTo(out);
} else {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
}
}

public Version getVersion() {
return version;
}

public String getBuildHash() {
return buildHash;
}

public DiscoveryNode getDiscoveryNode() {
Expand All @@ -494,6 +581,10 @@ public DiscoveryNode getDiscoveryNode() {
public ClusterName getClusterName() {
return clusterName;
}

private static boolean isIncompatibleBuild(Version version, String buildHash) {
return version == Version.CURRENT && Build.CURRENT.hash().equals(buildHash) == false;
}
}

public void disconnectFromNode(DiscoveryNode node) {
Expand Down Expand Up @@ -1293,4 +1384,5 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -479,7 +480,7 @@ private TestTransportService(Transport transport, ThreadPool threadPool) {
@Override
public void handshake(Transport.Connection connection, TimeValue timeout, Predicate<ClusterName> clusterNamePredicate,
ActionListener<HandshakeResponse> listener) {
listener.onResponse(new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT));
listener.onResponse(new HandshakeResponse(Version.CURRENT, Build.CURRENT.hash(), connection.getNode(), new ClusterName("")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -227,7 +228,11 @@ public void testFailsNodeThatDisconnects() {
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertFalse(node.equals(localNode));
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
deterministicTaskQueue.scheduleNow(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -222,7 +223,11 @@ public void testFollowerFailsImmediatelyOnDisconnection() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
Expand Down Expand Up @@ -332,7 +337,11 @@ public void testFollowerFailsImmediatelyOnHealthCheckFailure() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -161,8 +162,12 @@ private void setupMasterServiceAndCoordinator(long term, ClusterState initialSta
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(),
destination.getVersion()));
handleResponse(requestId, new TransportService.HandshakeResponse(
destination.getVersion(),
Build.CURRENT.hash(),
destination,
initialState.getClusterName()
));
} else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) {
handleResponse(requestId, new TransportResponse.Empty());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -98,7 +99,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) {
handleError(requestId, fullConnectionFailure);
} else {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
handleResponse(requestId, new HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
remoteNode,
new ClusterName(remoteClusterName)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.transport;

import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -58,7 +59,11 @@ public void testDeserializationFailureLogIdentifiesListener() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(TransportService.HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(otherNode, new ClusterName(""), Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
otherNode,
new ClusterName("")));
}
}
};
Expand Down
Loading