Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Index creation waits for write consistency shards #18985

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions core/src/main/java/org/elasticsearch/action/ActionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.elasticsearch.action;

import java.util.function.Consumer;

/**
* A listener for action responses or failures.
*/
Expand All @@ -33,4 +35,31 @@ public interface ActionListener<Response> {
* A failure caused by an exception at some phase of the task.
*/
void onFailure(Exception e);

/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding consumer when the response (or failure) is received.
*
* @param onResponse the consumer of the response, when the listener receives one
* @param onFailure the consumer of the failure, when the listener receives one
* @param <Response> the type of the response
* @return a listener that listens for responses and invokes the consumer when received
*/
static <Response> ActionListener<Response> wrap(Consumer<Response> onResponse, Consumer<Exception> onFailure) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
onResponse.accept(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ack.ClusterStateUpdateRequest;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ

private final Set<ClusterBlock> blocks = new HashSet<>();

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;


public CreateIndexClusterStateUpdateRequest(TransportMessage originalMessage, String cause, String index, boolean updateAllTypes) {
this.originalMessage = originalMessage;
Expand Down Expand Up @@ -98,6 +101,11 @@ public CreateIndexClusterStateUpdateRequest shrinkFrom(Index shrinkFrom) {
return this;
}

public CreateIndexClusterStateUpdateRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}

public TransportMessage originalMessage() {
return originalMessage;
}
Expand Down Expand Up @@ -142,4 +150,8 @@ public Index shrinkFrom() {
public boolean updateAllTypes() {
return updateAllTypes;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -77,6 +78,8 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>

private boolean updateAllTypes = false;

private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;

public CreateIndexRequest() {
}

Expand Down Expand Up @@ -440,6 +443,30 @@ public CreateIndexRequest updateAllTypes(boolean updateAllTypes) {
return this;
}

public ActiveShardCount waitForActiveShards() {
return waitForActiveShards;
}

/**
* Sets the number of shard copies that should be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequest waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.waitForActiveShards = waitForActiveShards;
return this;
}


@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -462,6 +489,7 @@ public void readFrom(StreamInput in) throws IOException {
aliases.add(Alias.read(in));
}
updateAllTypes = in.readBoolean();
waitForActiveShards = ActiveShardCount.readFrom(in);
}

@Override
Expand All @@ -486,5 +514,6 @@ public void writeTo(StreamOutput out) throws IOException {
alias.writeTo(out);
}
out.writeBoolean(updateAllTypes);
waitForActiveShards.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -249,4 +250,23 @@ public CreateIndexRequestBuilder setUpdateAllTypes(boolean updateAllTypes) {
request.updateAllTypes(updateAllTypes);
return this;
}

/**
* Sets the number of shard copies that should be active for index creation to return.
* Defaults to {@link ActiveShardCount#DEFAULT}, which will wait for one shard copy
* (the primary) to become active. Set this value to {@link ActiveShardCount#ALL} to
* wait for all shards (primary and all replicas) to be active before returning.
* Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link CreateIndexResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public CreateIndexRequestBuilder setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
request.waitForActiveShards(waitForActiveShards);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;

Expand All @@ -30,22 +31,41 @@
*/
public class CreateIndexResponse extends AcknowledgedResponse {

private boolean shardsAcked;

protected CreateIndexResponse() {
}

protected CreateIndexResponse(boolean acknowledged) {
protected CreateIndexResponse(boolean acknowledged, boolean shardsAcked) {
super(acknowledged);
assert acknowledged || shardsAcked == false; // if its not acknowledged, then shards acked should be false too
this.shardsAcked = shardsAcked;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
readAcknowledged(in);
shardsAcked = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
writeAcknowledged(out);
out.writeBoolean(shardsAcked);
}

/**
* Returns true if the requisite number of shards were started before
* returning from the index creation operation. If {@link #isAcknowledged()}
* is false, then this also returns false.
*/
public boolean isShardsAcked() {
return shardsAcked;
}

public void addCustomFields(XContentBuilder builder) throws IOException {
builder.field("shards_acknowledged", isShardsAcked());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -77,24 +75,12 @@ protected void masterOperation(final CreateIndexRequest request, final ClusterSt
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, indexName, request.updateAllTypes())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
.aliases(request.aliases()).customs(request.customs())
.waitForActiveShards(request.waitForActiveShards());

createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcked())),
listener::onFailure));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we lost the logging here, did we get them somewhere else?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind, I saw them later on

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -206,4 +207,22 @@ public void source(BytesReference source) {
}
}

/**
* Sets the number of shard copies that should be active for creation of the
* new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public void setWaitForActiveShards(ActiveShardCount waitForActiveShards) {
this.createIndexRequest.waitForActiveShards(waitForActiveShards);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -70,4 +71,23 @@ public RolloverRequestBuilder mapping(String type, String source) {
this.request.getCreateIndexRequest().mapping(type, source);
return this;
}

/**
* Sets the number of shard copies that should be active for creation of the
* new rollover index to return. Defaults to {@link ActiveShardCount#DEFAULT}, which will
* wait for one shard copy (the primary) to become active. Set this value to
* {@link ActiveShardCount#ALL} to wait for all shards (primary and all replicas) to be active
* before returning. Otherwise, use {@link ActiveShardCount#from(int)} to set this value to any
* non-negative integer, up to the number of copies per shard (number of replicas + 1),
* to wait for the desired amount of shard copies to become active before returning.
* Index creation will only wait up until the timeout value for the number of shard copies
* to be active before returning. Check {@link RolloverResponse#isShardsAcked()} to
* determine if the requisite shard copies were all started before returning or timing out.
*
* @param waitForActiveShards number of active shard copies to wait on
*/
public RolloverRequestBuilder waitForActiveShards(ActiveShardCount waitForActiveShards) {
this.request.setWaitForActiveShards(waitForActiveShards);
return this;
}
}
Loading