Record Force Merges in Live Commit Data #52694

Merged
@@ -22,6 +22,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.mapper.ParsedDocument;

import java.io.IOException;
@@ -89,7 +90,7 @@ public synchronized MergePolicy.OneMerge getNextMerge() {
StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
segmentsReference.set(segments);
// trigger a background merge that will be managed by the concurrent merge scheduler
e.forceMerge(randomBoolean(), 0, false, false, false);
e.forceMerge(randomBoolean(), 0, false, false, false, UUIDs.randomBase64UUID());
/*
* Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have
* to wait for these events to finish.
server/src/main/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeRequest.java
@@ -19,8 +19,11 @@

package org.elasticsearch.action.admin.indices.forcemerge;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

@@ -53,20 +56,35 @@ public static final class Defaults {
private boolean onlyExpungeDeletes = Defaults.ONLY_EXPUNGE_DELETES;
private boolean flush = Defaults.FLUSH;

private static final Version FORCE_MERGE_UUID_VERSION = Version.V_8_0_0;

/**
* Force merge UUID to store in the live commit data of a shard under
* {@link org.elasticsearch.index.engine.Engine#FORCE_MERGE_UUID_KEY} after force merging it.
*/
@Nullable
private final String forceMergeUUID;

/**
* Constructs a merge request over one or more indices.
*
* @param indices The indices to merge; passing no indices means all indices will be merged.
*/
public ForceMergeRequest(String... indices) {
super(indices);
forceMergeUUID = UUIDs.randomBase64UUID();
}

public ForceMergeRequest(StreamInput in) throws IOException {
super(in);
maxNumSegments = in.readInt();
onlyExpungeDeletes = in.readBoolean();
flush = in.readBoolean();
if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
forceMergeUUID = in.readOptionalString();
} else {
forceMergeUUID = null;
}
}

/**
@@ -103,6 +121,15 @@ public ForceMergeRequest onlyExpungeDeletes(boolean onlyExpungeDeletes) {
return this;
}

/**
* Force merge UUID to use when force merging or {@code null} if not using one in a mixed version cluster containing nodes older than
* {@link #FORCE_MERGE_UUID_VERSION}.
*/
@Nullable
public String forceMergeUUID() {
return forceMergeUUID;
}

/**
* Should flush be performed after the merge. Defaults to {@code true}.
*/
@@ -132,6 +159,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeInt(maxNumSegments);
out.writeBoolean(onlyExpungeDeletes);
out.writeBoolean(flush);
if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) {
out.writeOptionalString(forceMergeUUID);
}
}

@Override
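The new field is version-gated on the wire, so it only round-trips between upgraded nodes. A minimal sketch of that behavior using the standard BytesStreamOutput helper (illustrative only, not part of this diff):

// Sketch: the UUID is generated eagerly in the constructor and survives
// serialization when both stream versions are >= FORCE_MERGE_UUID_VERSION.
ForceMergeRequest request = new ForceMergeRequest("my-index");
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);

ForceMergeRequest copy = new ForceMergeRequest(out.bytes().streamInput());
// Reading from a pre-FORCE_MERGE_UUID_VERSION stream would yield null instead.
assert request.forceMergeUUID().equals(copy.forceMergeUUID());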
11 changes: 3 additions & 8 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -104,6 +104,7 @@ public abstract class Engine implements Closeable {

public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: Remove sync_id in 9.0
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";

@@ -1035,18 +1036,12 @@ public final void flush() throws EngineException {
*/
public abstract void rollTranslogGeneration() throws EngineException;

/**
* Force merges to 1 segment
*/
public void forceMerge(boolean flush) throws IOException {
forceMerge(flush, 1, false, false, false);
}

/**
* Triggers a forced merge on this engine
*/
public abstract void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
boolean upgrade, boolean upgradeOnlyAncientSegments) throws EngineException, IOException;
boolean upgrade, boolean upgradeOnlyAncientSegments,
@Nullable String forceMergeUUID) throws EngineException, IOException;

/**
* Snapshots the most recent index and returns a handle to it. If needed will try and "commit" the
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -184,6 +184,12 @@ public class InternalEngine extends Engine {
@Nullable
private final String historyUUID;

/**
* UUID value that is updated every time the engine is force merged.
*/
@Nullable
private volatile String forceMergeUUID;

public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, LocalCheckpointTracker::new);
}
@@ -222,7 +228,9 @@ public InternalEngine(EngineConfig engineConfig) {
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
historyUUID = loadHistoryUUID(writer);
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
indexWriter = writer;
} catch (IOException | TranslogCorruptedException e) {
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
@@ -558,6 +566,12 @@ public String getHistoryUUID() {
return historyUUID;
}

/** Returns the force merge UUID for the engine. */
@Nullable
public String getForceMergeUUID() {
return forceMergeUUID;
}

/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
@Override
public long getWritingBytes() {
@@ -567,8 +581,8 @@ public long getWritingBytes() {
/**
* Reads the current stored history ID from the IW commit data.
*/
private String loadHistoryUUID(final IndexWriter writer) {
final String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY);
private String loadHistoryUUID(Map<String, String> commitData) {
final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
@@ -1815,7 +1829,8 @@ final Map<BytesRef, VersionValue> getVersionMap() {

@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
final boolean upgrade, final boolean upgradeOnlyAncientSegments,
final String forceMergeUUID) throws EngineException, IOException {
if (onlyExpungeDeletes && maxNumSegments >= 0) {
throw new IllegalArgumentException("only_expunge_deletes and max_num_segments are mutually exclusive");
}
@@ -1850,6 +1865,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
indexWriter.maybeMerge();
} else {
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
this.forceMergeUUID = forceMergeUUID;
}
if (flush) {
flush(false, true);
@@ -2297,12 +2313,16 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(6);
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
final String currentForceMergeUUID = forceMergeUUID;
if (currentForceMergeUUID != null) {
Contributor (ywelsch): should we also check the NA value and not write it out in that case? Otherwise it will be up to all readers to do the right thing? I find the NA / null handling a bit confusing here, and wonder if we should just have one of these. My preference would be to have it Nullable everywhere, and not introduce another "null" value that is NA.

Contributor Author: Thanks @ywelsch, that's a good point :) => went for nullable all the way in e842569 now

commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID);
}
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
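Since the UUID lands in ordinary commit user data, it can be read back with stock Lucene APIs once a commit has happened. A minimal sketch, assuming direct access to the shard's Directory (the readForceMergeUUID helper name is hypothetical):

// Sketch: read the latest commit point's user data straight from the directory.
// Returns null when no force merge has been recorded in that commit.
static String readForceMergeUUID(org.apache.lucene.store.Directory directory) throws java.io.IOException {
    org.apache.lucene.index.SegmentInfos infos = org.apache.lucene.index.SegmentInfos.readLatestCommit(directory);
    return infos.getUserData().get(Engine.FORCE_MERGE_UUID_KEY); // "force_merge_uuid"
}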
@@ -388,7 +388,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {

@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
boolean upgrade, boolean upgradeOnlyAncientSegments) {
boolean upgrade, boolean upgradeOnlyAncientSegments, String forceMergeUUID) {
}

@Override
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1073,7 +1073,7 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
}
Engine engine = getEngine();
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(),
forceMerge.onlyExpungeDeletes(), false, false);
forceMerge.onlyExpungeDeletes(), false, false, forceMerge.forceMergeUUID());
}

/**
@@ -1089,7 +1089,7 @@ public org.apache.lucene.util.Version upgrade(UpgradeRequest upgrade) throws IOE
final Engine engine = getEngine();
engine.forceMerge(true, // we need to flush at the end to make sure the upgrade is durable
Integer.MAX_VALUE, // we just want to upgrade the segments, not actually optimize to a single segment
false, true, upgrade.upgradeOnlyAncientSegments());
false, true, upgrade.upgradeOnlyAncientSegments(), null);
org.apache.lucene.util.Version version = minimumCompatibleVersion();
if (logger.isTraceEnabled()) {
logger.trace("upgraded segments for {} from version {} to version {}", shardId, previousVersion, version);
server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
@@ -36,6 +36,7 @@
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@@ -135,7 +136,7 @@ void recoverFromLocalShards(Consumer<MappingMetaData> mappingUpdateConsumer, fin
// just trigger a merge to do housekeeping on the
// copied segments - we will also see them in stats etc.
indexShard.getEngine().forceMerge(false, -1, false,
false, false);
false, false, UUIDs.randomBase64UUID());
return true;
} catch (IOException ex) {
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
server/src/test/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeIT.java (new file)
@@ -0,0 +1,89 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.forcemerge;

import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class ForceMergeIT extends ESIntegTestCase {

public void testForceMergeUUIDConsistent() throws IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
final String index = "test-index";
createIndex(index,
Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureGreen(index);
final ClusterState state = clusterService().state();
final IndexRoutingTable indexShardRoutingTables = state.routingTable().getIndicesRouting().get(index);
final IndexShardRoutingTable shardRouting = indexShardRoutingTables.getShards().get(0);
final String primaryNodeId = shardRouting.primaryShard().currentNodeId();
final String replicaNodeId = shardRouting.replicaShards().get(0).currentNodeId();
final Index idx = shardRouting.primaryShard().index();
final IndicesService primaryIndicesService =
internalCluster().getInstance(IndicesService.class, state.nodes().get(primaryNodeId).getName());
final IndicesService replicaIndicesService = internalCluster().getInstance(IndicesService.class,
state.nodes().get(replicaNodeId).getName());
final IndexShard primary = primaryIndicesService.indexService(idx).getShard(0);
final IndexShard replica = replicaIndicesService.indexService(idx).getShard(0);

assertThat(getForceMergeUUID(primary), nullValue());
assertThat(getForceMergeUUID(replica), nullValue());

final ForceMergeResponse forceMergeResponse =
client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();

assertThat(forceMergeResponse.getFailedShards(), is(0));
assertThat(forceMergeResponse.getSuccessfulShards(), is(2));

// Force flush to force a new commit that contains the force merge UUID
final FlushResponse flushResponse = client().admin().indices().prepareFlush(index).setForce(true).get();
assertThat(flushResponse.getFailedShards(), is(0));
assertThat(flushResponse.getSuccessfulShards(), is(2));

final String primaryForceMergeUUID = getForceMergeUUID(primary);
assertThat(primaryForceMergeUUID, notNullValue());

final String replicaForceMergeUUID = getForceMergeUUID(replica);
assertThat(replicaForceMergeUUID, notNullValue());
assertThat(primaryForceMergeUUID, is(replicaForceMergeUUID));
}

private static String getForceMergeUUID(IndexShard indexShard) throws IOException {
try (Engine.IndexCommitRef indexCommitRef = indexShard.acquireLastIndexCommit(true)) {
return indexCommitRef.getIndexCommit().getUserData().get(Engine.FORCE_MERGE_UUID_KEY);
}
}
}
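Note that the test inspects commit user data, which is why it forces a flush first; the in-memory value on an InternalEngine is visible immediately through the new getter. A hedged sketch, assuming engine access as in internal-cluster tests (the indexShard variable stands in for an IndexShard obtained as above):

// Sketch: reads the unflushed, in-memory value; no flush required.
Engine engine = indexShard.getEngine(); // package-private, test-only access
if (engine instanceof InternalEngine) {
    String uuid = ((InternalEngine) engine).getForceMergeUUID();
}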
@@ -23,6 +23,7 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.Index;
@@ -98,7 +99,7 @@ void syncFlush(String syncId) throws IOException {
// make sure that we have committed translog; otherwise, we can flush after relaying translog in store recovery
flush(true, true);
// make sure that background merges won't happen; otherwise, IndexWriter#hasUncommittedChanges can become true again
forceMerge(false);
forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID());
assertNotNull(indexWriter);
try (ReleasableLock ignored = writeLock.acquire()) {
assertThat(getTranslogStats().getUncommittedOperations(), equalTo(0));