Skip to content

Commit 4a19118

Browse files
committed
Make Persistent Tasks implementations version and feature aware (#31045)
With #31020 we introduced the ability for transport clients to indicate what features they support in order to make sure we don't serialize object to them they don't support. This PR adapts the serialization logic of persistent tasks to be aware of those features and not serialize tasks that aren't supported. Also, a version check is added for the future where we may add new tasks implementations and need to be able to indicate they shouldn't be serialized both to nodes and clients. As the implementation relies on the interface of `PersistentTaskParams`, these are no longer optional. That's acceptable as all current implementation have them and we plan to make `PersistentTaskParams` more central in the future. Relates to #30731
1 parent ea3c379 commit 4a19118

39 files changed

+383
-118
lines changed

modules/tribe/src/test/java/org/elasticsearch/tribe/TribeServiceTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.tribe;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.ClusterName;
2324
import org.elasticsearch.cluster.MergableCustomMetaData;
2425
import org.elasticsearch.cluster.NamedDiff;
@@ -238,6 +239,11 @@ public String getWriteableName() {
238239
return TYPE;
239240
}
240241

242+
@Override
243+
public Version getMinimalSupportedVersion() {
244+
return Version.CURRENT.minimumCompatibilityVersion();
245+
}
246+
241247
public static MergableCustomMetaData1 readFrom(StreamInput in) throws IOException {
242248
return readFrom(MergableCustomMetaData1::new, in);
243249
}
@@ -270,6 +276,11 @@ public String getWriteableName() {
270276
return TYPE;
271277
}
272278

279+
@Override
280+
public Version getMinimalSupportedVersion() {
281+
return Version.CURRENT.minimumCompatibilityVersion();
282+
}
283+
273284
public static MergableCustomMetaData2 readFrom(StreamInput in) throws IOException {
274285
return readFrom(MergableCustomMetaData2::new, in);
275286
}

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.carrotsearch.hppc.cursors.IntObjectCursor;
2323
import com.carrotsearch.hppc.cursors.ObjectCursor;
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
25-
2625
import org.elasticsearch.client.transport.TransportClient;
2726
import org.elasticsearch.cluster.block.ClusterBlock;
2827
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -50,6 +49,7 @@
5049
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
5150
import org.elasticsearch.common.io.stream.StreamInput;
5251
import org.elasticsearch.common.io.stream.StreamOutput;
52+
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
5353
import org.elasticsearch.common.settings.Settings;
5454
import org.elasticsearch.common.xcontent.ToXContentFragment;
5555
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -122,7 +122,7 @@ default Optional<String> getRequiredFeature() {
122122
* @param <T> the type of the custom
123123
* @return true if the custom should be serialized and false otherwise
124124
*/
125-
static <T extends NamedDiffable & FeatureAware> boolean shouldSerializeCustom(final StreamOutput out, final T custom) {
125+
static <T extends VersionedNamedWriteable & FeatureAware> boolean shouldSerialize(final StreamOutput out, final T custom) {
126126
if (out.getVersion().before(custom.getMinimalSupportedVersion())) {
127127
return false;
128128
}
@@ -745,13 +745,13 @@ public void writeTo(StreamOutput out) throws IOException {
745745
// filter out custom states not supported by the other node
746746
int numberOfCustoms = 0;
747747
for (final ObjectCursor<Custom> cursor : customs.values()) {
748-
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
748+
if (FeatureAware.shouldSerialize(out, cursor.value)) {
749749
numberOfCustoms++;
750750
}
751751
}
752752
out.writeVInt(numberOfCustoms);
753753
for (final ObjectCursor<Custom> cursor : customs.values()) {
754-
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
754+
if (FeatureAware.shouldSerialize(out, cursor.value)) {
755755
out.writeNamedWriteable(cursor.value);
756756
}
757757
}

server/src/main/java/org/elasticsearch/cluster/NamedDiffable.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,10 @@
1919

2020
package org.elasticsearch.cluster;
2121

22-
import org.elasticsearch.Version;
23-
import org.elasticsearch.common.io.stream.NamedWriteable;
22+
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
2423

2524
/**
26-
* Diff that also support NamedWriteable interface
25+
* Diff that also support {@link VersionedNamedWriteable} interface
2726
*/
28-
public interface NamedDiffable<T> extends Diffable<T>, NamedWriteable {
29-
/**
30-
* The minimal version of the recipient this custom object can be sent to
31-
*/
32-
default Version getMinimalSupportedVersion() {
33-
return Version.CURRENT.minimumIndexCompatibilityVersion();
34-
}
27+
public interface NamedDiffable<T> extends Diffable<T>, VersionedNamedWriteable {
3528
}

server/src/main/java/org/elasticsearch/cluster/RestoreInProgress.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,15 @@
2020
package org.elasticsearch.cluster;
2121

2222
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.cluster.ClusterState.Custom;
24-
import org.elasticsearch.snapshots.Snapshot;
2525
import org.elasticsearch.common.collect.ImmutableOpenMap;
2626
import org.elasticsearch.common.io.stream.StreamInput;
2727
import org.elasticsearch.common.io.stream.StreamOutput;
2828
import org.elasticsearch.common.xcontent.ToXContent;
2929
import org.elasticsearch.common.xcontent.XContentBuilder;
3030
import org.elasticsearch.index.shard.ShardId;
31+
import org.elasticsearch.snapshots.Snapshot;
3132

3233
import java.io.IOException;
3334
import java.util.ArrayList;
@@ -382,6 +383,11 @@ public String getWriteableName() {
382383
return TYPE;
383384
}
384385

386+
@Override
387+
public Version getMinimalSupportedVersion() {
388+
return Version.CURRENT.minimumCompatibilityVersion();
389+
}
390+
385391
public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
386392
return readDiffFrom(Custom.class, TYPE, in);
387393
}

server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,11 @@ public String getWriteableName() {
395395
return TYPE;
396396
}
397397

398+
@Override
399+
public Version getMinimalSupportedVersion() {
400+
return Version.CURRENT.minimumCompatibilityVersion();
401+
}
402+
398403
public static NamedDiff<Custom> readDiffFrom(StreamInput in) throws IOException {
399404
return readDiffFrom(Custom.class, TYPE, in);
400405
}

server/src/main/java/org/elasticsearch/cluster/metadata/IndexGraveyard.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.cluster.metadata;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.Diff;
2324
import org.elasticsearch.cluster.NamedDiff;
2425
import org.elasticsearch.common.ParseField;
@@ -34,8 +35,6 @@
3435
import org.elasticsearch.common.xcontent.XContentBuilder;
3536
import org.elasticsearch.common.xcontent.XContentParser;
3637
import org.elasticsearch.index.Index;
37-
import org.joda.time.DateTime;
38-
import org.joda.time.DateTimeZone;
3938

4039
import java.io.IOException;
4140
import java.util.ArrayList;
@@ -44,7 +43,6 @@
4443
import java.util.EnumSet;
4544
import java.util.List;
4645
import java.util.Objects;
47-
import java.util.concurrent.TimeUnit;
4846

4947
/**
5048
* A collection of tombstones for explicitly marking indices as deleted in the cluster state.
@@ -97,6 +95,11 @@ public String getWriteableName() {
9795
return TYPE;
9896
}
9997

98+
@Override
99+
public Version getMinimalSupportedVersion() {
100+
return Version.CURRENT.minimumCompatibilityVersion();
101+
}
102+
100103
@Override
101104
public EnumSet<MetaData.XContentContext> context() {
102105
return MetaData.API_AND_GATEWAY;

server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -793,13 +793,13 @@ public void writeTo(StreamOutput out) throws IOException {
793793
// filter out custom states not supported by the other node
794794
int numberOfCustoms = 0;
795795
for (final ObjectCursor<Custom> cursor : customs.values()) {
796-
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
796+
if (FeatureAware.shouldSerialize(out, cursor.value)) {
797797
numberOfCustoms++;
798798
}
799799
}
800800
out.writeVInt(numberOfCustoms);
801801
for (final ObjectCursor<Custom> cursor : customs.values()) {
802-
if (FeatureAware.shouldSerializeCustom(out, cursor.value)) {
802+
if (FeatureAware.shouldSerialize(out, cursor.value)) {
803803
out.writeNamedWriteable(cursor.value);
804804
}
805805
}

server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.cluster.metadata;
2121

2222
import org.elasticsearch.ElasticsearchParseException;
23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.cluster.AbstractNamedDiffable;
2425
import org.elasticsearch.cluster.NamedDiff;
2526
import org.elasticsearch.cluster.metadata.MetaData.Custom;
@@ -102,6 +103,11 @@ public String getWriteableName() {
102103
return TYPE;
103104
}
104105

106+
@Override
107+
public Version getMinimalSupportedVersion() {
108+
return Version.CURRENT.minimumCompatibilityVersion();
109+
}
110+
105111
public RepositoriesMetaData(StreamInput in) throws IOException {
106112
RepositoryMetaData[] repository = new RepositoryMetaData[in.readVInt()];
107113
for (int i = 0; i < repository.length; i++) {
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.io.stream;
21+
22+
import org.elasticsearch.Version;
23+
24+
/**
25+
* A {@link NamedWriteable} that has a minimum version associated with it.
26+
*/
27+
public interface VersionedNamedWriteable extends NamedWriteable {
28+
29+
/**
30+
* Returns the name of the writeable object
31+
*/
32+
String getWriteableName();
33+
34+
/**
35+
* The minimal version of the recipient this object can be sent to
36+
*/
37+
Version getMinimalSupportedVersion();
38+
}

server/src/main/java/org/elasticsearch/ingest/IngestMetadata.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.Diff;
2324
import org.elasticsearch.cluster.DiffableUtils;
2425
import org.elasticsearch.cluster.NamedDiff;
@@ -69,6 +70,11 @@ public String getWriteableName() {
6970
return TYPE;
7071
}
7172

73+
@Override
74+
public Version getMinimalSupportedVersion() {
75+
return Version.CURRENT.minimumCompatibilityVersion();
76+
}
77+
7278
public Map<String, PipelineConfiguration> getPipelines() {
7379
return pipelines;
7480
}

0 commit comments

Comments
 (0)