From b857b316b658f93a626cf91b0f6e044f5d3445a6 Mon Sep 17 00:00:00 2001
From: Boaz Leskes
Date: Mon, 19 Dec 2016 13:08:24 +0100
Subject: [PATCH] Add BWC layer to seq no infra and enable BWC tests (#22185)

The sequence number BWC logic consists of two elements:

1) Wire-level BWC using stream versions.
2) A change to the global checkpoint maintenance semantics.

For the sequence number infra to work in a mixed-version cluster, we have to consider the situation where the primary is on an old node and the replicas are on new ones (i.e., the replicas receive operations without a seq#), as well as the reverse (i.e., the primary sends operations to a replica, but the replica can't process the seq# and respond with its local checkpoint). A new primary with an old replica is rare, because we do not allow a replica to recover from a new primary. However, it can occur if the old primary failed and a new replica was promoted, or during primary relocation, where the source primary is treated as a replica until the master starts the target.

1) Old primary & new replica - this case is easy, as it is taken care of by the wire-level BWC. All incoming requests have their seq# set to `UNASSIGNED_SEQ_NO`, which doesn't confuse the local checkpoint logic (it stays at `NO_OPS_PERFORMED`).

2) New primary & old replica - this one is trickier, as the global checkpoint service currently takes all in-sync replicas into consideration for the global checkpoint calculation. To deal with old replicas, we change the semantics to consider only the in-sync replicas on *new nodes*. That means replicas on old nodes don't count towards the global checkpoint. In this state the seq# infra is not fully operational (you can't search on it, because copies may miss it), but it is maintained on the shards that can support it. The old replicas will have to go through a file-based recovery at some point and will pick up the seq# information then.

There is still an edge case where a new primary fails and an old replica takes over. I'll discuss this one with @ywelsch, as I'd prefer to avoid it completely.

This PR also re-enables the BWC tests, which were disabled. As such, it had to fix any BWC issues that had crept in, most notably an issue caused by the removal of the `timestamp` field in #21670. The commit also includes a fix for the default value of the seq number field in replicated write requests (it was 0 but should be -2), which surfaced some other minor bugs that are fixed as well.
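For readers unfamiliar with the stream-version pattern, the sketch below illustrates the shape of the wire-level BWC applied throughout the patch (e.g. in `ReplicatedWriteRequest`, `DocWriteResponse`, `ShardStats` and `TransportReplicationAction.ReplicaResponse`). The class and method names are illustrative only and are not part of the change; the version constants and stream calls are the ones used in the diff:

```java
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;

import java.io.IOException;

// Illustrative sketch (not part of the patch): version-gated (de)serialization of a seq#.
class SeqNoWireBwcSketch {
    private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;

    void readSeqNo(StreamInput in) throws IOException {
        if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
            seqNo = in.readZLong();
        } else {
            // the stream came from a pre-6.0 node that knows nothing about seq#;
            // falling back to UNASSIGNED_SEQ_NO keeps the local checkpoint at NO_OPS_PERFORMED
            seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
        }
    }

    void writeSeqNo(StreamOutput out) throws IOException {
        if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
            out.writeZLong(seqNo);
        }
        // pre-6.0 nodes cannot read the extra field, so it is simply omitted for them
    }
}
```

The same idea drives the behavioural side of the change: `GlobalCheckpointSyncAction` answers on behalf of pre-6.0 replicas with `UNASSIGNED_SEQ_NO` instead of sending them the replica request, and `IndicesClusterStateService` only registers allocation IDs of shard copies on 6.0+ nodes for checkpoint tracking.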
Last - I added some debugging tools like more sane node names and forcing replication request to implement a `toString` --- .gitignore | 5 +- .../gradle/test/ClusterFormationTasks.groovy | 1 + .../action/DocWriteResponse.java | 11 +- .../indices/flush/ShardFlushRequest.java | 2 +- .../admin/indices/stats/ShardStats.java | 9 +- .../action/bulk/TransportShardBulkAction.java | 13 +- .../action/delete/TransportDeleteAction.java | 11 +- .../action/index/IndexRequest.java | 5 +- .../action/index/TransportIndexAction.java | 17 +- .../replication/BasicReplicationRequest.java | 5 + .../replication/ReplicatedWriteRequest.java | 25 ++ .../replication/ReplicationOperation.java | 2 +- .../replication/ReplicationRequest.java | 24 +- .../TransportReplicationAction.java | 41 ++- .../cluster/metadata/MappingMetaData.java | 2 +- .../seqno/GlobalCheckpointSyncAction.java | 31 +- .../cluster/IndicesClusterStateService.java | 11 +- .../ReplicationOperationTests.java | 12 +- .../TransportReplicationActionTests.java | 5 + .../TransportWriteActionTests.java | 5 + .../ESIndexLevelReplicationTestCase.java | 2 +- qa/backwards-5.0/build.gradle | 6 +- .../elasticsearch/backwards/IndexingIT.java | 332 ++++++++++++++++++ qa/rolling-upgrade/build.gradle | 2 +- .../test/cat.shards/10_basic.yaml | 4 + .../test/rest/ESRestTestCase.java | 8 +- .../test/rest/yaml/ObjectPath.java | 17 +- 27 files changed, 519 insertions(+), 89 deletions(-) create mode 100644 qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java diff --git a/.gitignore b/.gitignore index b4ec8795057e2..5d7dbbefdc8b8 100644 --- a/.gitignore +++ b/.gitignore @@ -38,11 +38,14 @@ dependency-reduced-pom.xml # osx stuff .DS_Store +# default folders in which the create_bwc_index.py expects to find old es versions in +/backwards +/dev-tools/backwards + # needed in case docs build is run...maybe we can configure doc build to generate files under build? html_docs # random old stuff that we should look at the necessity of... 
/tmp/ -backwards/ eclipse-build diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index 74cae08298bcb..4c6771ccda7c8 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -268,6 +268,7 @@ class ClusterFormationTasks { static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) { Map esConfig = [ 'cluster.name' : node.clusterName, + 'node.name' : "node-" + node.nodeNum, 'pidfile' : node.pidFile, 'path.repo' : "${node.sharedDir}/repo", 'path.shared_data' : "${node.sharedDir}/", diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java index 7a12ab8ace255..aef99494d9265 100644 --- a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action; +import org.elasticsearch.Version; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException { type = in.readString(); id = in.readString(); version = in.readZLong(); - seqNo = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } forcedRefresh = in.readBoolean(); result = Result.readFrom(in); } @@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(type); out.writeString(id); out.writeZLong(version); - out.writeZLong(seqNo); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } out.writeBoolean(forcedRefresh); result.writeTo(out); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 83eaf11ca3a9e..ac32b16eb5711 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "flush {" + super.toString() + "}"; + return "flush {" + shardId + "}"; } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 877db0579a0ca..150b7c6a52bc5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.stats; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -103,7 +104,9 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath 
= in.readBoolean(); - seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + } } @Override @@ -114,7 +117,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); - out.writeOptionalWriteable(seqNoStats); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeOptionalWriteable(seqNoStats); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index cef89e1ce7855..86024e4dcd592 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; -import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -151,7 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh final long version = indexResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.seqNo(indexResult.getSeqNo()); + indexRequest.setSeqNo(indexResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); @@ -175,7 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(deleteResult.getVersion()); - deleteRequest.seqNo(deleteResult.getSeqNo()); + deleteRequest.setSeqNo(deleteResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound()); @@ -286,7 +285,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind final long version = updateOperationResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.seqNo(updateOperationResult.getSeqNo()); + indexRequest.setSeqNo(updateOperationResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); } break; @@ -297,7 +296,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(updateOperationResult.getVersion()); - deleteRequest.seqNo(updateOperationResult.getSeqNo()); + 
deleteRequest.setSeqNo(updateOperationResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); } break; @@ -349,9 +348,9 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind break; } assert (replicaRequest.request() instanceof IndexRequest - && ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || + && ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || (replicaRequest.request() instanceof DeleteRequest - && ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); + && ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); // successful operation break; // out of retry loop } else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 0d0d76c76919c..5601d54ea4740 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -39,7 +40,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -125,12 +125,14 @@ protected DeleteResponse newResponseInstance() { protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception { final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary); final DeleteResponse response; + final DeleteRequest replicaRequest; if (result.hasFailure() == false) { // update the request with the version so it will go to the replicas request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); request.version(result.getVersion()); - request.seqNo(result.getSeqNo()); + request.setSeqNo(result.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); + replicaRequest = request; response = new DeleteResponse( primary.shardId(), request.type(), @@ -140,8 +142,9 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde result.isFound()); } else { response = null; + replicaRequest = null; } - return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary); + return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary); } @Override @@ -158,7 +161,7 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) { final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), - request.seqNo(), 
request.primaryTerm(), request.version(), request.versionType()); + request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType()); return replica.delete(delete); } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index de80f85b89f17..5809280946c03 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -524,7 +524,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(routing); out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeOptionalString(null); + // Serialize a fake timestamp. 5.x expect this value to be set by the #process method so we can't use null. + // On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for + // the transport layer OK as it will be ignored. + out.writeOptionalString("0"); out.writeOptionalWriteable(null); } out.writeBytesReference(source); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 70220679752ec..9ed9f7f7cd11d 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -165,19 +165,22 @@ protected IndexResponse newResponseInstance() { protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception { final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction); final IndexResponse response; + final IndexRequest replicaRequest; if (indexResult.hasFailure() == false) { // update the version on request so it will happen on the replicas final long version = indexResult.getVersion(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.seqNo(indexResult.getSeqNo()); + request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); + replicaRequest = request; response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); } else { response = null; + replicaRequest = null; } - return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); + return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); } @Override @@ -197,9 +200,9 @@ public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest reque final Engine.Index operation; try { - operation = replica.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); + operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); } catch (MapperParsingException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if 
(update != null) { @@ -221,7 +224,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque try { operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); final ShardId shardId = primary.shardId(); @@ -232,12 +235,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); } catch (IllegalArgumentException e) { // throws IAE on conflicts merging dynamic mappings - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } try { operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java index f431c67b2904b..b4731d19e29e4 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java @@ -37,4 +37,9 @@ public BasicReplicationRequest() { public BasicReplicationRequest(ShardId shardId) { super(shardId); } + + @Override + public String toString() { + return "BasicReplicationRequest{" + shardId + "}"; + } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index fa02dac9e1e2d..107c791a069eb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -19,12 +19,14 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -36,6 +38,8 @@ public abstract class ReplicatedWriteRequest> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + /** * Constructor for deserialization. 
*/ @@ -62,11 +66,32 @@ public RefreshPolicy getRefreshPolicy() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); refreshPolicy.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } + } + + /** + * Returns the sequence number for this operation. The sequence number is assigned while the operation + * is performed on the primary shard. + */ + public long getSeqNo() { + return seqNo; + } + + /** sets the sequence number for this operation. should only be called on the primary shard */ + public void setSeqNo(long seqNo) { + this.seqNo = seqNo; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 47284789850a7..25dcc29a5c3a3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -283,7 +283,7 @@ protected List getShards(ShardId shardId, ClusterState state) { } private void decPendingAndFinishIfNeeded() { - assert pendingActions.get() > 0; + assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index d520b3d4e70ce..091f96c408f67 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -55,7 +55,6 @@ public abstract class ReplicationRequest(request, primary.allocationId().getId())); @@ -950,6 +951,8 @@ public void failShard(String reason, Exception e) { public PrimaryResult perform(Request request) throws Exception { PrimaryResult result = shardOperationOnPrimary(request, indexShard); if (result.replicaRequest() != null) { + assert result.finalFailure == null : "a replica request [" + result.replicaRequest() + + "] with a primary failure [" + result.finalFailure + "]"; result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); } return result; @@ -983,16 +986,25 @@ public ReplicaResponse(String allocationId, long localCheckpoint) { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - localCheckpoint = in.readZLong(); - allocationId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.readFrom(in); + localCheckpoint = in.readZLong(); + allocationId = in.readString(); + } else { + // 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing. 
+ } } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeZLong(localCheckpoint); - out.writeString(allocationId); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.writeTo(out); + out.writeZLong(localCheckpoint); + out.writeString(allocationId); + } else { + // we use to write empty responses + Empty.INSTANCE.writeTo(out); + } } @Override @@ -1016,10 +1028,9 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); return; } - transportService.sendRequest(node, transportReplicaAction, - new ConcreteShardRequest<>(request, replica.allocationId().getId()), transportOptions, - // Eclipse can't handle when this is <> so we specify the type here. - new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + final ConcreteShardRequest concreteShardRequest = + new ConcreteShardRequest<>(request, replica.allocationId().getId()); + sendReplicaRequest(concreteShardRequest, node, listener); } @Override @@ -1060,6 +1071,14 @@ public void onFailure(Exception shardFailedError) { } } + /** sends the given replica request to the supplied nodes */ + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions, + // Eclipse can't handle when this is <> so we specify the type here. + new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + } + /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static final class ConcreteShardRequest extends TransportRequest { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 3ea61385f1c3f..0f9db99326da8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException { // timestamp out.writeBoolean(false); // enabled out.writeString(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format()); - out.writeOptionalString(null); + out.writeOptionalString("now"); // 5.x default out.writeOptionalBoolean(null); } out.writeBoolean(hasParentField()); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 8e87729831398..6e13573794d79 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -20,20 +20,21 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import 
org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -41,8 +42,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; -import java.io.UnsupportedEncodingException; public class GlobalCheckpointSyncAction extends TransportReplicationAction { @@ -65,6 +64,17 @@ protected ReplicationResponse newResponseInstance() { return new ReplicationResponse(); } + @Override + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.sendReplicaRequest(concreteShardRequest, node, listener); + } else { + listener.onResponse( + new ReplicaResponse(concreteShardRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + @Override protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request, IndexShard indexShard) throws Exception { long checkpoint = indexShard.getGlobalCheckpoint(); @@ -105,6 +115,11 @@ private PrimaryRequest() { public PrimaryRequest(ShardId shardId) { super(shardId); } + + @Override + public String toString() { + return "GlobalCkpSyncPrimary{" + shardId + "}"; + } } public static final class ReplicaRequest extends ReplicationRequest { @@ -134,6 +149,14 @@ public void writeTo(StreamOutput out) throws IOException { public long getCheckpoint() { return checkpoint; } + + @Override + public String toString() { + return "GlobalCkpSyncReplica{" + + "checkpoint=" + checkpoint + + ", shardId=" + shardId + + '}'; + } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index a49c2a97cb0fc..f0d72f6c4c5d2 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -561,9 +562,15 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shard.updateRoutingEntry(shardRouting); if (shardRouting.primary()) { IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - Set activeIds = indexShardRoutingTable.activeShards().stream().map(r -> r.allocationId().getId()) + Set activeIds = indexShardRoutingTable.activeShards().stream() + // filter to shards that track seq# and should be taken into consideration for checkpoint tracking + // shards on old nodes will go through a file based recovery which will 
also transfer seq# information. + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(r -> r.allocationId().getId()) .collect(Collectors.toSet()); - Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream().map(r -> r.allocationId().getId()) + Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream() + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(r -> r.allocationId().getId()) .collect(Collectors.toSet()); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 47325bf5f9818..74a57f3aa91e2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -33,8 +33,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.IndexShardNotStartedException; @@ -42,7 +40,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -366,13 +363,8 @@ public Request() { } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public String toString() { + return "Request{}"; } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 655244e286f68..4a9b70c7b9068 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -984,6 +984,11 @@ public void onRetry() { super.onRetry(); isRetrySet.set(true); } + + @Override + public String toString() { + return "Request{}"; + } } static class Response extends ReplicationResponse { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 571bbfa72e0f7..cd71418f0e5cd 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -202,6 +202,11 @@ private static class TestRequest extends ReplicatedWriteRequest { public TestRequest() { setShardId(new ShardId("test", "test", 1)); } + + @Override + public String toString() { + return "TestRequest{}"; + } } private static class TestResponse extends ReplicationResponse implements WriteResponse { 
diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 2425552c24642..02b6eca43a338 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -433,7 +433,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest reques final long version = indexResult.getVersion(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.seqNo(indexResult.getSeqNo()); + request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); } request.primaryTerm(primary.getPrimaryTerm()); diff --git a/qa/backwards-5.0/build.gradle b/qa/backwards-5.0/build.gradle index b151485ef3fcb..5347429f03f49 100644 --- a/qa/backwards-5.0/build.gradle +++ b/qa/backwards-5.0/build.gradle @@ -16,9 +16,9 @@ apply plugin: 'elasticsearch.rest-test' integTest { includePackaged = true cluster { - numNodes = 2 - numBwcNodes = 1 - bwcVersion = "6.0.0-alpha1-SNAPSHOT" + numNodes = 4 + numBwcNodes = 2 + bwcVersion = "5.2.0-SNAPSHOT" setting 'logger.org.elasticsearch', 'DEBUG' } } diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java new file mode 100644 index 0000000000000..87343e830e249 --- /dev/null +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -0,0 +1,332 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.backwards; + +import org.apache.http.HttpHost; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.Version; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +public class IndexingIT extends ESRestTestCase { + + private ObjectPath objectPath(Response response) throws IOException { + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + String contentType = response.getHeader("Content-Type"); + XContentType xContentType = XContentType.fromMediaTypeOrFormat(contentType); + return ObjectPath.createFromXContent(xContentType.xContent(), body); + } + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } + + private void ensureGreen() throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings, true) + " }"))); + } + + private void updateIndexSetting(String name, Settings.Builder settings) throws IOException { + updateIndexSetting(name, settings.build()); + } + private void updateIndexSetting(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(), + new StringEntity(Strings.toString(settings, true)))); + } + + protected int indexDocs(String index, final int idStart, final int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + final int id = idStart + i; + assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), + new StringEntity("{\"test\": \"test_" + id + "\"}"))); + } + return numDocs; + } + + public void testSeqNoCheckpoints() throws Exception { + Nodes nodes = buildNodeAndVersions(); + logger.info("cluster discovered: {}", nodes.toString()); + final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) + .put("index.routing.allocation.include._name", bwcNames); + + final boolean checkGlobalCheckpoints = 
nodes.getMaster().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED); + logger.info("master version is [{}], global checkpoints will be [{}]", nodes.getMaster().getVersion(), + checkGlobalCheckpoints ? "checked" : "not be checked"); + if (checkGlobalCheckpoints) { + settings.put(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), "100ms"); + } + final String index = "test"; + createIndex(index, settings.build()); + try (RestClient newNodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + int numDocs = indexDocs(index, 0, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + + logger.info("allowing shards on all nodes"); + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + logger.info("indexing some more docs"); + numDocs += indexDocs(index, numDocs, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + logger.info("moving primary to new node"); + Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); + ensureGreen(); + logger.info("indexing some more docs"); + int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5)); + numDocs += numDocsOnNewPrimary; + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + } + } + + private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { + assertBusy(() -> { + try { + List shards = buildShards(nodes, client); + Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); + assertNotNull("failed to find primary shard", primaryShard); + final long expectedGlobalCkp; + final long expectMaxSeqNo; + logger.info("primary resolved to node {}", primaryShard.getNode()); + if (primaryShard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + expectMaxSeqNo = numDocs - 1; + expectedGlobalCkp = numDocs - 1; + } else { + expectedGlobalCkp = SequenceNumbersService.UNASSIGNED_SEQ_NO; + expectMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + } + for (Shard shard : shards) { + if (shard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + final SeqNoStats seqNoStats = shard.getSeqNoStats(); + logger.info("stats for {}, primary [{}]: [{}]", shard.getNode(), shard.isPrimary(), seqNoStats); + assertThat("max_seq no on " + shard.getNode() + " is wrong", seqNoStats.getMaxSeqNo(), equalTo(expectMaxSeqNo)); + assertThat("localCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getLocalCheckpoint(), equalTo(expectMaxSeqNo)); + if (checkGlobalCheckpoints) { + assertThat("globalCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getGlobalCheckpoint(), equalTo(expectedGlobalCkp)); + } + } else { + logger.info("skipping seq no test on {}", shard.getNode()); + } + } + } catch (IOException e) { + throw new AssertionError("unexpected io exception", e); + } + }); + } + + private List buildShards(Nodes nodes, RestClient client) throws IOException { + Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); + List shardStats = objectPath(response).evaluate("indices.test.shards.0"); + ArrayList shards = new ArrayList<>(); + for (Object shard : 
shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + final Node node = nodes.getSafe(nodeId); + final SeqNoStats seqNoStats; + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max"); + Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint"); + Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint"); + seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + } else { + seqNoStats = null; + } + shards.add(new Shard(node, primary, seqNoStats)); + } + return shards; + } + + private Nodes buildNodeAndVersions() throws IOException { + Response response = client().performRequest("GET", "_nodes"); + ObjectPath objectPath = objectPath(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + Nodes nodes = new Nodes(); + for (String id : nodesAsMap.keySet()) { + nodes.add(new Node( + id, + objectPath.evaluate("nodes." + id + ".name"), + Version.fromString(objectPath.evaluate("nodes." + id + ".version")), + HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address")))); + } + response = client().performRequest("GET", "_cluster/state"); + nodes.setMasterNodeId(objectPath(response).evaluate("master_node")); + return nodes; + } + + final class Nodes extends HashMap { + + private String masterNodeId = null; + + public Node getMaster() { + return get(masterNodeId); + } + + public void setMasterNodeId(String id) { + if (get(id) == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found. got:" + toString()); + } + masterNodeId = id; + } + + public void add(Node node) { + put(node.getId(), node); + } + + public List getNewNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().after(bwcVersion)).collect(Collectors.toList()); + } + + public List getBWCNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().equals(bwcVersion)).collect(Collectors.toList()); + } + + public Version getBWCVersion() { + if (isEmpty()) { + throw new IllegalStateException("no nodes available"); + } + return Version.fromId(values().stream().map(node -> node.getVersion().id).min(Integer::compareTo).get()); + } + + public Node getSafe(String id) { + Node node = get(id); + if (node == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found"); + } + return node; + } + + @Override + public String toString() { + return "Nodes{" + + "masterNodeId='" + masterNodeId + "'\n" + + values().stream().map(Node::toString).collect(Collectors.joining("\n")) + + '}'; + } + } + + final class Node { + private final String id; + private final String nodeName; + private final Version version; + private final HttpHost publishAddress; + + Node(String id, String nodeName, Version version, HttpHost publishAddress) { + this.id = id; + this.nodeName = nodeName; + this.version = version; + this.publishAddress = publishAddress; + } + + public String getId() { + return id; + } + + public String getNodeName() { + return nodeName; + } + + public HttpHost getPublishAddress() { + return publishAddress; + } + + public Version getVersion() { + return version; + } + + @Override + public String toString() { + return "Node{" + + "id='" + id + '\'' + + ", nodeName='" + nodeName + '\'' + + ", version=" + version + + '}'; + } + } + + final class Shard { + private 
final Node node; + private final boolean Primary; + private final SeqNoStats seqNoStats; + + Shard(Node node, boolean primary, SeqNoStats seqNoStats) { + this.node = node; + Primary = primary; + this.seqNoStats = seqNoStats; + } + + public Node getNode() { + return node; + } + + public boolean isPrimary() { + return Primary; + } + + public SeqNoStats getSeqNoStats() { + return seqNoStats; + } + + @Override + public String toString() { + return "Shard{" + + "node=" + node + + ", Primary=" + Primary + + ", seqNoStats=" + seqNoStats + + '}'; + } + } +} diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index e17e245410820..182e6a9f7d947 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -25,7 +25,7 @@ task oldClusterTest(type: RestIntegTestTask) { mustRunAfter(precommit) cluster { distribution = 'zip' - bwcVersion = '6.0.0-alpha1-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop + bwcVersion = '5.2.0-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop numBwcNodes = 2 numNodes = 2 clusterName = 'rolling-upgrade' diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml index 8c7cd83b0e039..4a37734d28462 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml @@ -1,5 +1,9 @@ --- "Help": + - skip: + version: " - 5.99.99" + reason: seq no stats were added in 6.0.0 + - do: cat.shards: help: true diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 0fc8cb4506b0b..975e6e2f86682 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -111,8 +111,8 @@ public void initClient() throws IOException { } clusterHosts = unmodifiableList(hosts); logger.info("initializing REST clients against {}", clusterHosts); - client = buildClient(restClientSettings()); - adminClient = buildClient(restAdminSettings()); + client = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + adminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); } assert client != null; assert adminClient != null; @@ -272,8 +272,8 @@ protected String getProtocol() { return "http"; } - private RestClient buildClient(Settings settings) throws IOException { - RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + RestClientBuilder builder = RestClient.builder(hosts); String keystorePath = settings.get(TRUSTSTORE_PATH); if (keystorePath != null) { final String keystorePass = settings.get(TRUSTSTORE_PASSWORD); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java index 6311944fdcbb7..265fd7b3e8561 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java @@ -46,17 +46,28 @@ public ObjectPath(Object object) { this.object = object; 
} + + /** + * A utility method that creates an {@link ObjectPath} via {@link #ObjectPath(Object)} returns + * the result of calling {@link #evaluate(String)} on it. + */ + public static T evaluate(Object object, String path) throws IOException { + return new ObjectPath(object).evaluate(path, Stash.EMPTY); + } + + /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path) throws IOException { + public T evaluate(String path) throws IOException { return evaluate(path, Stash.EMPTY); } /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path, Stash stash) throws IOException { + @SuppressWarnings("unchecked") + public T evaluate(String path, Stash stash) throws IOException { String[] parts = parsePath(path); Object object = this.object; for (String part : parts) { @@ -65,7 +76,7 @@ public Object evaluate(String path, Stash stash) throws IOException { return null; } } - return object; + return (T)object; } @SuppressWarnings("unchecked")