Skip to content

Commit 713e931

Browse files
Record Force Merges in Live Commit Data (#52694)
* Record Force Merges in live commit data Prerequisite of #52182. Record force merges in the live commit data so two shard states with the same sequence number that differ only in whether or not they have been force merged can be distinguished when creating snapshots.
1 parent 1073d09 commit 713e931

File tree

11 files changed

+192
-36
lines changed

11 files changed

+192
-36
lines changed

qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.index.IndexWriter;
2323
import org.apache.lucene.index.MergePolicy;
2424
import org.apache.lucene.index.SegmentCommitInfo;
25+
import org.elasticsearch.common.UUIDs;
2526
import org.elasticsearch.index.mapper.ParsedDocument;
2627

2728
import java.io.IOException;
@@ -89,7 +90,7 @@ public synchronized MergePolicy.OneMerge getNextMerge() {
8990
StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
9091
segmentsReference.set(segments);
9192
// trigger a background merge that will be managed by the concurrent merge scheduler
92-
e.forceMerge(randomBoolean(), 0, false, false, false);
93+
e.forceMerge(randomBoolean(), 0, false, false, false, UUIDs.randomBase64UUID());
9394
/*
9495
* Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have
9596
* to wait for these events to finish.

server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
package org.elasticsearch.action.admin.indices.forcemerge;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.action.ActionRequestValidationException;
2324
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
25+
import org.elasticsearch.common.Nullable;
26+
import org.elasticsearch.common.UUIDs;
2427
import org.elasticsearch.common.io.stream.StreamInput;
2528
import org.elasticsearch.common.io.stream.StreamOutput;
2629

@@ -53,20 +56,35 @@ public static final class Defaults {
5356
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
5457
private boolean flush = Defaults.FLUSH;
5558

59+
private static final Version FORCE_MERGE_UUID_VERSION = Version.V_8_0_0;
60+
61+
/**
62+
* Force merge UUID to store in the live commit data of a shard under
63+
* {@link org.elasticsearch.index.engine.Engine#FORCE_MERGE_UUID_KEY} after force merging it.
64+
*/
65+
@Nullable
66+
private final String forceMergeUUID;
67+
5668
/**
5769
* Constructs a merge request over one or more indices.
5870
*
5971
* @param indices The indices to merge, no indices passed means all indices will be merged.
6072
*/
6173
public ForceMergeRequest(String... indices) {
6274
super(indices);
75+
forceMergeUUID = UUIDs.randomBase64UUID();
6376
}
6477

6578
public ForceMergeRequest(StreamInput in) throws IOException {
6679
super(in);
6780
maxNumSegments = in.readInt();
6881
onlyExpungeDeletes = in.readBoolean();
6982
flush = in.readBoolean();
83+
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
84+
forceMergeUUID = in.readOptionalString();
85+
} else {
86+
forceMergeUUID = null;
87+
}
7088
}
7189

7290
/**
@@ -103,6 +121,15 @@ public ForceMergeRequest onlyExpungeDeletes(boolean onlyExpungeDeletes) {
103121
return this;
104122
}
105123

124+
/**
125+
* Force merge UUID to use when force merging or {@code null} if not using one in a mixed version cluster containing nodes older than
126+
* {@link #FORCE_MERGE_UUID_VERSION}.
127+
*/
128+
@Nullable
129+
public String forceMergeUUID() {
130+
return forceMergeUUID;
131+
}
132+
106133
/**
107134
* Should flush be performed after the merge. Defaults to {@code true}.
108135
*/
@@ -132,6 +159,9 @@ public void writeTo(StreamOutput out) throws IOException {
132159
out.writeInt(maxNumSegments);
133160
out.writeBoolean(onlyExpungeDeletes);
134161
out.writeBoolean(flush);
162+
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
163+
out.writeOptionalString(forceMergeUUID);
164+
}
135165
}
136166

137167
@Override

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ public abstract class Engine implements Closeable {
104104

105105
public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: Remove sync_id in 9.0
106106
public static final String HISTORY_UUID_KEY = "history_uuid";
107+
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
107108
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
108109
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
109110

@@ -1035,18 +1036,12 @@ public final void flush() throws EngineException {
10351036
*/
10361037
public abstract void rollTranslogGeneration() throws EngineException;
10371038

1038-
/**
1039-
* Force merges to 1 segment
1040-
*/
1041-
public void forceMerge(boolean flush) throws IOException {
1042-
forceMerge(flush, 1, false, false, false);
1043-
}
1044-
10451039
/**
10461040
* Triggers a forced merge on this engine
10471041
*/
10481042
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
1049-
boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
1043+
boolean upgrade, boolean upgradeOnlyAncientSegments,
1044+
@Nullable String forceMergeUUID) throws EngineException, IOException;
10501045

10511046
/**
10521047
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,12 @@ public class InternalEngine extends Engine {
184184
@Nullable
185185
private final String historyUUID;
186186

187+
/**
188+
* UUID value that is updated every time the engine is force merged.
189+
*/
190+
@Nullable
191+
private volatile String forceMergeUUID;
192+
187193
public InternalEngine(EngineConfig engineConfig) {
188194
this(engineConfig, LocalCheckpointTracker::new);
189195
}
@@ -222,7 +228,9 @@ public InternalEngine(EngineConfig engineConfig) {
222228
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
223229
writer = createWriter();
224230
bootstrapAppendOnlyInfoFromWriter(writer);
225-
historyUUID = loadHistoryUUID(writer);
231+
final Map<String, String> commitData = commitDataAsMap(writer);
232+
historyUUID = loadHistoryUUID(commitData);
233+
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
226234
indexWriter = writer;
227235
} catch (IOException | TranslogCorruptedException e) {
228236
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -558,6 +566,12 @@ public String getHistoryUUID() {
558566
return historyUUID;
559567
}
560568

569+
/** returns the force merge uuid for the engine */
570+
@Nullable
571+
public String getForceMergeUUID() {
572+
return forceMergeUUID;
573+
}
574+
561575
/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
562576
@Override
563577
public long getWritingBytes() {
@@ -567,8 +581,8 @@ public long getWritingBytes() {
567581
/**
568582
* Reads the current stored history ID from the IW commit data.
569583
*/
570-
private String loadHistoryUUID(final IndexWriter writer) {
571-
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
584+
private String loadHistoryUUID(Map<String, String> commitData) {
585+
final String uuid = commitData.get(HISTORY_UUID_KEY);
572586
if (uuid == null) {
573587
throw new IllegalStateException("commit doesn't contain history uuid");
574588
}
@@ -1815,7 +1829,8 @@ final Map<BytesRef, VersionValue> getVersionMap() {
18151829

18161830
@Override
18171831
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
1818-
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
1832+
final boolean upgrade, final boolean upgradeOnlyAncientSegments,
1833+
final String forceMergeUUID) throws EngineException, IOException {
18191834
if (onlyExpungeDeletes && maxNumSegments >= 0) {
18201835
throw new IllegalArgumentException("only_expunge_deletes and max_num_segments are mutually exclusive");
18211836
}
@@ -1850,6 +1865,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
18501865
indexWriter.maybeMerge();
18511866
} else {
18521867
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
1868+
this.forceMergeUUID = forceMergeUUID;
18531869
}
18541870
if (flush) {
18551871
flush(false, true);
@@ -2297,12 +2313,16 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
22972313
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
22982314
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
22992315
*/
2300-
final Map<String, String> commitData = new HashMap<>(6);
2316+
final Map<String, String> commitData = new HashMap<>(7);
23012317
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
23022318
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
23032319
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
23042320
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
23052321
commitData.put(HISTORY_UUID_KEY, historyUUID);
2322+
final String currentForceMergeUUID = forceMergeUUID;
2323+
if (currentForceMergeUUID != null) {
2324+
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
2325+
}
23062326
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
23072327
logger.trace("committing writer with commit data [{}]", commitData);
23082328
return commitData.entrySet().iterator();

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
388388

389389
@Override
390390
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
391-
boolean upgrade, boolean upgradeOnlyAncientSegments) {
391+
boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) {
392392
}
393393

394394
@Override

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,7 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
10731073
}
10741074
Engine engine = getEngine();
10751075
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
1076-
forceMerge.onlyExpungeDeletes(), false, false);
1076+
forceMerge.onlyExpungeDeletes(), false, false, forceMerge.forceMergeUUID());
10771077
}
10781078

10791079
/**
@@ -1089,7 +1089,7 @@ public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOE
10891089
final Engine engine = getEngine();
10901090
engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
10911091
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
1092-
false, true, upgrade.upgradeOnlyAncientSegments());
1092+
false, true, upgrade.upgradeOnlyAncientSegments(), null);
10931093
org.apache.lucene.util.Version version = minimumCompatibleVersion();
10941094
if (logger.isTraceEnabled()) {
10951095
logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version);

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.cluster.metadata.MappingMetaData;
3737
import org.elasticsearch.cluster.routing.RecoverySource;
3838
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
39+
import org.elasticsearch.common.UUIDs;
3940
import org.elasticsearch.common.lucene.Lucene;
4041
import org.elasticsearch.common.unit.ByteSizeValue;
4142
import org.elasticsearch.common.unit.TimeValue;
@@ -135,7 +136,7 @@ void recoverFromLocalShards(Consumer<MappingMetaData> mappingUpdateConsumer, fin
135136
// just trigger a merge to do housekeeping on the
136137
// copied segments - we will also see them in stats etc.
137138
indexShard.getEngine().forceMerge(false, -1, false,
138-
false, false);
139+
false, false, UUIDs.randomBase64UUID());
139140
return true;
140141
} catch (IOException ex) {
141142
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.admin.indices.forcemerge;
21+
22+
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
23+
import org.elasticsearch.cluster.ClusterState;
24+
import org.elasticsearch.cluster.metadata.IndexMetaData;
25+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
26+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
27+
import org.elasticsearch.common.settings.Settings;
28+
import org.elasticsearch.index.Index;
29+
import org.elasticsearch.index.engine.Engine;
30+
import org.elasticsearch.index.shard.IndexShard;
31+
import org.elasticsearch.indices.IndicesService;
32+
import org.elasticsearch.test.ESIntegTestCase;
33+
34+
import java.io.IOException;
35+
36+
import static org.hamcrest.Matchers.is;
37+
import static org.hamcrest.Matchers.notNullValue;
38+
import static org.hamcrest.Matchers.nullValue;
39+
40+
public class ForceMergeIT extends ESIntegTestCase {
41+
42+
public void testForceMergeUUIDConsistent() throws IOException {
43+
internalCluster().ensureAtLeastNumDataNodes(2);
44+
final String index = "test-index";
45+
createIndex(index,
46+
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
47+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
48+
ensureGreen(index);
49+
final ClusterState state = clusterService().state();
50+
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
51+
final IndexShardRoutingTable shardRouting = indexShardRoutingTables.getShards().get(0);
52+
final String primaryNodeId = shardRouting.primaryShard().currentNodeId();
53+
final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId();
54+
final Index idx = shardRouting.primaryShard().index();
55+
final IndicesService primaryIndicesService =
56+
internalCluster().getInstance(IndicesService.class, state.nodes().get(primaryNodeId).getName());
57+
final IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class,
58+
state.nodes().get(replicaNodeId).getName());
59+
final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0);
60+
final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0);
61+
62+
assertThat(getForceMergeUUID(primary), nullValue());
63+
assertThat(getForceMergeUUID(replica), nullValue());
64+
65+
final ForceMergeResponse forceMergeResponse =
66+
client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
67+
68+
assertThat(forceMergeResponse.getFailedShards(), is(0));
69+
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));
70+
71+
// Force flush to force a new commit that contains the force flush UUID
72+
final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get();
73+
assertThat(flushResponse.getFailedShards(), is(0));
74+
assertThat(flushResponse.getSuccessfulShards(), is(2));
75+
76+
final String primaryForceMergeUUID = getForceMergeUUID(primary);
77+
assertThat(primaryForceMergeUUID, notNullValue());
78+
79+
final String replicaForceMergeUUID = getForceMergeUUID(replica);
80+
assertThat(replicaForceMergeUUID, notNullValue());
81+
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
82+
}
83+
84+
private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
85+
try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) {
86+
return indexCommitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
87+
}
88+
}
89+
}

server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorSyncIdIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.admin.indices.stats.ShardStats;
2424
import org.elasticsearch.cluster.metadata.IndexMetaData;
2525
import org.elasticsearch.cluster.routing.UnassignedInfo;
26+
import org.elasticsearch.common.UUIDs;
2627
import org.elasticsearch.common.settings.Settings;
2728
import org.elasticsearch.common.util.concurrent.ReleasableLock;
2829
import org.elasticsearch.index.Index;
@@ -98,7 +99,7 @@ void syncFlush(String syncId) throws IOException {
9899
// make sure that we have committed translog; otherwise, we can flush after relaying translog in store recovery
99100
flush(true, true);
100101
// make sure that background merges won't happen; otherwise, IndexWriter#hasUncommittedChanges can become true again
101-
forceMerge(false);
102+
forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
102103
assertNotNull(indexWriter);
103104
try (ReleasableLock ignored = writeLock.acquire()) {
104105
assertThat(getTranslogStats().getUncommittedOperations(), equalTo(0));

0 commit comments

Comments
 (0)