-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Record Force Merges in Live Commit Data #52694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
42ac71e
3fb159a
8ce66dc
6183d2f
68bf845
21e01a7
42ae9f4
e842569
7ebb39d
d707cba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -184,6 +184,12 @@ public class InternalEngine extends Engine { | |
@Nullable | ||
private final String historyUUID; | ||
|
||
/** | ||
* UUID value that is updated every time the engine is force merged. | ||
*/ | ||
@Nullable | ||
private volatile String forceMergeUUID; | ||
|
||
public InternalEngine(EngineConfig engineConfig) { | ||
this(engineConfig, LocalCheckpointTracker::new); | ||
} | ||
|
@@ -222,7 +228,9 @@ public InternalEngine(EngineConfig engineConfig) { | |
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); | ||
writer = createWriter(); | ||
bootstrapAppendOnlyInfoFromWriter(writer); | ||
historyUUID = loadHistoryUUID(writer); | ||
final Map<String, String> commitData = commitDataAsMap(writer); | ||
historyUUID = loadHistoryUUID(commitData); | ||
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); | ||
indexWriter = writer; | ||
} catch (IOException | TranslogCorruptedException e) { | ||
throw new EngineCreationFailureException(shardId, "failed to create engine", e); | ||
|
@@ -558,6 +566,12 @@ public String getHistoryUUID() { | |
return historyUUID; | ||
} | ||
|
||
/** returns the force merge uuid for the engine */ | ||
@Nullable | ||
public String getForceMergeUUID() { | ||
return forceMergeUUID; | ||
} | ||
|
||
/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */ | ||
@Override | ||
public long getWritingBytes() { | ||
|
@@ -567,8 +581,8 @@ public long getWritingBytes() { | |
/** | ||
* Reads the current stored history ID from the IW commit data. | ||
*/ | ||
private String loadHistoryUUID(final IndexWriter writer) { | ||
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); | ||
private String loadHistoryUUID(Map<String, String> commitData) { | ||
final String uuid = commitData.get(HISTORY_UUID_KEY); | ||
if (uuid == null) { | ||
throw new IllegalStateException("commit doesn't contain history uuid"); | ||
} | ||
|
@@ -1815,7 +1829,8 @@ final Map<BytesRef, VersionValue> getVersionMap() { | |
|
||
@Override | ||
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, | ||
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException { | ||
final boolean upgrade, final boolean upgradeOnlyAncientSegments, | ||
final String forceMergeUUID) throws EngineException, IOException { | ||
if (onlyExpungeDeletes && maxNumSegments >= 0) { | ||
throw new IllegalArgumentException("only_expunge_deletes and max_num_segments are mutually exclusive"); | ||
} | ||
|
@@ -1850,6 +1865,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu | |
indexWriter.maybeMerge(); | ||
} else { | ||
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/); | ||
this.forceMergeUUID = forceMergeUUID; | ||
} | ||
if (flush) { | ||
flush(false, true); | ||
|
@@ -2297,12 +2313,16 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl | |
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time | ||
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). | ||
*/ | ||
final Map<String, String> commitData = new HashMap<>(6); | ||
final Map<String, String> commitData = new HashMap<>(7); | ||
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); | ||
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); | ||
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); | ||
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); | ||
commitData.put(HISTORY_UUID_KEY, historyUUID); | ||
final String currentForceMergeUUID = forceMergeUUID; | ||
if (currentForceMergeUUID != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we also check the NA value and not write it out in that case? Otherwise it will be up to all readers to do the right thing? I find the NA / null handling a bit confusing here, and wonder if we should just have one of these. My preference would be to have it Nullable everywhere, and not introduce another "null" value that is NA. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID); | ||
} | ||
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); | ||
logger.trace("committing writer with commit data [{}]", commitData); | ||
return commitData.entrySet().iterator(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.action.admin.indices.forcemerge; | ||
|
||
import org.elasticsearch.action.admin.indices.flush.FlushResponse; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.cluster.routing.IndexRoutingTable; | ||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.engine.Engine; | ||
import org.elasticsearch.index.shard.IndexShard; | ||
import org.elasticsearch.indices.IndicesService; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
|
||
import java.io.IOException; | ||
|
||
import static org.hamcrest.Matchers.is; | ||
import static org.hamcrest.Matchers.notNullValue; | ||
import static org.hamcrest.Matchers.nullValue; | ||
|
||
public class ForceMergeIT extends ESIntegTestCase { | ||
|
||
public void testForceMergeUUIDConsistent() throws IOException { | ||
internalCluster().ensureAtLeastNumDataNodes(2); | ||
final String index = "test-index"; | ||
createIndex(index, | ||
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) | ||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build()); | ||
ensureGreen(index); | ||
final ClusterState state = clusterService().state(); | ||
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index); | ||
final IndexShardRoutingTable shardRouting = indexShardRoutingTables.getShards().get(0); | ||
final String primaryNodeId = shardRouting.primaryShard().currentNodeId(); | ||
final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId(); | ||
final Index idx = shardRouting.primaryShard().index(); | ||
final IndicesService primaryIndicesService = | ||
internalCluster().getInstance(IndicesService.class, state.nodes().get(primaryNodeId).getName()); | ||
final IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class, | ||
state.nodes().get(replicaNodeId).getName()); | ||
final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0); | ||
final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0); | ||
|
||
assertThat(getForceMergeUUID(primary), nullValue()); | ||
assertThat(getForceMergeUUID(replica), nullValue()); | ||
|
||
final ForceMergeResponse forceMergeResponse = | ||
client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get(); | ||
|
||
assertThat(forceMergeResponse.getFailedShards(), is(0)); | ||
assertThat(forceMergeResponse.getSuccessfulShards(), is(2)); | ||
|
||
// Force flush to force a new commit that contains the force flush UUID | ||
final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get(); | ||
assertThat(flushResponse.getFailedShards(), is(0)); | ||
assertThat(flushResponse.getSuccessfulShards(), is(2)); | ||
|
||
final String primaryForceMergeUUID = getForceMergeUUID(primary); | ||
assertThat(primaryForceMergeUUID, notNullValue()); | ||
|
||
final String replicaForceMergeUUID = getForceMergeUUID(replica); | ||
assertThat(replicaForceMergeUUID, notNullValue()); | ||
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID)); | ||
} | ||
|
||
private static String getForceMergeUUID(IndexShard indexShard) throws IOException { | ||
try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) { | ||
return indexCommitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nullable