diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 7c5803e19c3e1..12f1d7b5a6e2c 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -681,6 +681,7 @@ + @@ -799,7 +800,6 @@ - @@ -1025,7 +1025,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java b/core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java deleted file mode 100644 index 0813e85960f59..0000000000000 --- a/core/src/main/java/org/elasticsearch/action/WriteConsistencyLevel.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action; - - -/** - * Write Consistency Level control how many replicas should be active for a write operation to occur (a write operation - * can be index, or delete). - * - * - */ -public enum WriteConsistencyLevel { - DEFAULT((byte) 0), - ONE((byte) 1), - QUORUM((byte) 2), - ALL((byte) 3); - - private final byte id; - - WriteConsistencyLevel(byte id) { - this.id = id; - } - - public byte id() { - return id; - } - - public static WriteConsistencyLevel fromId(byte value) { - if (value == 0) { - return DEFAULT; - } else if (value == 1) { - return ONE; - } else if (value == 2) { - return QUORUM; - } else if (value == 3) { - return ALL; - } - throw new IllegalArgumentException("No write consistency match [" + value + "]"); - } - - public static WriteConsistencyLevel fromString(String value) { - if (value.equals("default")) { - return DEFAULT; - } else if (value.equals("one")) { - return ONE; - } else if (value.equals("quorum")) { - return QUORUM; - } else if (value.equals("all")) { - return ALL; - } - throw new IllegalArgumentException("No write consistency match [" + value + "]"); - } -} diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java index 38401fef18126..17df06dbf4b84 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequest.java @@ -466,6 +466,15 @@ public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShar return this; } + /** + * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. 
+ */ + public CreateIndexRequest waitForActiveShards(final int waitForActiveShards) { + return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + @Override public void readFrom(StreamInput in) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java index 1c930b8951065..eaae4d53b73fd 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/create/CreateIndexRequestBuilder.java @@ -269,4 +269,13 @@ public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitFor request.waitForActiveShards(waitForActiveShards); return this; } + + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public CreateIndexRequestBuilder setWaitForActiveShards(final int waitForActiveShards) { + return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 3a9ec89db5da5..83eaf11ca3a9e 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +34,7 @@ public class ShardFlushRequest extends ReplicationRequest { public ShardFlushRequest(FlushRequest request, ShardId shardId) { super(shardId); this.request = request; + this.waitForActiveShards = ActiveShardCount.NONE; // don't wait for any active shards before proceeding, by default } public ShardFlushRequest() { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 82fb6d70ca441..570307a717da2 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -69,11 +69,6 @@ protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request) { return new ReplicaResult(); } - @Override - protected boolean checkWriteConsistency() { - return false; - } - @Override protected ClusterBlockLevel globalBlockLevel() { return ClusterBlockLevel.METADATA_WRITE; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index ac64e276778f6..9752e68517e15 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -21,6 +21,7 @@ import 
org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; @@ -54,7 +55,9 @@ protected ReplicationResponse newShardResponse() { @Override protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) { - return new BasicReplicationRequest(shardId); + BasicReplicationRequest replicationRequest = new BasicReplicationRequest(shardId); + replicationRequest.waitForActiveShards(ActiveShardCount.NONE); + return replicationRequest; } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index d7d0c289953a4..cf9f568195382 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; + public class TransportShardRefreshAction extends TransportReplicationAction { @@ -70,11 +71,6 @@ protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request) return new ReplicaResult(); } - @Override - protected boolean checkWriteConsistency() { - return false; - } - @Override protected ClusterBlockLevel globalBlockLevel() { return ClusterBlockLevel.METADATA_WRITE; diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java index 481a375492a8c..854611658dfe5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequest.java @@ -225,4 +225,13 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { this.createIndexRequest.waitForActiveShards(waitForActiveShards); } + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. 
+ */ + public void setWaitForActiveShards(final int waitForActiveShards) { + setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java index edc7aaa92d6b9..35890d1d3a6fd 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/rollover/RolloverRequestBuilder.java @@ -90,4 +90,13 @@ public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActive this.request.setWaitForActiveShards(waitForActiveShards); return this; } + + /** + * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public RolloverRequestBuilder waitForActiveShards(final int waitForActiveShards) { + return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java index 9ba4acdefc60f..9cb60415a12cc 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequest.java @@ -144,6 +144,15 @@ public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) { this.getShrinkIndexRequest().waitForActiveShards(waitForActiveShards); } + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public void setWaitForActiveShards(final int waitForActiveShards) { + setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + public void source(BytesReference source) { XContentType xContentType = XContentFactory.xContentType(source); if (xContentType != null) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java index 5ec1a5066ebdc..2bd10397193d5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shrink/ShrinkRequestBuilder.java @@ -64,4 +64,13 @@ public ShrinkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiv this.request.setWaitForActiveShards(waitForActiveShards); return this; } + + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. 
+ */ + public ShrinkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) { + return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index e0572344656b8..7e7aa4ce603be 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -23,10 +23,11 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -68,7 +69,7 @@ public class BulkRequest extends ActionRequest implements Composite List payloads = null; protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT; - private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; private long sizeInBytes = 0; @@ -432,15 +433,25 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null } /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + * Sets the number of shard copies that must be active before proceeding with the write. + * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. */ - public BulkRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; + public BulkRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; return this; } - public WriteConsistencyLevel consistencyLevel() { - return this.consistencyLevel; + /** + * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. 
+ */ + public BulkRequest waitForActiveShards(final int waitForActiveShards) { + return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + + public ActiveShardCount waitForActiveShards() { + return this.waitForActiveShards; } @Override @@ -525,7 +536,7 @@ public ActionRequestValidationException validate() { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); + waitForActiveShards = ActiveShardCount.readFrom(in); int size = in.readVInt(); for (int i = 0; i < size; i++) { byte type = in.readByte(); @@ -550,7 +561,7 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(consistencyLevel.id()); + waitForActiveShards.writeTo(out); out.writeVInt(requests.size()); for (ActionRequest request : requests) { if (request instanceof IndexRequest) { diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java index 4f2b7aa702ecf..c48a8f507b862 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java @@ -20,12 +20,13 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.ActionRequestBuilder; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; @@ -111,13 +112,23 @@ public BulkRequestBuilder add(byte[] data, int from, int length, @Nullable Strin } /** - * Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}. + * Sets the number of shard copies that must be active before proceeding with the write. + * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. */ - public BulkRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { - request.consistencyLevel(consistencyLevel); + public BulkRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); return this; } + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public BulkRequestBuilder setWaitForActiveShards(final int waitForActiveShards) { + return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + /** * A timeout to wait if the index operation can't be performed immediately. Defaults to 1m. 
*/ diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 84c11d9fc8ec2..da080b54b2531 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -339,7 +339,7 @@ void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeN final List requests = entry.getValue(); BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, bulkRequest.getRefreshPolicy(), requests.toArray(new BulkItemRequest[requests.size()])); - bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel()); + bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); bulkShardRequest.timeout(bulkRequest.timeout()); if (task != null) { bulkShardRequest.setParentTask(nodeId, task.getId()); diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 4bf2bb47992fd..6f3d27ea36908 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -93,6 +93,7 @@ public void onFailure(Exception e) { @Override protected void resolveRequest(final MetaData metaData, IndexMetaData indexMetaData, DeleteRequest request) { + super.resolveRequest(metaData, indexMetaData, request); resolveAndValidateRouting(metaData, indexMetaData.getIndex().getName(), request); ShardId shardId = clusterService.operationRouting().shardId(clusterService.state(), indexMetaData.getIndex().getName(), request.id(), request.routing()); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 8d2234aa27ea3..785a5d23157dc 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -121,6 +121,7 @@ public void onFailure(Exception e) { @Override protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, IndexRequest request) { + super.resolveRequest(metaData, indexMetaData, request); MappingMetaData mappingMd =indexMetaData.mappingOrDefault(request.type()); request.resolveRouting(metaData); request.process(mappingMd, allowIdGeneration, indexMetaData.getIndex().getName()); diff --git a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java index 87d45f6ccd1fc..cf7d2cf1e54b2 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/IngestActionFilter.java @@ -163,7 +163,7 @@ BulkRequest getBulkRequest() { } else { BulkRequest modifiedBulkRequest = new BulkRequest(); modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy()); - modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel()); + modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards()); modifiedBulkRequest.timeout(bulkRequest.timeout()); int slot = 0; diff --git a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java index 90bd0450afb81..d6648462a5589 100644 --- 
a/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java +++ b/core/src/main/java/org/elasticsearch/action/support/ActiveShardCount.java @@ -30,6 +30,8 @@ import java.io.IOException; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; + /** * A class whose instances represent a value for counting the number * of active shard copies for a given shard in an index. @@ -63,8 +65,16 @@ public static ActiveShardCount from(final int value) { return get(value); } + /** + * Validates that the instance is valid for the given number of replicas in an index. + */ + public boolean validate(final int numberOfReplicas) { + assert numberOfReplicas >= 0; + return value <= numberOfReplicas + 1; + } + private static ActiveShardCount get(final int value) { - switch (validateValue(value)) { + switch (value) { case ACTIVE_SHARD_COUNT_DEFAULT: return DEFAULT; case ALL_ACTIVE_SHARDS: @@ -74,6 +84,7 @@ private static ActiveShardCount get(final int value) { case 0: return NONE; default: + assert value > 1; return new ActiveShardCount(value); } } @@ -87,29 +98,6 @@ public static ActiveShardCount readFrom(final StreamInput in) throws IOException return get(in.readInt()); } - private static int validateValue(final int value) { - if (value < 0 && value != ACTIVE_SHARD_COUNT_DEFAULT && value != ALL_ACTIVE_SHARDS) { - throw new IllegalArgumentException("Invalid ActiveShardCount[" + value + "]"); - } - return value; - } - - /** - * Resolve this instance to an actual integer value for the number of active shard counts. - * If {@link ActiveShardCount#ALL} is specified, then the given {@link IndexMetaData} is - * used to determine what the actual active shard count should be. The default value indicates - * one active shard. - */ - public int resolve(final IndexMetaData indexMetaData) { - if (this == ActiveShardCount.DEFAULT) { - return 1; - } else if (this == ActiveShardCount.ALL) { - return indexMetaData.getNumberOfReplicas() + 1; - } else { - return value; - } - } - /** * Parses the active shard count from the given string. Valid values are "all" for * all shard copies, null for the default value (which defaults to one shard copy), @@ -154,8 +142,12 @@ public boolean enoughShardsActive(final ClusterState clusterState, final String // all primary shards aren't active yet return false; } + ActiveShardCount waitForActiveShards = this; + if (waitForActiveShards == ActiveShardCount.DEFAULT) { + waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(indexMetaData.getSettings()); + } for (final IntObjectCursor shardRouting : indexRoutingTable.getShards()) { - if (enoughShardsActive(shardRouting.value, indexMetaData) == false) { + if (waitForActiveShards.enoughShardsActive(shardRouting.value) == false) { // not enough active shard copies yet return false; } @@ -167,12 +159,14 @@ public boolean enoughShardsActive(final ClusterState clusterState, final String * Returns true iff the active shard count in the shard routing table is enough * to meet the required shard count represented by this instance. 
*/ - public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable, final IndexMetaData indexMetaData) { - if (shardRoutingTable.activeShards().size() < resolve(indexMetaData)) { - // not enough active shard copies yet - return false; + public boolean enoughShardsActive(final IndexShardRoutingTable shardRoutingTable) { + if (this == ActiveShardCount.ALL) { + return shardRoutingTable.allShardsStarted(); + } else if (this == ActiveShardCount.DEFAULT) { + return shardRoutingTable.primaryShard().started(); + } else { + return shardRoutingTable.activeShards().size() >= value; } - return true; } @Override @@ -194,18 +188,14 @@ public boolean equals(Object o) { @Override public String toString() { - final String valStr; switch (value) { case ALL_ACTIVE_SHARDS: - valStr = "ALL"; - break; + return "ALL"; case ACTIVE_SHARD_COUNT_DEFAULT: - valStr = "DEFAULT"; - break; + return "DEFAULT"; default: - valStr = Integer.toString(value); + return Integer.toString(value); } - return "ActiveShardCount[" + valStr + "]"; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index dc7846a74deb7..af6d8b030ca0f 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -22,7 +22,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -68,7 +68,6 @@ public class ReplicationOperation< private final AtomicInteger pendingShards = new AtomicInteger(); private final AtomicInteger successfulShards = new AtomicInteger(); private final boolean executeOnReplicas; - private final boolean checkWriteConsistency; private final Primary primary; private final Replicas replicasProxy; private final AtomicBoolean finished = new AtomicBoolean(); @@ -80,10 +79,8 @@ public class ReplicationOperation< public ReplicationOperation(Request request, Primary primary, ActionListener listener, - boolean executeOnReplicas, boolean checkWriteConsistency, - Replicas replicas, + boolean executeOnReplicas, Replicas replicas, Supplier clusterStateSupplier, ESLogger logger, String opType) { - this.checkWriteConsistency = checkWriteConsistency; this.executeOnReplicas = executeOnReplicas; this.replicasProxy = replicas; this.primary = primary; @@ -95,12 +92,12 @@ public ReplicationOperation(Request request, Primary 2) { - // only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, - // quorum is 1 (which is what it is initialized to) - requiredNumber = (shardRoutingTable.getSize() / 2) + 1; - } else if (consistencyLevel == WriteConsistencyLevel.ALL) { - requiredNumber = shardRoutingTable.getSize(); - } else { - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; - } - } else { - sizeActive = 0; - requiredNumber = 1; + assert state != null : "replication operation must have access to the cluster state"; + final ActiveShardCount waitForActiveShards = request.waitForActiveShards(); + if (waitForActiveShards == 
ActiveShardCount.NONE) { + return null; // not waiting for any shards } - - if (sizeActive < requiredNumber) { - logger.trace("[{}] not enough active copies to meet write consistency of [{}] (have {}, needed {}), scheduling a retry." + - " op [{}], request [{}]", shardId, consistencyLevel, sizeActive, requiredNumber, opType, request); - return "Not enough active copies to meet write consistency of [" + consistencyLevel + "] (have " + sizeActive + ", needed " - + requiredNumber + ")."; - } else { + IndexRoutingTable indexRoutingTable = state.getRoutingTable().index(indexName); + if (indexRoutingTable == null) { + logger.trace("[{}] index not found in the routing table", shardId); + return "Index " + indexName + " not found in the routing table"; + } + IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId()); + if (shardRoutingTable == null) { + logger.trace("[{}] shard not found in the routing table", shardId); + return "Shard " + shardId + " not found in the routing table"; + } + if (waitForActiveShards.enoughShardsActive(shardRoutingTable)) { return null; + } else { + final String resolvedShards = waitForActiveShards == ActiveShardCount.ALL ? Integer.toString(shardRoutingTable.shards().size()) + : waitForActiveShards.toString(); + logger.trace("[{}] not enough active copies to meet shard count of [{}] (have {}, needed {}), scheduling a retry. op [{}], " + + "request [{}]", shardId, waitForActiveShards, shardRoutingTable.activeShards().size(), + resolvedShards, opType, request); + return "Not enough active copies to meet shard count of [" + waitForActiveShards + "] (have " + + shardRoutingTable.activeShards().size() + ", needed " + resolvedShards + ")."; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index adb44dd4964c7..a1ddcdcedd50b 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -22,9 +22,9 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.IndicesRequest; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -60,7 +60,10 @@ public abstract class ReplicationRequest li * @param request the request to resolve */ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Request request) { + if (request.waitForActiveShards() == ActiveShardCount.DEFAULT) { + // if the wait for active shard count has not been set in the request, + // resolve it from the index settings + request.waitForActiveShards(indexMetaData.getWaitForActiveShards()); + } } /** @@ -164,13 +166,6 @@ protected void resolveRequest(MetaData metaData, IndexMetaData indexMetaData, Re */ protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest); - /** - * True if write consistency should be checked for an implementation - */ - protected boolean checkWriteConsistency() { - return true; - } - /** * Cluster level block to 
check before request execution */ @@ -353,7 +348,7 @@ protected ReplicationOperation createRep Request request, ActionListener listener, PrimaryShardReference primaryShardReference, boolean executeOnReplicas) { return new ReplicationOperation<>(request, primaryShardReference, listener, - executeOnReplicas, checkWriteConsistency(), replicasProxy, clusterService::state, logger, actionName + executeOnReplicas, replicasProxy, clusterService::state, logger, actionName ); } } @@ -566,11 +561,9 @@ protected void doRun() { } // resolve all derived request fields, so we can route and apply it - if (request.consistencyLevel() == WriteConsistencyLevel.DEFAULT) { - request.consistencyLevel(defaultWriteConsistencyLevel); - } resolveRequest(state.metaData(), indexMetaData, request); assert request.shardId() != null : "request shardId must be set in resolveRequest"; + assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest"; final ShardRouting primary = primary(state); if (retryIfUnavailable(state, primary)) { diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 036004615999c..919a3cb90b4c6 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -131,7 +131,7 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult .setRefreshPolicy(request.getRefreshPolicy()) .routing(request.routing()) .parent(request.parent()) - .consistencyLevel(request.consistencyLevel()); + .waitForActiveShards(request.waitForActiveShards()); if (request.versionType() != VersionType.INTERNAL) { // in all but the internal versioning mode, we want to create the new document using the given version. 
indexRequest.version(request.version()).versionType(request.versionType()); @@ -224,14 +224,14 @@ protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .source(updatedSourceAsMap, updateSourceContentType) .version(updateVersion).versionType(request.versionType()) - .consistencyLevel(request.consistencyLevel()) + .waitForActiveShards(request.waitForActiveShards()) .timestamp(timestamp).ttl(ttl) .setRefreshPolicy(request.getRefreshPolicy()); return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType); } else if ("delete".equals(operation)) { DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) .version(updateVersion).versionType(request.versionType()) - .consistencyLevel(request.consistencyLevel()) + .waitForActiveShards(request.waitForActiveShards()) .setRefreshPolicy(request.getRefreshPolicy()); return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 43218dc44fa00..0d919ff0892cd 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -21,9 +21,10 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.DocumentRequest; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; @@ -74,7 +75,7 @@ public class UpdateRequest extends InstanceShardOperationRequest private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; - private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; + private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT; private IndexRequest upsertRequest; @@ -433,18 +434,28 @@ public RefreshPolicy getRefreshPolicy() { return refreshPolicy; } - public WriteConsistencyLevel consistencyLevel() { - return this.consistencyLevel; + public ActiveShardCount waitForActiveShards() { + return this.waitForActiveShards; } /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + * Sets the number of shard copies that must be active before proceeding with the write. + * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. 
*/ - public UpdateRequest consistencyLevel(WriteConsistencyLevel consistencyLevel) { - this.consistencyLevel = consistencyLevel; + public UpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) { + this.waitForActiveShards = waitForActiveShards; return this; } + /** + * A shortcut for {@link #waitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public UpdateRequest waitForActiveShards(final int waitForActiveShards) { + return waitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + /** * Sets the doc to use for updates when a script is not specified. */ @@ -703,7 +714,7 @@ public UpdateRequest scriptedUpsert(boolean scriptedUpsert) { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - consistencyLevel = WriteConsistencyLevel.fromId(in.readByte()); + waitForActiveShards = ActiveShardCount.readFrom(in); type = in.readString(); id = in.readString(); routing = in.readOptionalString(); @@ -738,7 +749,7 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(consistencyLevel.id()); + waitForActiveShards.writeTo(out); out.writeString(type); out.writeString(id); out.writeOptionalString(routing); diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index 403f4265fcdf4..f2d80bfe66e8f 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -19,9 +19,10 @@ package org.elasticsearch.action.update; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.WriteRequestBuilder; +import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.bytes.BytesReference; @@ -122,13 +123,23 @@ public UpdateRequestBuilder setVersionType(VersionType versionType) { } /** - * Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT} + * Sets the number of shard copies that must be active before proceeding with the write. + * See {@link ReplicationRequest#waitForActiveShards(ActiveShardCount)} for details. */ - public UpdateRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) { - request.consistencyLevel(consistencyLevel); + public UpdateRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) { + request.waitForActiveShards(waitForActiveShards); return this; } + /** + * A shortcut for {@link #setWaitForActiveShards(ActiveShardCount)} where the numerical + * shard count is passed in, instead of having to first call {@link ActiveShardCount#from(int)} + * to get the ActiveShardCount. + */ + public UpdateRequestBuilder setWaitForActiveShards(final int waitForActiveShards) { + return setWaitForActiveShards(ActiveShardCount.from(waitForActiveShards)); + } + /** * Sets the doc to use for updates when a script is not specified. 
*/ diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index b49b893f232a1..0af860953f0d4 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.Diffable; import org.elasticsearch.cluster.DiffableUtils; @@ -219,6 +220,16 @@ public static State fromString(String state) { public static final Setting INDEX_ROUTING_INITIAL_RECOVERY_GROUP_SETTING = Setting.groupSetting("index.routing.allocation.initial_recovery."); // this is only setable internally not a registered setting!! + /** + * The number of active shard copies to check for before proceeding with a write operation. + */ + public static final Setting SETTING_WAIT_FOR_ACTIVE_SHARDS = + new Setting<>("index.write.wait_for_active_shards", + "1", + ActiveShardCount::parseString, + Setting.Property.Dynamic, + Setting.Property.IndexScope); + public static final IndexMetaData PROTO = IndexMetaData.builder("") .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) .numberOfShards(1).numberOfReplicas(0).build(); @@ -266,12 +277,14 @@ public static State fromString(String state) { private final Version indexUpgradedVersion; private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion; + private final ActiveShardCount waitForActiveShards; + private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings, ImmutableOpenMap mappings, ImmutableOpenMap aliases, ImmutableOpenMap customs, ImmutableOpenIntMap> activeAllocationIds, DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters initialRecoveryFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters, Version indexCreatedVersion, Version indexUpgradedVersion, org.apache.lucene.util.Version minimumCompatibleLuceneVersion, - int routingNumShards) { + int routingNumShards, ActiveShardCount waitForActiveShards) { this.index = index; this.version = version; @@ -295,6 +308,7 @@ private IndexMetaData(Index index, long version, long[] primaryTerms, State stat this.minimumCompatibleLuceneVersion = minimumCompatibleLuceneVersion; this.routingNumShards = routingNumShards; this.routingFactor = routingNumShards / numberOfShards; + this.waitForActiveShards = waitForActiveShards; assert numberOfShards * routingFactor == routingNumShards : routingNumShards + " must be a multiple of " + numberOfShards; } @@ -378,6 +392,14 @@ public int getTotalNumberOfShards() { return totalNumberOfShards; } + /** + * Returns the configured {@link #SETTING_WAIT_FOR_ACTIVE_SHARDS}, which defaults + * to an active shard count of 1 if not specified. 
+ */ + public ActiveShardCount getWaitForActiveShards() { + return waitForActiveShards; + } + public Settings getSettings() { return settings; } @@ -973,10 +995,17 @@ public IndexMetaData build() { + "] but should be equal to number of shards [" + numberOfShards() + "]"); } + final ActiveShardCount waitForActiveShards = SETTING_WAIT_FOR_ACTIVE_SHARDS.get(settings); + if (waitForActiveShards.validate(numberOfReplicas) == false) { + throw new IllegalArgumentException("invalid " + SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey() + + "[" + waitForActiveShards + "]: cannot be greater than " + + "number of shard copies [" + (numberOfReplicas + 1) + "]"); + } + final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, initialRecoveryFilters, includeFilters, excludeFilters, - indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards()); + indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion, getRoutingNumShards(), waitForActiveShards); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 3e5e0e9223717..04316cbc63817 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterState; @@ -347,7 +348,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); - if (request.waitForActiveShards().resolve(tmpImd) > tmpImd.getNumberOfReplicas() + 1) { + ActiveShardCount waitForActiveShards = request.waitForActiveShards(); + if (waitForActiveShards == ActiveShardCount.DEFAULT) { + waitForActiveShards = tmpImd.getWaitForActiveShards(); + } + if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) { throw new IllegalArgumentException("invalid wait_for_active_shards[" + request.waitForActiveShards() + "]: cannot be greater than number of shard copies [" + (tmpImd.getNumberOfReplicas() + 1) + "]"); diff --git a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 43db77a1e9538..370a6c07668cb 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ b/core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -140,6 +140,7 @@ public final class IndexScopedSettings extends 
AbstractScopedSettings { PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING, FsDirectoryService.INDEX_LOCK_FACTOR_SETTING, EngineConfig.INDEX_CODEC_SETTING, + IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS, // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index 623af6d2f478c..a5a9ec4034454 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -19,11 +19,11 @@ package org.elasticsearch.rest.action.bulk; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkShardRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; @@ -79,9 +79,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel, String defaultPipeline = request.param("pipeline"); String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null; - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + bulkRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.setRefreshPolicy(request.param("refresh")); diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 2f9b10096cc1c..869dca8dce040 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.rest.action.delete; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -55,9 +55,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel, deleteRequest.version(RestActions.parseVersion(request)); deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType())); - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + deleteRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } client.delete(deleteRequest, new RestStatusToXContentListener<>(channel)); diff --git 
a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index a4222adacd79c..ef5f02d4b1622 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.rest.action.index; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -98,9 +98,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel, } } } - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing()))); } diff --git a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index cfe26d35c56ee..5fbdd29f9635a 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -19,8 +19,8 @@ package org.elasticsearch.rest.action.update; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.Strings; @@ -53,9 +53,9 @@ public void handleRequest(final RestRequest request, final RestChannel channel, updateRequest.parent(request.param("parent")); updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout())); updateRequest.setRefreshPolicy(request.param("refresh")); - String consistencyLevel = request.param("consistency"); - if (consistencyLevel != null) { - updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); + String waitForActiveShards = request.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + updateRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } updateRequest.docAsUpsert(request.paramAsBoolean("doc_as_upsert", updateRequest.docAsUpsert())); String sField = request.param("fields"); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java index 7231bee0bef12..57adec340ac68 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java @@ -50,6 +50,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; import static 
org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; @@ -475,4 +476,34 @@ public void testCreateShrinkIndexFails() throws Exception { ensureGreen(); assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); } + + /** + * This test ensures that index creation adheres to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS}. + */ + public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception { + final String indexName = "test"; + final int numReplicas = internalCluster().numDataNodes(); + Settings settings = Settings.builder() + .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(numReplicas)) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), numReplicas) + .build(); + assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(settings).get()); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + // all should fail + settings = Settings.builder() + .put(settings) + .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), "all") + .build(); + assertFalse(client().admin().indices().prepareCreate(indexName).setSettings(settings).setTimeout("100ms").get().isShardsAcked()); + assertAcked(client().admin().indices().prepareDelete(indexName)); + + // the numeric equivalent of all should also fail + settings = Settings.builder() + .put(settings) + .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(numReplicas + 1)) + .build(); + assertFalse(client().admin().indices().prepareCreate(indexName).setSettings(settings).setTimeout("100ms").get().isShardsAcked()); + } } diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 8be67cb0fcf2b..a8699dd3ea796 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.VersionType; import org.elasticsearch.test.ESTestCase; @@ -131,4 +132,13 @@ public void testValidateTTL() { assertThat(validate, notNullValue()); assertThat(validate.getMessage(), containsString("ttl must not be negative")); } + + public void testWaitForActiveShards() { + IndexRequest request = new IndexRequest("index", "type"); + final int count = randomIntBetween(0, 10); + request.waitForActiveShards(ActiveShardCount.from(count)); + assertEquals(request.waitForActiveShards(), ActiveShardCount.from(count)); + // test negative shard count value not allowed + expectThrows(IllegalArgumentException.class, () -> request.waitForActiveShards(ActiveShardCount.from(randomIntBetween(-10, -1)))); + } } diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java index 83f0b1332c7da..eb8e9680ce400 100644 --- a/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java +++ 
b/core/src/test/java/org/elasticsearch/action/support/ActiveShardCountTests.java @@ -37,8 +37,6 @@ import java.io.IOException; import java.nio.ByteBuffer; -import static org.hamcrest.Matchers.equalTo; - /** * Tests for the {@link ActiveShardCount} class */ @@ -47,41 +45,10 @@ public class ActiveShardCountTests extends ESTestCase { public void testFromIntValue() { assertSame(ActiveShardCount.from(0), ActiveShardCount.NONE); final int value = randomIntBetween(1, 50); - IndexMetaData indexMetaData = IndexMetaData.builder("test") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - assertEquals(ActiveShardCount.from(value).resolve(indexMetaData), value); + assertEquals(ActiveShardCount.from(value).toString(), Integer.toString(value)); expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.from(randomIntBetween(-10, -1))); } - public void testResolve() { - // one shard - IndexMetaData indexMetaData = IndexMetaData.builder("test") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .build(); - assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(1)); - assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); - assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); - final int value = randomIntBetween(2, 20); - assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); - - // more than one shard - final int numNewShards = randomIntBetween(1, 20); - indexMetaData = IndexMetaData.builder("test") - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(numNewShards) - .build(); - assertThat(ActiveShardCount.ALL.resolve(indexMetaData), equalTo(numNewShards + 1)); - assertThat(ActiveShardCount.DEFAULT.resolve(indexMetaData), equalTo(1)); - assertThat(ActiveShardCount.NONE.resolve(indexMetaData), equalTo(0)); - assertThat(ActiveShardCount.from(value).resolve(indexMetaData), equalTo(value)); - } - public void testSerialization() throws IOException { doWriteRead(ActiveShardCount.ALL); doWriteRead(ActiveShardCount.DEFAULT); @@ -101,6 +68,14 @@ public void testParseString() { expectThrows(IllegalArgumentException.class, () -> ActiveShardCount.parseString(randomIntBetween(-10, -3) + "")); } + public void testValidate() { + assertTrue(ActiveShardCount.parseString("all").validate(randomIntBetween(0, 10))); + final int numReplicas = randomIntBetween(0, 10); + assertTrue(ActiveShardCount.from(randomIntBetween(0, numReplicas + 1)).validate(numReplicas)); + // invalid values shouldn't validate + assertFalse(ActiveShardCount.from(numReplicas + randomIntBetween(2, 10)).validate(numReplicas)); + } + private void doWriteRead(ActiveShardCount activeShardCount) throws IOException { final BytesStreamOutput out = new BytesStreamOutput(); activeShardCount.writeTo(out); @@ -119,15 +94,11 @@ public void testEnoughShardsActiveZero() { final String indexName = "test-idx"; final int numberOfShards = randomIntBetween(1, 5); final int numberOfReplicas = randomIntBetween(4, 7); - final ActiveShardCount waitForActiveShards = ActiveShardCount.from(0); + final ActiveShardCount waitForActiveShards = ActiveShardCount.NONE; ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startPrimaries(clusterState, indexName); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = 
startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); - assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); - assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startAllShards(clusterState, indexName); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); } @@ -145,14 +116,15 @@ public void testEnoughShardsActiveRandom() { final String indexName = "test-idx"; final int numberOfShards = randomIntBetween(1, 5); final int numberOfReplicas = randomIntBetween(4, 7); - final ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(2, numberOfReplicas)); + final int activeShardCount = randomIntBetween(2, numberOfReplicas); + final ActiveShardCount waitForActiveShards = ActiveShardCount.from(activeShardCount); ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startPrimaries(clusterState, indexName); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + clusterState = startLessThanWaitOnShards(clusterState, indexName, activeShardCount - 2); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); + clusterState = startWaitOnShards(clusterState, indexName, activeShardCount - 1); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startAllShards(clusterState, indexName); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); @@ -168,7 +140,7 @@ public void testEnoughShardsActiveLevelAll() { assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startPrimaries(clusterState, indexName); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); + clusterState = startLessThanWaitOnShards(clusterState, indexName, numberOfReplicas - randomIntBetween(1, numberOfReplicas)); assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startAllShards(clusterState, indexName); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); @@ -184,10 +156,6 @@ private void runTestForOneActiveShard(final ActiveShardCount activeShardCount) { assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startPrimaries(clusterState, indexName); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = startLessThanWaitOnShards(clusterState, indexName, waitForActiveShards); - assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); - clusterState = startWaitOnShards(clusterState, indexName, waitForActiveShards); - assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); clusterState = startAllShards(clusterState, indexName); assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName)); } @@ -223,16 +191,15 @@ private ClusterState startPrimaries(final ClusterState clusterState, final Strin return ClusterState.builder(clusterState).routingTable(routingTable).build(); } - private 
ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, - final ActiveShardCount waitForActiveShards) { + private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, final String indexName, final int numShardsToStart) { RoutingTable routingTable = clusterState.routingTable(); IndexRoutingTable indexRoutingTable = routingTable.index(indexName); IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { final IndexShardRoutingTable shardRoutingTable = shardEntry.value; assert shardRoutingTable.getSize() > 2; + int numToStart = numShardsToStart; // want less than half, and primary is already started - int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 2; for (ShardRouting shardRouting : shardRoutingTable.getShards()) { if (shardRouting.primary()) { assertTrue(shardRouting.active()); @@ -250,15 +217,14 @@ private ClusterState startLessThanWaitOnShards(final ClusterState clusterState, return ClusterState.builder(clusterState).routingTable(routingTable).build(); } - private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, - final ActiveShardCount waitForActiveShards) { + private ClusterState startWaitOnShards(final ClusterState clusterState, final String indexName, final int numShardsToStart) { RoutingTable routingTable = clusterState.routingTable(); IndexRoutingTable indexRoutingTable = routingTable.index(indexName); IndexRoutingTable.Builder newIndexRoutingTable = IndexRoutingTable.builder(indexRoutingTable.getIndex()); for (final ObjectCursor shardEntry : indexRoutingTable.getShards().values()) { final IndexShardRoutingTable shardRoutingTable = shardEntry.value; assert shardRoutingTable.getSize() > 2; - int numToStart = waitForActiveShards.resolve(clusterState.metaData().index(indexName)) - 1; // primary is already started + int numToStart = numShardsToStart; for (ShardRouting shardRouting : shardRoutingTable.getShards()) { if (shardRouting.primary()) { assertTrue(shardRouting.active()); diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java index 1486df298ba63..5f52293d7eef6 100644 --- a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java @@ -67,7 +67,7 @@ public void testCreateIndexNoActiveShardsNoWaiting() throws Exception { Settings settings = settingsBuilder.build(); CreateIndexResponse response = prepareCreate("test-idx") .setSettings(settings) - .setWaitForActiveShards(ActiveShardCount.from(0)) + .setWaitForActiveShards(ActiveShardCount.NONE) .get(); assertTrue(response.isAcknowledged()); } @@ -83,7 +83,7 @@ public void testCreateIndexNotEnoughActiveShardsTimesOut() throws Exception { final String indexName = "test-idx"; assertFalse(prepareCreate(indexName) .setSettings(settings) - .setWaitForActiveShards(ActiveShardCount.from(randomIntBetween(numDataNodes + 1, numReplicas + 1))) + .setWaitForActiveShards(randomIntBetween(numDataNodes + 1, numReplicas + 1)) .setTimeout("100ms") .get() .isShardsAcked()); @@ -97,8 +97,9 @@ public void testCreateIndexEnoughActiveShards() throws Exception { .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), randomIntBetween(1, 5)) 
.put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), internalCluster().numDataNodes() + randomIntBetween(0, 3)) .build(); - ActiveShardCount waitForActiveShards = ActiveShardCount.from(randomIntBetween(0, internalCluster().numDataNodes())); - assertAcked(prepareCreate(indexName).setSettings(settings).setWaitForActiveShards(waitForActiveShards).get()); + assertAcked(prepareCreate(indexName).setSettings(settings) + .setWaitForActiveShards(randomIntBetween(0, internalCluster().numDataNodes())) + .get()); } public void testCreateIndexWaitsForAllActiveShards() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java similarity index 74% rename from core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java rename to core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java index 067f7e115306e..fbeac88dbe104 100644 --- a/core/src/test/java/org/elasticsearch/consistencylevel/WriteConsistencyLevelIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/WaitActiveShardCountIT.java @@ -17,10 +17,9 @@ * under the License. */ -package org.elasticsearch.consistencylevel; +package org.elasticsearch.action.support; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -35,26 +34,25 @@ import static org.hamcrest.Matchers.equalTo; /** - * + * Tests that setting the active shard count for replication operations (e.g. index) works correctly. */ -public class WriteConsistencyLevelIT extends ESIntegTestCase { - public void testWriteConsistencyLevelReplication2() throws Exception { +public class WaitActiveShardCountIT extends ESIntegTestCase { + public void testReplicationWaitsForActiveShardCount() throws Exception { CreateIndexResponse createIndexResponse = - prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)) - .get(); + prepareCreate("test", 1, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 2)).get(); assertAcked(createIndexResponse); - // indexing, by default, will work (ONE consistency level) - client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).setConsistencyLevel(WriteConsistencyLevel.ONE).execute().actionGet(); + // indexing, by default, will work (waiting for one shard copy only) + client().prepareIndex("test", "type1", "1").setSource(source("1", "test")).execute().actionGet(); try { client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.QUORUM) + .setWaitForActiveShards(2) // wait for 2 active shard copies .setTimeout(timeValueMillis(100)).execute().actionGet(); - fail("can't index, does not match consistency"); + fail("can't index, not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [QUORUM] (have 1, needed 2). 
Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); // but really, all is well } @@ -71,19 +69,19 @@ public void testWriteConsistencyLevelReplication2() throws Exception { assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); - // this should work, since we now have + // this should work, since we now have two client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.QUORUM) + .setWaitForActiveShards(2) .setTimeout(timeValueSeconds(1)).execute().actionGet(); try { client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.ALL) + .setWaitForActiveShards(ActiveShardCount.ALL) .setTimeout(timeValueMillis(100)).execute().actionGet(); - fail("can't index, does not match consistency"); + fail("can't index, not enough active shard copies"); } catch (UnavailableShardsException e) { assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE)); - assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet write consistency of [ALL] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); + assertThat(e.getMessage(), equalTo("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request: [index {[test][type1][1], source[{ \"type1\" : { \"id\" : \"1\", \"name\" : \"test\" } }]}]")); // but really, all is well } @@ -93,9 +91,9 @@ public void testWriteConsistencyLevelReplication2() throws Exception { assertThat(clusterHealth.isTimedOut(), equalTo(false)); assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); - // this should work, since we now have + // this should work, since we now have all shards started client().prepareIndex("test", "type1", "1").setSource(source("1", "test")) - .setConsistencyLevel(WriteConsistencyLevel.ALL) + .setWaitForActiveShards(ActiveShardCount.ALL) .setTimeout(timeValueSeconds(1)).execute().actionGet(); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 9f41f0e37c263..8353f6dbacc4b 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -22,7 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.cluster.ClusterState; @@ -136,7 +136,7 @@ public void testReplicationWithShadowIndex() throws Exception { Request request = new Request(shardId); PlainActionFuture listener = new 
PlainActionFuture<>(); final TestReplicationOperation op = new TestReplicationOperation(request, - new TestPrimary(primaryShard, primaryTerm), listener, false, false, + new TestPrimary(primaryShard, primaryTerm), listener, false, new TestReplicaProxy(), () -> state, logger, "test"); op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); @@ -251,34 +251,17 @@ public Result perform(Request request) throws Exception { assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); } - public void testWriteConsistency() throws Exception { + public void testWaitForActiveShards() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); final int assignedReplicas = randomInt(2); final int unassignedReplicas = randomInt(2); final int totalShards = 1 + assignedReplicas + unassignedReplicas; - final boolean passesWriteConsistency; - Request request = new Request(shardId).consistencyLevel(randomFrom(WriteConsistencyLevel.values())); - switch (request.consistencyLevel()) { - case ONE: - passesWriteConsistency = true; - break; - case DEFAULT: - case QUORUM: - if (totalShards <= 2) { - passesWriteConsistency = true; // primary is enough - } else { - passesWriteConsistency = assignedReplicas + 1 >= (totalShards / 2) + 1; - } - // we have to reset default (as the transport replication action will do) - request.consistencyLevel(WriteConsistencyLevel.QUORUM); - break; - case ALL: - passesWriteConsistency = unassignedReplicas == 0; - break; - default: - throw new RuntimeException("unknown consistency level [" + request.consistencyLevel() + "]"); - } + final int activeShardCount = randomIntBetween(0, totalShards); + Request request = new Request(shardId).waitForActiveShards( + activeShardCount == totalShards ? ActiveShardCount.ALL : ActiveShardCount.from(activeShardCount)); + final boolean passesActiveShardCheck = activeShardCount <= assignedReplicas + 1; + ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; for (int i = 0; i < assignedReplicas; i++) { replicaStates[i] = randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); @@ -288,10 +271,10 @@ public void testWriteConsistency() throws Exception { } final ClusterState state = state(index, true, ShardRoutingState.STARTED, replicaStates); - logger.debug("using consistency level of [{}], assigned shards [{}], total shards [{}]." + + logger.debug("using active shard count of [{}], assigned shards [{}], total shards [{}]." + " expecting op to [{}]. using state: \n{}", - request.consistencyLevel(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, - passesWriteConsistency ? "succeed" : "retry", + request.waitForActiveShards(), 1 + assignedReplicas, 1 + assignedReplicas + unassignedReplicas, + passesActiveShardCheck ? 
"succeed" : "retry", state.prettyPrint()); final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id()); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); @@ -299,17 +282,17 @@ public void testWriteConsistency() throws Exception { final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final TestReplicationOperation op = new TestReplicationOperation(request, new TestPrimary(primaryShard, primaryTerm), - listener, randomBoolean(), true, new TestReplicaProxy(), () -> state, logger, "test"); + listener, randomBoolean(), new TestReplicaProxy(), () -> state, logger, "test"); - if (passesWriteConsistency) { - assertThat(op.checkWriteConsistency(), nullValue()); + if (passesActiveShardCheck) { + assertThat(op.checkActiveShardCount(), nullValue()); op.execute(); - assertTrue("operations should have been performed, consistency level is met", + assertTrue("operations should have been performed, active shard count is met", request.processedOnPrimary.get()); } else { - assertThat(op.checkWriteConsistency(), notNullValue()); + assertThat(op.checkActiveShardCount(), notNullValue()); op.execute(); - assertFalse("operations should not have been perform, consistency level is *NOT* met", + assertFalse("operations should not have been perform, active shard count is *NOT* met", request.processedOnPrimary.get()); assertListenerThrows("should throw exception to trigger retry", listener, UnavailableShardsException.class); } @@ -347,6 +330,7 @@ public Request() { this(); this.shardId = shardId; this.index = shardId.getIndexName(); + this.waitForActiveShards = ActiveShardCount.NONE; // keep things simple } @@ -458,13 +442,13 @@ public void failShard(ShardRouting replica, ShardRouting primary, String message class TestReplicationOperation extends ReplicationOperation { public TestReplicationOperation(Request request, Primary primary, ActionListener listener, Replicas replicas, Supplier clusterStateSupplier) { - this(request, primary, listener, true, false, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test"); + this(request, primary, listener, true, replicas, clusterStateSupplier, ReplicationOperationTests.this.logger, "test"); } public TestReplicationOperation(Request request, Primary primary, - ActionListener listener, boolean executeOnReplicas, boolean checkWriteConsistency, + ActionListener listener, boolean executeOnReplicas, Replicas replicas, Supplier clusterStateSupplier, ESLogger logger, String opType) { - super(request, primary, listener, executeOnReplicas, checkWriteConsistency, replicas, clusterStateSupplier, logger, opType); + super(request, primary, listener, executeOnReplicas, replicas, clusterStateSupplier, logger, opType); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index de2ddabb0fe5e..bca17fb143bd7 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; 
import org.elasticsearch.client.transport.NoNodeAvailableException; import org.elasticsearch.cluster.ClusterState; @@ -84,6 +85,7 @@ import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.CoreMatchers.containsString; @@ -677,6 +679,37 @@ protected ReplicaResult shardOperationOnReplica(Request request) { assertIndexShardCounter(0); } + /** + * This test ensures that replication operations adhere to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS} setting + * when the request is using the default value for waitForActiveShards. + */ + public void testDefaultWaitForActiveShardsUsesIndexSetting() throws Exception { + final String indexName = "test"; + final ShardId shardId = new ShardId(indexName, "_na_", 0); + + // test wait_for_active_shards index setting used when the default is set on the request + int numReplicas = randomIntBetween(0, 5); + int idxSettingWaitForActiveShards = randomIntBetween(0, numReplicas + 1); + ClusterState state = stateWithActivePrimary(indexName, randomBoolean(), numReplicas); + IndexMetaData indexMetaData = state.metaData().index(indexName); + Settings indexSettings = Settings.builder().put(indexMetaData.getSettings()) + .put(SETTING_WAIT_FOR_ACTIVE_SHARDS.getKey(), Integer.toString(idxSettingWaitForActiveShards)) + .build(); + MetaData.Builder metaDataBuilder = MetaData.builder(state.metaData()) + .put(IndexMetaData.builder(indexMetaData).settings(indexSettings).build(), true); + state = ClusterState.builder(state).metaData(metaDataBuilder).build(); + setState(clusterService, state); + Request request = new Request(shardId).waitForActiveShards(ActiveShardCount.DEFAULT); // set to default so index settings are used + action.resolveRequest(state.metaData(), state.metaData().index(indexName), request); + assertEquals(ActiveShardCount.from(idxSettingWaitForActiveShards), request.waitForActiveShards()); + + // test wait_for_active_shards when default not set on the request (request value should be honored over index setting) + int requestWaitForActiveShards = randomIntBetween(0, numReplicas + 1); + request = new Request(shardId).waitForActiveShards(ActiveShardCount.from(requestWaitForActiveShards)); + action.resolveRequest(state.metaData(), state.metaData().index(indexName), request); + assertEquals(ActiveShardCount.from(requestWaitForActiveShards), request.waitForActiveShards()); + } + private void assertIndexShardCounter(int expected) { assertThat(count.get(), equalTo(expected)); } @@ -719,6 +752,7 @@ public Request() { this(); this.shardId = shardId; this.index = shardId.getIndexName(); + this.waitForActiveShards = ActiveShardCount.NONE; // keep things simple } @@ -765,11 +799,6 @@ protected ReplicaResult shardOperationOnReplica(Request request) { return new ReplicaResult(); } - @Override - protected boolean checkWriteConsistency() { - return false; - } - @Override protected boolean resolveIndex() { return false; @@ -815,7 +844,7 @@ protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, Ac class NoopReplicationOperation extends ReplicationOperation { public NoopReplicationOperation(Request request, ActionListener listener) { - 
super(request, null, listener, true, true, null, null, TransportReplicationActionTests.this.logger, "noop"); + super(request, null, listener, true, null, null, TransportReplicationActionTests.this.logger, "noop"); } @Override diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index d561536d2b9db..2bd2fe6e50d36 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -398,7 +398,7 @@ class IndexingOp extends ReplicationOperation listener, ReplicationGroup replicationGroup) { - super(request, new PrimaryRef(replicationGroup), listener, true, false, new ReplicasRef(replicationGroup), + super(request, new PrimaryRef(replicationGroup), listener, true, new ReplicasRef(replicationGroup), () -> null, logger, "indexing"); this.replicationGroup = replicationGroup; request.process(null, true, request.index()); @@ -409,6 +409,10 @@ protected List getShards(ShardId shardId, ClusterState state) { return replicationGroup.shardRoutings(); } + @Override + protected String checkActiveShardCount() { + return null; + } } private static class PrimaryRef implements ReplicationOperation.Primary { diff --git a/core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java b/core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java index ad54055666446..a174f807756a2 100644 --- a/core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java +++ b/core/src/test/java/org/elasticsearch/indexing/IndexActionIT.java @@ -226,4 +226,5 @@ public void testInvalidIndexName() { e.getMessage().contains("Invalid index name [..], must not be \'.\' or '..'"), equalTo(true)); } } + } diff --git a/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java b/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java index 53ac2bc045ae6..96f6f055a3289 100644 --- a/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java +++ b/core/src/test/java/org/elasticsearch/search/basic/SearchWhileCreatingIndexIT.java @@ -54,12 +54,8 @@ public void testTwoReplicas() throws Exception { private void searchWhileCreatingIndex(boolean createIndex, int numberOfReplicas) throws Exception { - // make sure we have enough nodes to guaranty default QUORUM consistency. - // TODO: add a smarter choice based on actual consistency (when that is randomized) - int shardsNo = numberOfReplicas + 1; - int neededNodes = shardsNo <= 2 ? 
1 : shardsNo / 2 + 1; - internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(neededNodes, shardsNo)); - + // TODO: randomize the wait for active shards value on index creation and ensure the appropriate + // number of data nodes are started for the randomized active shard count value String id = randomAsciiOfLength(5); // we will go the primary or the replica, but in a // randomized re-creatable manner diff --git a/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java b/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java index bacc6d791d46d..76cfa610a6d1d 100644 --- a/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java +++ b/core/src/test/java/org/elasticsearch/search/basic/TransportSearchFailuresIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.search.basic; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -118,7 +117,7 @@ public void testFailedSearchWithWrongQuery() throws Exception { } private void index(Client client, String id, String nameValue, int age) throws IOException { - client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age)).consistencyLevel(WriteConsistencyLevel.ONE)).actionGet(); + client.index(Requests.indexRequest("test").type("type1").id(id).source(source(id, nameValue, age))).actionGet(); } private XContentBuilder source(String id, String nameValue, int age) throws IOException { diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 39e26f382ca3a..b2fd95e4bb8dc 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; @@ -47,7 +46,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.elect.ElectMasterService; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.ttl.IndicesTTLService; @@ -690,11 +688,10 @@ public void sendResponse(RestResponse response) { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/12621") public void testChaosSnapshot() throws Exception { final List indices = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("action.write_consistency", "one").build(); int initialNodes = between(1, 3); logger.info("--> start {} nodes", initialNodes); for (int i = 0; i < initialNodes; i++) { - internalCluster().startNode(settings); + 
internalCluster().startNode(); } logger.info("--> creating repository"); @@ -713,7 +710,7 @@ public void testChaosSnapshot() throws Exception { int asyncNodes = between(0, 5); logger.info("--> start {} additional nodes asynchronously", asyncNodes); - InternalTestCluster.Async> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes, settings); + InternalTestCluster.Async> asyncNodesFuture = internalCluster().startNodesAsync(asyncNodes); int asyncIndices = between(0, 10); logger.info("--> create {} additional indices asynchronously", asyncIndices); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index ce1e504fab386..0178d2e1fb68b 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -215,7 +215,7 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon return; } request.timeout(mainRequest.getTimeout()); - request.consistencyLevel(mainRequest.getConsistency()); + request.waitForActiveShards(mainRequest.getWaitForActiveShards()); if (logger.isDebugEnabled()) { logger.debug("sending [{}] entry, [{}] bulk request", request.requests().size(), new ByteSizeValue(request.estimatedSizeInBytes())); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java index f7bcc9d1d6ce1..266116570e654 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBaseReindexRestHandler.java @@ -21,7 +21,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.GenericAction; -import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -108,9 +108,9 @@ protected Request setCommonOptions(RestRequest restRequest, Request request) { request.setRefresh(restRequest.paramAsBoolean("refresh", request.isRefresh())); request.setTimeout(restRequest.paramAsTime("timeout", request.getTimeout())); - String consistency = restRequest.param("consistency"); - if (consistency != null) { - request.setConsistency(WriteConsistencyLevel.fromString(consistency)); + String waitForActiveShards = restRequest.param("wait_for_active_shards"); + if (waitForActiveShards != null) { + request.setWaitForActiveShards(ActiveShardCount.parseString(waitForActiveShards)); } Float requestsPerSecond = parseRequestsPerSecond(restRequest); diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java index 50c9b25bc5bda..e34029bffab40 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractBulkByScrollRequest.java @@ -21,8 +21,8 @@ import org.elasticsearch.action.ActionRequest; import 
org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -71,9 +71,9 @@ public abstract class AbstractBulkByScrollRequest request) { request.setAbortOnVersionConflict(random().nextBoolean()); request.setRefresh(rarely()); request.setTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "test")); - request.setConsistency(randomFrom(WriteConsistencyLevel.values())); + request.setWaitForActiveShards(randomIntBetween(0, 10)); request.setScript(random().nextBoolean() ? null : randomScript()); request.setRequestsPerSecond(between(0, Integer.MAX_VALUE)); } @@ -116,7 +115,7 @@ private void assertRequestEquals(AbstractBulkIndexByScrollRequest request, assertEquals(request.isAbortOnVersionConflict(), tripped.isAbortOnVersionConflict()); assertEquals(request.isRefresh(), tripped.isRefresh()); assertEquals(request.getTimeout(), tripped.getTimeout()); - assertEquals(request.getConsistency(), tripped.getConsistency()); + assertEquals(request.getWaitForActiveShards(), tripped.getWaitForActiveShards()); assertEquals(request.getScript(), tripped.getScript()); assertEquals(request.getRetryBackoffInitialTime(), tripped.getRetryBackoffInitialTime()); assertEquals(request.getMaxRetries(), tripped.getMaxRetries()); @@ -234,7 +233,7 @@ private void assertResponseEquals(BulkIndexByScrollResponse expected, BulkIndexB assertEquals(expectedFailure.getReason().getClass(), actualFailure.getReason().getClass()); assertEquals(expectedFailure.getReason().getMessage(), actualFailure.getReason().getMessage()); } - + } private void assertTaskStatusEquals(BulkByScrollTask.Status expected, BulkByScrollTask.Status actual) { diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_consistency.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_wait_for_active_shards.yaml similarity index 79% rename from modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_consistency.yaml rename to modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_wait_for_active_shards.yaml index bc594dce296dd..522cd351cd087 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_consistency.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/delete_by_query/50_wait_for_active_shards.yaml @@ -1,5 +1,5 @@ --- -"can override consistency": +"can override wait_for_active_shards": - do: indices.create: index: test @@ -12,7 +12,6 @@ type: test id: 1 body: {"text": "test"} - consistency: one - do: indices.refresh: {} @@ -21,12 +20,13 @@ delete_by_query: index: test timeout: 1s + wait_for_active_shards: 4 body: query: match_all: {} - match: - failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/ + failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/ - do: indices.refresh: {} @@ -40,7 +40,7 @@ - do: delete_by_query: index: test - consistency: one + wait_for_active_shards: 1 body: query: match_all: 
{} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_consistency.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_wait_for_active_shards.yaml similarity index 77% rename from modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_consistency.yaml rename to modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_wait_for_active_shards.yaml index 54e6e6df0df86..d3446243746b7 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_consistency.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/60_wait_for_active_shards.yaml @@ -1,5 +1,5 @@ --- -"can override consistency": +"can override wait_for_active_shards": - do: indices.create: index: dest @@ -12,7 +12,6 @@ type: test id: 1 body: {"text": "test"} - consistency: one - do: indices.refresh: {} @@ -20,17 +19,18 @@ catch: unavailable reindex: timeout: 1s + wait_for_active_shards: 4 body: source: index: src dest: index: dest - match: - failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)\..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[dest\].containing.\[1\].requests\]/ + failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)\..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[dest\].containing.\[1\].requests\]/ - do: reindex: - consistency: one + wait_for_active_shards: 1 body: source: index: src diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/50_consistency.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/50_consistency.yaml index 591815fb5c43f..bb373866416f6 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/50_consistency.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/update_by_query/50_consistency.yaml @@ -1,5 +1,5 @@ --- -"can override consistency": +"can override wait_for_active_shards": - do: indices.create: index: test @@ -12,7 +12,6 @@ type: test id: 1 body: {"text": "test"} - consistency: one - do: indices.refresh: {} @@ -20,14 +19,15 @@ catch: unavailable update_by_query: index: test + wait_for_active_shards: 4 timeout: 1s - match: - failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/ + failures.0.cause.reason: /Not.enough.active.copies.to.meet.shard.count.of.\[4\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/ - do: update_by_query: index: test - consistency: one + wait_for_active_shards: 1 - match: {failures: []} - match: {updated: 1} - match: {version_conflicts: 0} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json index a75daf35204e3..27d951fa41b34 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json @@ -16,10 +16,9 @@ } }, "params": { - "consistency": { - "type" : "enum", - "options" : ["one", "quorum", "all"], - "description" : "Explicit write consistency setting for the operation" + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before proceeding with the bulk operation. Defaults to 1, meaning the primary shard only. 
Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "refresh": { "type" : "enum", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json b/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json index 5bb0e3fed4c50..389d00c670622 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/delete.json @@ -23,10 +23,9 @@ } }, "params": { - "consistency": { - "type" : "enum", - "options" : ["one", "quorum", "all"], - "description" : "Specific write consistency setting for the operation" + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before proceeding with the delete operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "parent": { "type" : "string", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/delete_by_query.json b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_by_query.json index e6ebc7628b7b9..f8743e1d1ba11 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/delete_by_query.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_by_query.json @@ -177,10 +177,9 @@ "default": "1m", "description" : "Time each individual bulk request should wait for shards that are unavailable." }, - "consistency": { - "type" : "enum", - "options" : ["one", "quorum", "all"], - "description" : "Explicit write consistency setting for the operation" + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before proceeding with the delete by query operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "scroll_size": { "type": "integer", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json index b7f7eeb9ef531..b46e3fcf116c8 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/index.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/index.json @@ -22,10 +22,9 @@ } }, "params": { - "consistency": { - "type" : "enum", - "options" : ["one", "quorum", "all"], - "description" : "Explicit write consistency setting for the operation" + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before proceeding with the index operation. Defaults to 1, meaning the primary shard only. 
Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "op_type": { "type" : "enum", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json b/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json index 1ff8f7e03c4ef..5fb4fe58db37f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/reindex.json @@ -16,10 +16,9 @@ "default": "1m", "description" : "Time each individual bulk request should wait for shards that are unavailable." }, - "consistency": { - "type" : "enum", - "options" : ["one", "quorum", "all"], - "description" : "Explicit write consistency setting for the operation" + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before proceeding with the reindex operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "wait_for_completion": { "type" : "boolean", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json index d91b900314881..a18d081f9b7f9 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update.json @@ -23,10 +23,9 @@ } }, "params": { - "consistency": { - "type": "enum", - "options": ["one", "quorum", "all"], - "description": "Explicit write consistency setting for the operation" + "wait_for_active_shards": { + "type": "string", + "description": "Sets the number of shard copies that must be active before proceeding with the update operation. Defaults to 1, meaning the primary shard only. Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "fields": { "type": "list", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/update_by_query.json b/rest-api-spec/src/main/resources/rest-api-spec/api/update_by_query.json index b945c3bc659a5..72149adc66338 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/update_by_query.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/update_by_query.json @@ -185,10 +185,9 @@ "default": "1m", "description" : "Time each individual bulk request should wait for shards that are unavailable." }, - "consistency": { - "type" : "enum", - "options" : ["one", "quorum", "all"], - "description" : "Explicit write consistency setting for the operation" + "wait_for_active_shards": { + "type" : "string", + "description" : "Sets the number of shard copies that must be active before proceeding with the update by query operation. Defaults to 1, meaning the primary shard only. 
Set to `all` for all shard copies, otherwise set to any non-negative value less than or equal to the total number of copies for the shard (number of replicas + 1)" }, "scroll_size": { "type": "integer", diff --git a/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestApiParserTests.java b/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestApiParserTests.java index 342f2bb4ed2c1..8396e343a566a 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestApiParserTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/rest/yaml/restspec/ClientYamlSuiteRestApiParserTests.java @@ -45,7 +45,7 @@ public void testParseRestSpecIndexApi() throws Exception { assertThat(restApi.getPathParts().get(1), equalTo("index")); assertThat(restApi.getPathParts().get(2), equalTo("type")); assertThat(restApi.getParams().size(), equalTo(4)); - assertThat(restApi.getParams(), contains("consistency", "op_type", "parent", "refresh")); + assertThat(restApi.getParams(), contains("wait_for_active_shards", "op_type", "parent", "refresh")); assertThat(restApi.isBodySupported(), equalTo(true)); assertThat(restApi.isBodyRequired(), equalTo(true)); } @@ -163,10 +163,9 @@ public void testParseRestSpecCountApi() throws Exception { " }\n" + " } ,\n" + " \"params\": {\n" + - " \"consistency\": {\n" + - " \"type\" : \"enum\",\n" + - " \"options\" : [\"one\", \"quorum\", \"all\"],\n" + - " \"description\" : \"Explicit write consistency setting for the operation\"\n" + + " \"wait_for_active_shards\": {\n" + + " \"type\" : \"string\",\n" + + " \"description\" : \"The number of active shard copies required to perform the operation\"\n" + " },\n" + " \"op_type\": {\n" + " \"type\" : \"enum\",\n" +