Skip to content

Commit d2d7f26

Browse files
jasontedorjkakavas
authored andcommitted
Remove client feature tracking (#44929)
This commit removes the infrastructure for client feature tracking. We introduced this functionality to support clients that do not necessarily understand all the features that the server might support, for example, customs in the cluster state provided by plugins that a client might not have. This can arise in situations such as rolling upgrades from the OSS distribution to the default distribution. With the removal of the transport client, this infrastructure is no longer needed. This commit removes client feature tracking from the server in 8.0.0.
1 parent 929ef7f commit d2d7f26

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+92
-928
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import java.util.Locale;
6666
import java.util.Map;
6767
import java.util.Objects;
68-
import java.util.Optional;
6968
import java.util.Set;
7069
import java.util.stream.StreamSupport;
7170

@@ -95,40 +94,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
9594

9695
public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
9796

98-
/**
99-
* An interface that implementors use when a class requires a client to maybe have a feature.
100-
*/
101-
public interface FeatureAware {
102-
103-
/**
104-
* An optional feature that is required for the client to have.
105-
*
106-
* @return an empty optional if no feature is required otherwise a string representing the required feature
107-
*/
108-
default Optional<String> getRequiredFeature() {
109-
return Optional.empty();
110-
}
111-
112-
/**
113-
* Tests whether or not the custom should be serialized. The criteria are:
114-
* <ul>
115-
* <li>the output stream must be at least the minimum supported version of the custom</li>
116-
* </ul>
117-
* <p>
118-
* That is, we only serialize customs to clients than can understand the custom based on the version of the client.
119-
*
120-
* @param out the output stream
121-
* @param custom the custom to serialize
122-
* @param <T> the type of the custom
123-
* @return true if the custom should be serialized and false otherwise
124-
*/
125-
static <T extends VersionedNamedWriteable & FeatureAware> boolean shouldSerialize(final StreamOutput out, final T custom) {
126-
return out.getVersion().onOrAfter(custom.getMinimalSupportedVersion());
127-
}
128-
129-
}
130-
131-
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, FeatureAware {
97+
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
13298

13399
/**
134100
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
@@ -777,13 +743,13 @@ public void writeTo(StreamOutput out) throws IOException {
777743
// filter out custom states not supported by the other node
778744
int numberOfCustoms = 0;
779745
for (final ObjectCursor<Custom> cursor : customs.values()) {
780-
if (FeatureAware.shouldSerialize(out, cursor.value)) {
746+
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
781747
numberOfCustoms++;
782748
}
783749
}
784750
out.writeVInt(numberOfCustoms);
785751
for (final ObjectCursor<Custom> cursor : customs.values()) {
786-
if (FeatureAware.shouldSerialize(out, cursor.value)) {
752+
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
787753
out.writeNamedWriteable(cursor.value);
788754
}
789755
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import org.apache.lucene.util.CollectionUtil;
2828
import org.elasticsearch.Version;
2929
import org.elasticsearch.action.AliasesRequest;
30-
import org.elasticsearch.cluster.ClusterState;
31-
import org.elasticsearch.cluster.ClusterState.FeatureAware;
3230
import org.elasticsearch.cluster.Diff;
3331
import org.elasticsearch.cluster.Diffable;
3432
import org.elasticsearch.cluster.DiffableUtils;
@@ -44,6 +42,7 @@
4442
import org.elasticsearch.common.collect.ImmutableOpenMap;
4543
import org.elasticsearch.common.io.stream.StreamInput;
4644
import org.elasticsearch.common.io.stream.StreamOutput;
45+
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
4746
import org.elasticsearch.common.regex.Regex;
4847
import org.elasticsearch.common.settings.Setting;
4948
import org.elasticsearch.common.settings.Setting.Property;
@@ -125,7 +124,7 @@ public enum XContentContext {
125124
*/
126125
public static EnumSet<XContentContext> ALL_CONTEXTS = EnumSet.allOf(XContentContext.class);
127126

128-
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, ClusterState.FeatureAware {
127+
public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
129128

130129
EnumSet<XContentContext> context();
131130
}
@@ -916,13 +915,13 @@ public void writeTo(StreamOutput out) throws IOException {
916915
// filter out custom states not supported by the other node
917916
int numberOfCustoms = 0;
918917
for (final ObjectCursor<Custom> cursor : customs.values()) {
919-
if (FeatureAware.shouldSerialize(out, cursor.value)) {
918+
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
920919
numberOfCustoms++;
921920
}
922921
}
923922
out.writeVInt(numberOfCustoms);
924923
for (final ObjectCursor<Custom> cursor : customs.values()) {
925-
if (FeatureAware.shouldSerialize(out, cursor.value)) {
924+
if (VersionedNamedWriteable.shouldSerialize(out, cursor.value)) {
926925
out.writeNamedWriteable(cursor.value);
927926
}
928927
}

server/src/main/java/org/elasticsearch/common/io/stream/VersionedNamedWriteable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,19 @@ public interface VersionedNamedWriteable extends NamedWriteable {
3535
* The minimal version of the recipient this object can be sent to
3636
*/
3737
Version getMinimalSupportedVersion();
38+
39+
/**
40+
* Tests whether or not the custom should be serialized. The criteria is the output stream must be at least the minimum supported
41+
* version of the custom. That is, we only serialize customs to clients than can understand the custom based on the version of the
42+
* client.
43+
*
44+
* @param out the output stream
45+
* @param custom the custom to serialize
46+
* @param <T> the type of the custom
47+
* @return true if the custom should be serialized and false otherwise
48+
*/
49+
static <T extends VersionedNamedWriteable> boolean shouldSerialize(final StreamOutput out, final T custom) {
50+
return out.getVersion().onOrAfter(custom.getMinimalSupportedVersion());
51+
}
52+
3853
}

server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919

2020
package org.elasticsearch.persistent;
2121

22-
import org.elasticsearch.cluster.ClusterState;
2322
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
2423
import org.elasticsearch.common.xcontent.ToXContentObject;
2524

2625
/**
2726
* Parameters used to start persistent task
2827
*/
29-
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject, ClusterState.FeatureAware {
28+
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject {
3029

3130
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetaData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.Strings;
3131
import org.elasticsearch.common.io.stream.StreamInput;
3232
import org.elasticsearch.common.io.stream.StreamOutput;
33+
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
3334
import org.elasticsearch.common.io.stream.Writeable;
3435
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
3536
import org.elasticsearch.common.xcontent.ObjectParser;
@@ -532,7 +533,7 @@ public PersistentTasksCustomMetaData(StreamInput in) throws IOException {
532533
public void writeTo(StreamOutput out) throws IOException {
533534
out.writeLong(lastAllocationId);
534535
Map<String, PersistentTask<?>> filteredTasks = tasks.values().stream()
535-
.filter(t -> ClusterState.FeatureAware.shouldSerialize(out, t.getParams()))
536+
.filter(t -> VersionedNamedWriteable.shouldSerialize(out, t.getParams()))
536537
.collect(Collectors.toMap(PersistentTask::getId, Function.identity()));
537538
out.writeMap(filteredTasks, StreamOutput::writeString, (stream, value) -> value.writeTo(stream));
538539
}

server/src/main/java/org/elasticsearch/plugins/Plugin.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.elasticsearch.bootstrap.BootstrapCheck;
2323
import org.elasticsearch.client.Client;
24-
import org.elasticsearch.cluster.ClusterState;
2524
import org.elasticsearch.cluster.metadata.IndexMetaData;
2625
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
2726
import org.elasticsearch.cluster.metadata.MetaData;
@@ -49,7 +48,6 @@
4948
import java.util.Collections;
5049
import java.util.List;
5150
import java.util.Map;
52-
import java.util.Optional;
5351
import java.util.Set;
5452
import java.util.function.UnaryOperator;
5553

@@ -72,17 +70,6 @@
7270
*/
7371
public abstract class Plugin implements Closeable {
7472

75-
/**
76-
* A feature exposed by the plugin. This should be used if a plugin exposes {@link ClusterState.Custom} or {@link MetaData.Custom}; see
77-
* also {@link ClusterState.FeatureAware}.
78-
*
79-
* @return a feature set represented by this plugin, or the empty optional if the plugin does not expose cluster state or metadata
80-
* customs
81-
*/
82-
protected Optional<String> getFeature() {
83-
return Optional.empty();
84-
}
85-
8673
/**
8774
* Returns components added by this plugin.
8875
*

server/src/main/java/org/elasticsearch/plugins/PluginsService.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.elasticsearch.common.settings.Settings;
4040
import org.elasticsearch.index.IndexModule;
4141
import org.elasticsearch.threadpool.ExecutorBuilder;
42-
import org.elasticsearch.transport.TransportSettings;
4342

4443
import java.io.IOException;
4544
import java.lang.reflect.Constructor;
@@ -59,9 +58,7 @@
5958
import java.util.Locale;
6059
import java.util.Map;
6160
import java.util.Objects;
62-
import java.util.Optional;
6361
import java.util.Set;
64-
import java.util.TreeMap;
6562
import java.util.function.Function;
6663
import java.util.stream.Collectors;
6764

@@ -202,7 +199,6 @@ private static void logPluginInfo(final List<PluginInfo> pluginInfos, final Stri
202199

203200
public Settings updatedSettings() {
204201
Map<String, String> foundSettings = new HashMap<>();
205-
final Map<String, String> features = new TreeMap<>();
206202
final Settings.Builder builder = Settings.builder();
207203
for (Tuple<PluginInfo, Plugin> plugin : plugins) {
208204
Settings settings = plugin.v2().additionalSettings();
@@ -214,23 +210,6 @@ public Settings updatedSettings() {
214210
}
215211
}
216212
builder.put(settings);
217-
final Optional<String> maybeFeature = plugin.v2().getFeature();
218-
if (maybeFeature.isPresent()) {
219-
final String feature = maybeFeature.get();
220-
if (features.containsKey(feature)) {
221-
final String message = String.format(
222-
Locale.ROOT,
223-
"duplicate feature [%s] in plugin [%s], already added in [%s]",
224-
feature,
225-
plugin.v1().getName(),
226-
features.get(feature));
227-
throw new IllegalArgumentException(message);
228-
}
229-
features.put(feature, plugin.v1().getName());
230-
}
231-
}
232-
for (final String feature : features.keySet()) {
233-
builder.put(TransportSettings.FEATURE_PREFIX + "." + feature, true);
234213
}
235214
return builder.put(this.settings).build();
236215
}

server/src/main/java/org/elasticsearch/transport/InboundHandler.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import java.net.InetSocketAddress;
3939
import java.util.Collections;
4040
import java.util.Map;
41-
import java.util.Set;
4241

4342
public class InboundHandler {
4443

@@ -153,7 +152,6 @@ private void messageReceived(BytesReference reference, TcpChannel channel) throw
153152
}
154153

155154
private void handleRequest(TcpChannel channel, InboundMessage.Request message, int messageLengthBytes) {
156-
final Set<String> features = message.getFeatures();
157155
final String action = message.getActionName();
158156
final long requestId = message.getRequestId();
159157
final StreamInput stream = message.getStreamInput();
@@ -162,7 +160,7 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
162160
TransportChannel transportChannel = null;
163161
try {
164162
if (message.isHandshake()) {
165-
handshaker.handleHandshake(version, features, channel, requestId, stream);
163+
handshaker.handleHandshake(version, channel, requestId, stream);
166164
} else {
167165
final RequestHandlerRegistry reg = getRequestHandler(action);
168166
if (reg == null) {
@@ -174,7 +172,7 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
174172
} else {
175173
breaker.addWithoutBreaking(messageLengthBytes);
176174
}
177-
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, features,
175+
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
178176
circuitBreakerService, messageLengthBytes, message.isCompress());
179177
final TransportRequest request = reg.newRequest(stream);
180178
request.remoteAddress(new TransportAddress(channel.getRemoteAddress()));
@@ -190,7 +188,7 @@ private void handleRequest(TcpChannel channel, InboundMessage.Request message, i
190188
} catch (Exception e) {
191189
// the circuit breaker tripped
192190
if (transportChannel == null) {
193-
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version, features,
191+
transportChannel = new TcpTransportChannel(outboundHandler, channel, action, requestId, version,
194192
circuitBreakerService, 0, message.isCompress());
195193
}
196194
try {

server/src/main/java/org/elasticsearch/transport/InboundMessage.java

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,6 @@
3131

3232
import java.io.Closeable;
3333
import java.io.IOException;
34-
import java.util.Arrays;
35-
import java.util.Collections;
36-
import java.util.Set;
37-
import java.util.TreeSet;
3834

3935
public abstract class InboundMessage extends NetworkMessage implements Closeable {
4036

@@ -96,15 +92,12 @@ InboundMessage deserialize(BytesReference reference) throws IOException {
9692

9793
InboundMessage message;
9894
if (TransportStatus.isRequest(status)) {
99-
final String[] featuresFound = streamInput.readStringArray();
100-
final Set<String> features;
101-
if (featuresFound.length == 0) {
102-
features = Collections.emptySet();
103-
} else {
104-
features = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(featuresFound)));
95+
if (remoteVersion.before(Version.V_8_0_0)) {
96+
// discard features
97+
streamInput.readStringArray();
10598
}
10699
final String action = streamInput.readString();
107-
message = new Request(threadContext, remoteVersion, status, requestId, action, features, streamInput);
100+
message = new Request(threadContext, remoteVersion, status, requestId, action, streamInput);
108101
} else {
109102
message = new Response(threadContext, remoteVersion, status, requestId, streamInput);
110103
}
@@ -146,22 +139,17 @@ private static void ensureVersionCompatibility(Version version, Version currentV
146139
public static class Request extends InboundMessage {
147140

148141
private final String actionName;
149-
private final Set<String> features;
150142

151-
Request(ThreadContext threadContext, Version version, byte status, long requestId, String actionName, Set<String> features,
143+
Request(ThreadContext threadContext, Version version, byte status, long requestId, String actionName,
152144
StreamInput streamInput) {
153145
super(threadContext, version, status, requestId, streamInput);
154146
this.actionName = actionName;
155-
this.features = features;
156147
}
157148

158149
String getActionName() {
159150
return actionName;
160151
}
161152

162-
Set<String> getFeatures() {
163-
return features;
164-
}
165153
}
166154

167155
public static class Response extends InboundMessage {

server/src/main/java/org/elasticsearch/transport/OutboundHandler.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,15 @@ final class OutboundHandler {
4949

5050
private final String nodeName;
5151
private final Version version;
52-
private final String[] features;
5352
private final ThreadPool threadPool;
5453
private final BigArrays bigArrays;
5554
private final TransportLogger transportLogger;
5655
private volatile TransportMessageListener messageListener = TransportMessageListener.NOOP_LISTENER;
5756

58-
OutboundHandler(String nodeName, Version version, String[] features, ThreadPool threadPool, BigArrays bigArrays,
57+
OutboundHandler(String nodeName, Version version, ThreadPool threadPool, BigArrays bigArrays,
5958
TransportLogger transportLogger) {
6059
this.nodeName = nodeName;
6160
this.version = version;
62-
this.features = features;
6361
this.threadPool = threadPool;
6462
this.bigArrays = bigArrays;
6563
this.transportLogger = transportLogger;
@@ -83,8 +81,8 @@ void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long
8381
final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
8482
final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
8583
Version version = Version.min(this.version, channelVersion);
86-
OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
87-
requestId, isHandshake, compressRequest);
84+
OutboundMessage.Request message =
85+
new OutboundMessage.Request(threadPool.getThreadContext(), request, version, action, requestId, isHandshake, compressRequest);
8886
ActionListener<Void> listener = ActionListener.wrap(() ->
8987
messageListener.onRequestSent(node, requestId, action, request, options));
9088
sendMessage(channel, message, listener);

0 commit comments

Comments
 (0)