Skip to content

Bootstrap a new history_uuid when force allocating a stale primary #33432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.LocalShardsRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -386,7 +387,7 @@ private Builder initializeAsRestore(IndexMetaData indexMetaData, SnapshotRecover
if (asNew && ignoreShards.contains(shardNumber)) {
// This shards wasn't completely snapshotted - restore it as new shard
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, primary,
primary ? StoreRecoverySource.EMPTY_STORE_INSTANCE : PeerRecoverySource.INSTANCE, unassignedInfo));
primary ? EmptyStoreRecoverySource.INSTANCE : PeerRecoverySource.INSTANCE, unassignedInfo));
} else {
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(shardId, primary,
primary ? recoverySource : PeerRecoverySource.INSTANCE, unassignedInfo));
Expand All @@ -410,13 +411,13 @@ private Builder initializeEmpty(IndexMetaData indexMetaData, UnassignedInfo unas
final RecoverySource primaryRecoverySource;
if (indexMetaData.inSyncAllocationIds(shardNumber).isEmpty() == false) {
// we have previous valid copies for this shard. use them for recovery
primaryRecoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
primaryRecoverySource = ExistingStoreRecoverySource.INSTANCE;
} else if (indexMetaData.getResizeSourceIndex() != null) {
// this is a new index but the initial shards should merged from another index
primaryRecoverySource = LocalShardsRecoverySource.INSTANCE;
} else {
// a freshly created index with no restriction
primaryRecoverySource = StoreRecoverySource.EMPTY_STORE_INSTANCE;
primaryRecoverySource = EmptyStoreRecoverySource.INSTANCE;
}
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
for (int i = 0; i <= indexMetaData.getNumberOfReplicas(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
/**
* Represents the recovery source of a shard. Available recovery types are:
*
* - {@link StoreRecoverySource} recovery from the local store (empty or with existing data)
* - {@link EmptyStoreRecoverySource} recovery from an empty store
* - {@link ExistingStoreRecoverySource} recovery from an existing store
* - {@link PeerRecoverySource} recovery from a primary on another node
* - {@link SnapshotRecoverySource} recovery from a snapshot
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
Expand All @@ -59,8 +60,8 @@ public void addAdditionalFields(XContentBuilder builder, ToXContent.Params param
public static RecoverySource readFrom(StreamInput in) throws IOException {
Type type = Type.values()[in.readByte()];
switch (type) {
case EMPTY_STORE: return StoreRecoverySource.EMPTY_STORE_INSTANCE;
case EXISTING_STORE: return StoreRecoverySource.EXISTING_STORE_INSTANCE;
case EMPTY_STORE: return EmptyStoreRecoverySource.INSTANCE;
case EXISTING_STORE: return new ExistingStoreRecoverySource(in);
case PEER: return PeerRecoverySource.INSTANCE;
case SNAPSHOT: return new SnapshotRecoverySource(in);
case LOCAL_SHARDS: return LocalShardsRecoverySource.INSTANCE;
Expand Down Expand Up @@ -91,6 +92,10 @@ public enum Type {

public abstract Type getType();

public boolean shouldBootstrapNewHistoryUUID() {
return false;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -107,25 +112,68 @@ public int hashCode() {
}

/**
* recovery from an existing on-disk store or a fresh copy
* Recovery from a fresh copy
*/
public abstract static class StoreRecoverySource extends RecoverySource {
public static final StoreRecoverySource EMPTY_STORE_INSTANCE = new StoreRecoverySource() {
@Override
public Type getType() {
return Type.EMPTY_STORE;
public static final class EmptyStoreRecoverySource extends RecoverySource {
public static final EmptyStoreRecoverySource INSTANCE = new EmptyStoreRecoverySource();

@Override
public Type getType() {
return Type.EMPTY_STORE;
}

@Override
public String toString() {
return "new shard recovery";
}
}

/**
* Recovery from an existing on-disk store
*/
public static final class ExistingStoreRecoverySource extends RecoverySource {
public static final ExistingStoreRecoverySource INSTANCE = new ExistingStoreRecoverySource(false);
public static final ExistingStoreRecoverySource FORCE_STALE_PRIMARY_INSTANCE = new ExistingStoreRecoverySource(true);

private final boolean bootstrapNewHistoryUUID;

private ExistingStoreRecoverySource(boolean bootstrapNewHistoryUUID) {
this.bootstrapNewHistoryUUID = bootstrapNewHistoryUUID;
}

private ExistingStoreRecoverySource(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
bootstrapNewHistoryUUID = in.readBoolean();
} else {
bootstrapNewHistoryUUID = false;
}
};
public static final StoreRecoverySource EXISTING_STORE_INSTANCE = new StoreRecoverySource() {
@Override
public Type getType() {
return Type.EXISTING_STORE;
}

@Override
public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
builder.field("bootstrap_new_history_uuid", bootstrapNewHistoryUUID);
}

@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
out.writeBoolean(bootstrapNewHistoryUUID);
}
};
}

@Override
public boolean shouldBootstrapNewHistoryUUID() {
return bootstrapNewHistoryUUID;
}

@Override
public Type getType() {
return Type.EXISTING_STORE;
}

@Override
public String toString() {
return getType() == Type.EMPTY_STORE ? "new shard recovery" : "existing recovery";
return "existing store recovery; bootstrap_history_uuid=" + bootstrapNewHistoryUUID;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -318,7 +317,7 @@ public ShardRouting moveToUnassigned(UnassignedInfo unassignedInfo) {
final RecoverySource recoverySource;
if (active()) {
if (primary()) {
recoverySource = StoreRecoverySource.EXISTING_STORE_INSTANCE;
recoverySource = ExistingStoreRecoverySource.INSTANCE;
} else {
recoverySource = PeerRecoverySource.INSTANCE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.EmptyStoreRecoverySource;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -136,7 +136,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate,
StoreRecoverySource.EMPTY_STORE_INSTANCE);
EmptyStoreRecoverySource.INSTANCE);

return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain)
"trying to allocate an existing primary shard [" + index + "][" + shardId + "], while no such shard has ever been active");
}

initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting);
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, null,
RecoverySource.ExistingStoreRecoverySource.FORCE_STALE_PRIMARY_INSTANCE);
return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,9 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId, indexShard.getPendingPrimaryTerm());
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
}
// since we recover from local, just fill the files and size
try {
final RecoveryState.Index index = recoveryState.getIndex();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, Orig
ArrayList<ShardRouting> unassigned = new ArrayList<>();

ShardRouting routing = ShardRouting.newUnassigned(new ShardId(new Index(index, "_na_"), i), true,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foobar"));
routing = routing.initialize(primaryNode.getId(), i + "p", 0);
routing.started();
started.add(routing);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.elasticsearch.cluster.routing;

import org.elasticsearch.cluster.routing.RecoverySource.StoreRecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.ExistingStoreRecoverySource;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -37,7 +37,7 @@
public class AllocationIdTests extends ESTestCase {
public void testShardToStarted() {
logger.info("-- create unassigned shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
assertThat(shard.allocationId(), nullValue());

logger.info("-- initialize the shard");
Expand All @@ -57,7 +57,7 @@ public void testShardToStarted() {

public void testSuccessfulRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
shard = shard.moveToStarted();

Expand All @@ -80,7 +80,7 @@ public void testSuccessfulRelocation() {

public void testCancelRelocation() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
shard = shard.moveToStarted();

Expand All @@ -100,7 +100,7 @@ public void testCancelRelocation() {

public void testMoveToUnassigned() {
logger.info("-- build started shard");
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, StoreRecoverySource.EXISTING_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting shard = ShardRouting.newUnassigned(new ShardId("test","_na_", 0), true, ExistingStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
shard = shard.initialize("node1", null, -1);
shard = shard.moveToStarted();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testIterate() {

public ShardRouting newRouting(Index index, int id, boolean started) {
ShardRouting shardRouting = ShardRouting.newUnassigned(new ShardId(index, id), true,
RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
RecoverySource.EmptyStoreRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
shardRouting = ShardRoutingHelper.initialize(shardRouting, "some node");
if (started) {
shardRouting = ShardRoutingHelper.moveToStarted(shardRouting);
Expand Down
Loading