Skip to content

Validate build hash in handshake #65601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 117 additions & 18 deletions server/src/main/java/org/elasticsearch/transport/TransportService.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
Expand All @@ -37,6 +38,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -73,10 +75,27 @@
import java.util.function.Predicate;
import java.util.function.Supplier;

public class TransportService extends AbstractLifecycleComponent implements ReportingService<TransportInfo>, TransportMessageListener,
TransportConnectionListener {
public class TransportService extends AbstractLifecycleComponent
implements ReportingService<TransportInfo>, TransportMessageListener, TransportConnectionListener {

private static final Logger logger = LogManager.getLogger(TransportService.class);

private static final String PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY = "es.unsafely_permit_handshake_from_incompatible_builds";
private static final boolean PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS;

static {
    // Read the escape-hatch system property once, when the class is initialised.
    final String configured = System.getProperty(PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY);

    // The property must either be absent or parse as boolean true; any other value (including "false") is rejected.
    if (configured != null && Boolean.parseBoolean(configured) == false) {
        throw new IllegalArgumentException("invalid value [" + configured + "] for system property ["
            + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "]");
    }

    // Reaching this point means the property was either unset (false) or set to true.
    PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS = configured != null;
}


public static final String DIRECT_RESPONSE_PROFILE = ".direct";
public static final String HANDSHAKE_ACTION_NAME = "internal:transport/handshake";

Expand Down Expand Up @@ -115,6 +134,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
private final RemoteClusterService remoteClusterService;

private final boolean validateConnections;
private final boolean requireCompatibleBuild;

/** if set, requests sent to this node id are short-circuited and executed locally */
volatile DiscoveryNode localNode = null;
Expand Down Expand Up @@ -160,9 +180,15 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, TransportInterceptor transportInterceptor,
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders, ConnectionManager connectionManager) {

final boolean isTransportClient = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey()));

// If we are a transport client then we skip the check that the remote node has a compatible build hash
this.requireCompatibleBuild = isTransportClient == false;

// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
this.validateConnections = isTransportClient == false || TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);

this.transport = transport;
transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings));
this.threadPool = threadPool;
Expand Down Expand Up @@ -192,7 +218,14 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
false, false,
HandshakeRequest::new,
(request, channel, task) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
new HandshakeResponse(localNode.getVersion(), Build.CURRENT.hash(), localNode, clusterName)));

if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
logger.warn("transport handshakes from incompatible builds are unsafely permitted on this node; remove system property [" +
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] to resolve this warning");
DeprecationLogger.getLogger(TransportService.class).deprecate("permit_handshake_from_incompatible_builds",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: isn't it a little much to log two messages for one thing? This seems like a dev-only option anyway, so why bother with the deprecation logger?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ehh, I worry we'll get people using this, and the two logs are for two separate aspects. Deprecation logs don't go into the main server log (by default, at least), but I think this deserves a visible warning whenever it's used.

"system property [" + PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY + "] is deprecated and should be removed");
}
}

public RemoteClusterService getRemoteClusterService() {
Expand Down Expand Up @@ -482,7 +515,7 @@ public void onFailure(Exception e) {
listener.onFailure(e);
}
}
, HandshakeResponse::new, ThreadPool.Names.GENERIC
, in -> new HandshakeResponse(in, requireCompatibleBuild), ThreadPool.Names.GENERIC
));
}

Expand All @@ -504,28 +537,89 @@ private HandshakeRequest() {
}

public static class HandshakeResponse extends TransportResponse {

private static final Version BUILD_HASH_HANDSHAKE_VERSION = Version.V_7_11_0;

private final Version version;

@Nullable // if version < BUILD_HASH_HANDSHAKE_VERSION
private final String buildHash;

private final DiscoveryNode discoveryNode;

private final ClusterName clusterName;
private final Version version;

public HandshakeResponse(DiscoveryNode discoveryNode, ClusterName clusterName, Version version) {
this.discoveryNode = discoveryNode;
this.version = version;
this.clusterName = clusterName;
public HandshakeResponse(Version version, String buildHash, DiscoveryNode discoveryNode, ClusterName clusterName) {
this.buildHash = Objects.requireNonNull(buildHash);
this.discoveryNode = Objects.requireNonNull(discoveryNode);
this.version = Objects.requireNonNull(version);
this.clusterName = Objects.requireNonNull(clusterName);
}

public HandshakeResponse(StreamInput in) throws IOException {
public HandshakeResponse(StreamInput in, boolean requireCompatibleBuild) throws IOException {
super(in);
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
if (in.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) {
// the first two fields need only VInts and raw (ASCII) characters, so we cross our fingers and hope that they appear
// on the wire as we expect them to even if this turns out to be an incompatible build
version = Version.readVersion(in);
buildHash = in.readString();

try {
// If the remote node is incompatible then make an effort to identify it anyway, so we can mention it in the exception
// message, but recognise that this may fail
discoveryNode = new DiscoveryNode(in);
} catch (Exception e) {
if (isIncompatibleBuild(version, buildHash, requireCompatibleBuild)) {
throw new IllegalArgumentException("unidentifiable remote node is build [" + buildHash +
"] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() +
"] of version [" + Version.CURRENT + "] which has an incompatible wire format", e);
} else {
throw e;
}
}

if (isIncompatibleBuild(version, buildHash, requireCompatibleBuild)) {
if (PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS) {
logger.warn("remote node [{}] is build [{}] of version [{}] but this node is build [{}] of version [{}] " +
"which may not be compatible; remove system property [{}] to resolve this warning",
discoveryNode, buildHash, version, Build.CURRENT.hash(), Version.CURRENT,
PERMIT_HANDSHAKES_FROM_INCOMPATIBLE_BUILDS_KEY);
} else {
throw new IllegalArgumentException("remote node [" + discoveryNode + "] is build [" + buildHash +
"] of version [" + version + "] but this node is build [" + Build.CURRENT.hash() +
"] of version [" + Version.CURRENT + "] which has an incompatible wire format");
}
}

clusterName = new ClusterName(in);
} else {
discoveryNode = in.readOptionalWriteable(DiscoveryNode::new);
clusterName = new ClusterName(in);
version = Version.readVersion(in);
buildHash = null;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
if (out.getVersion().onOrAfter(BUILD_HASH_HANDSHAKE_VERSION)) {
Version.writeVersion(version, out);
out.writeString(buildHash);
discoveryNode.writeTo(out);
clusterName.writeTo(out);
} else {
out.writeOptionalWriteable(discoveryNode);
clusterName.writeTo(out);
Version.writeVersion(version, out);
}
}

/**
 * @return the version carried by this handshake response
 */
public Version getVersion() {
    return this.version;
}

/**
 * @return the build hash carried by this handshake response, or {@code null} if the response was read from a
 *         stream whose version precedes {@code BUILD_HASH_HANDSHAKE_VERSION}
 */
public String getBuildHash() {
    return this.buildHash;
}

public DiscoveryNode getDiscoveryNode() {
Expand All @@ -535,6 +629,10 @@ public DiscoveryNode getDiscoveryNode() {
/**
 * @return the cluster name carried by this handshake response
 */
public ClusterName getClusterName() {
    return this.clusterName;
}

/**
 * Decides whether a remote node's build must be treated as incompatible with this node.
 *
 * @param version                the version reported by the remote node
 * @param buildHash              the build hash reported by the remote node
 * @param requireCompatibleBuild whether build compatibility is being enforced at all
 * @return {@code true} only if enforcement is on, the remote version is the very same instance as
 *         {@code Version.CURRENT}, and the remote build hash differs from {@code Build.CURRENT.hash()}
 */
private static boolean isIncompatibleBuild(Version version, String buildHash, boolean requireCompatibleBuild) {
    if (requireCompatibleBuild == false) {
        // Enforcement is switched off, so nothing is ever reported as incompatible.
        return false;
    }
    if (version != Version.CURRENT) {
        // Build hashes are only compared when the remote reports exactly the current version (reference comparison).
        return false;
    }
    return Build.CURRENT.hash().equals(buildHash) == false;
}
}

public void disconnectFromNode(DiscoveryNode node) {
Expand Down Expand Up @@ -1354,4 +1452,5 @@ public void onResponseReceived(long requestId, Transport.ResponseContext holder)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.client.transport;

import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
Expand Down Expand Up @@ -102,7 +103,11 @@ public void sendRequest(long requestId, String action, TransportRequest request,
} else if (TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
TransportResponseHandler transportResponseHandler = responseHandlers.onResponseReceived(requestId, listener);
Version version = node.getVersion();
transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(node, clusterName, version));
transportResponseHandler.handleResponse(new TransportService.HandshakeResponse(
version,
Build.CURRENT.hash(),
node,
clusterName));

} else {
throw new UnsupportedOperationException("Mock transport does not understand action " + action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.client.transport;

import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse;
Expand Down Expand Up @@ -180,7 +181,11 @@ public <T extends TransportResponse> void sendRequest(Transport.Connection conne
clusterStateLatch.countDown();
} else if (TransportService.HANDSHAKE_ACTION_NAME .equals(action)) {
((TransportResponseHandler<TransportService.HandshakeResponse>) handler).handleResponse(
new TransportService.HandshakeResponse(connection.getNode(), clusterName, connection.getNode().getVersion()));
new TransportService.HandshakeResponse(
connection.getNode().getVersion(),
Build.CURRENT.hash(),
connection.getNode(),
clusterName));
} else {
handler.handleException(new TransportException("", new InternalException(action)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -479,7 +480,7 @@ private TestTransportService(Transport transport, ThreadPool threadPool) {
@Override
public void handshake(Transport.Connection connection, TimeValue timeout, Predicate<ClusterName> clusterNamePredicate,
ActionListener<HandshakeResponse> listener) {
listener.onResponse(new HandshakeResponse(connection.getNode(), new ClusterName(""), Version.CURRENT));
listener.onResponse(new HandshakeResponse(Version.CURRENT, Build.CURRENT.hash(), connection.getNode(), new ClusterName("")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -226,7 +227,11 @@ public void testFailsNodeThatDisconnects() {
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
assertFalse(node.equals(localNode));
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
deterministicTaskQueue.scheduleNow(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.coordination;

import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -221,7 +222,11 @@ public void testFollowerFailsImmediatelyOnDisconnection() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
Expand Down Expand Up @@ -330,7 +335,11 @@ public void testFollowerFailsImmediatelyOnHealthCheckFailure() {
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(node, ClusterName.DEFAULT, Version.CURRENT));
handleResponse(requestId, new TransportService.HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
node,
ClusterName.DEFAULT));
return;
}
assertThat(action, equalTo(LEADER_CHECK_ACTION_NAME));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.cluster.coordination;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
Expand Down Expand Up @@ -162,8 +163,12 @@ private void setupMasterServiceAndCoordinator(long term, ClusterState initialSta
@Override
protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode destination) {
if (action.equals(HANDSHAKE_ACTION_NAME)) {
handleResponse(requestId, new TransportService.HandshakeResponse(destination, initialState.getClusterName(),
destination.getVersion()));
handleResponse(requestId, new TransportService.HandshakeResponse(
destination.getVersion(),
Build.CURRENT.hash(),
destination,
initialState.getClusterName()
));
} else if (action.equals(JoinHelper.VALIDATE_JOIN_ACTION_NAME)) {
handleResponse(requestId, new TransportResponse.Empty());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Build;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -98,7 +99,11 @@ protected void onSendRequest(long requestId, String action, TransportRequest req
if (fullConnectionFailure != null && node.getAddress().equals(remoteNode.getAddress())) {
handleError(requestId, fullConnectionFailure);
} else {
handleResponse(requestId, new HandshakeResponse(remoteNode, new ClusterName(remoteClusterName), Version.CURRENT));
handleResponse(requestId, new HandshakeResponse(
Version.CURRENT,
Build.CURRENT.hash(),
remoteNode,
new ClusterName(remoteClusterName)));
}
}
}
Expand Down
Loading