Skip to content

Start indexing throttling only after disk IO unthrottling does not keep up with the merge load #125654

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
Expand Down Expand Up @@ -466,7 +467,7 @@ public void testNonThrottleStats() throws Exception {
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis(), equalTo(0L));
}

public void testThrottleStats() throws Exception {
public void testThrottleStats() {
assertAcked(
prepareCreate("test").setSettings(
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
Expand All @@ -479,31 +480,38 @@ public void testThrottleStats() throws Exception {
)
);
ensureGreen();
long termUpto = 0;
IndicesStatsResponse stats;
// make sure we see throttling kicking in:
boolean done = false;
AtomicBoolean done = new AtomicBoolean();
AtomicLong termUpTo = new AtomicLong();
long start = System.currentTimeMillis();
while (done == false) {
for (int i = 0; i < 100; i++) {
// Provoke slowish merging by making many unique terms:
StringBuilder sb = new StringBuilder();
for (int j = 0; j < 100; j++) {
sb.append(' ');
sb.append(termUpto++);
}
prepareIndex("test").setId("" + termUpto).setSource("field" + (i % 10), sb.toString()).get();
if (i % 2 == 0) {
for (int threadIdx = 0; threadIdx < 5; threadIdx++) {
int finalThreadIdx = threadIdx;
new Thread(() -> {
IndicesStatsResponse stats;
while (done.get() == false) {
for (int i = 0; i < 100; i++) {
// Provoke slowish merging by making many unique terms:
StringBuilder sb = new StringBuilder();
for (int j = 0; j < 100; j++) {
sb.append(' ');
sb.append(termUpTo.incrementAndGet());
}
prepareIndex("test").setId("" + termUpTo.get()).setSource("field" + (i % 10), sb.toString()).get();
if (i % 2 == 0) {
refresh();
}
}
refresh();
if (finalThreadIdx == 0) {
stats = indicesAdmin().prepareStats().get();
done.set(stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0);
}
if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in
done.set(true);
fail("index throttling didn't kick in after 5 minutes of intense merging");
}
}
}
refresh();
stats = indicesAdmin().prepareStats().get();
// nodesStats = clusterAdmin().prepareNodesStats().setIndices(true).get();
done = stats.getPrimaries().getIndexing().getTotal().getThrottleTime().millis() > 0;
if (System.currentTimeMillis() - start > 300 * 1000) { // Wait 5 minutes for throttling to kick in
fail("index throttling didn't kick in after 5 minutes of intense merging");
}
}).start();
}

// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,10 @@ interface UpdateConsumer {
}
}

public boolean usingMaxTargetIORateBytesPerSec() {
return MAX_IO_RATE.getBytes() == targetIORateBytesPerSec.get();
}

// exposed for tests
Set<MergeTask> getRunningMergeTasks() {
return runningMergeTasks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
);
private final ShardId shardId;
private final MergeSchedulerConfig config;
private final Logger logger;
protected final Logger logger;
Copy link
Contributor Author

@albertzaharovits albertzaharovits Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated change, in order to let these log messages go to the ThreadPoolMergeScheduler's logger instead of the InternalEngine's.

private final MergeTracking mergeTracking;
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
Expand Down Expand Up @@ -191,7 +191,10 @@ private void checkMergeTaskThrottling() {
int configuredMaxMergeCount = config.getMaxMergeCount();
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
if (activeMerges > configuredMaxMergeCount && shouldThrottleIncomingMerges.get() == false) {
if (activeMerges > configuredMaxMergeCount
// only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
&& threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()
&& shouldThrottleIncomingMerges.get() == false) {
// maybe enable merge task throttling
synchronized (shouldThrottleIncomingMerges) {
if (shouldThrottleIncomingMerges.getAndSet(true) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,165 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
}
}

public void testIndexingThrottlingWhenSubmittingMerges() {
final int maxThreadCount = randomIntBetween(1, 5);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of testing with random values, I would recommend running a parametrized test with different parameters set so that we can have reproducible results.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not so much about parameterized testing as it is about just not hard-coding a specific value. It is quite common to do this in our code-base, so I'd prefer to keep this as is.

// settings validation requires maxMergeCount >= maxThreadCount
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
List<MergeTask> submittedMergeTasks = new ArrayList<>();
AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService(
submittedMergeTasks,
isUsingMaxTargetIORate
);
Settings mergeSchedulerSettings = Settings.builder()
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
.build();
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService
);
// make sure there are more merges submitted than the max merge count limit (which triggers IO throttling)
int excessMerges = randomIntBetween(1, 10);
int mergesToSubmit = maxMergeCount + excessMerges;
boolean expectIndexThrottling = false;
int submittedMerges = 0;
// merges are submitted, while some are also scheduled (but none is run)
while (submittedMerges < mergesToSubmit - 1) {
isUsingMaxTargetIORate.set(randomBoolean());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same as above. I think having success with random number in place could give a false sense of healthy state, that could later end up in failure. I would instead suggest having a parametrized test case with all the combination of parameters sets considered to be interesting for testing. I would suggest some reading regarding table driven unit tests [1]

[1] https://semaphore.io/blog/table-driven-unit-tests-go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok here, since the test ends up verifying that throttling does kick in when true and the loop here is expected to run a few times in most cases.

if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
// maybe schedule one submitted merge
MergeTask mergeTask = randomFrom(submittedMergeTasks);
submittedMergeTasks.remove(mergeTask);
mergeTask.schedule();
} else {
// submit one merge
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
submittedMerges++;
if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) {
expectIndexThrottling = true;
} else if (submittedMerges <= maxMergeCount) {
expectIndexThrottling = false;
}
}
// assert IO throttle state
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
}
// submit one last merge when IO throttling is at max value
isUsingMaxTargetIORate.set(true);
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
// assert index throttling because IO throttling is at max value
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true));
}

public void testIndexingThrottlingWhileMergesAreRunning() {
final int maxThreadCount = randomIntBetween(1, 5);
// settings validation requires maxMergeCount >= maxThreadCount
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
List<MergeTask> submittedMergeTasks = new ArrayList<>();
List<MergeTask> scheduledToRunMergeTasks = new ArrayList<>();
AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService(
submittedMergeTasks,
isUsingMaxTargetIORate
);
Settings mergeSchedulerSettings = Settings.builder()
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
.build();
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService
);
int mergesToRun = randomIntBetween(0, 5);
// make sure there are more merges submitted and not run
int excessMerges = randomIntBetween(1, 10);
int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges;
int mergesOutstanding = 0;
boolean expectIndexThrottling = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not clear to me that every test run invokes index throttling. I wonder if we are overcomplicating the test a little here and whether we should simply make it have X merges submitted and then verify that index-throttling kicks in (twice, one with the max-io-rate-simulation set to false first, then one where it is set to true)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, it's complicated and it not always tests that index throttling kicks in when IO throttling is at max level.
But, I still think the test is valuable because it constantly asserts the indexing throttling state while merges are submitted, scheduler, and running at the same time (and it's deterministic).

I've added another test, that asserts indexing throttling kicks in when more than max_merge_count merges are submitted (some are scheduled, but none is run). It's simpler, it always asserts that index throttling is toggled, but it's covering less.

Let me know if this now looks OK with you.

// merges are submitted, while some are also scheduled and run
while (mergesToSubmit > 0) {
isUsingMaxTargetIORate.set(randomBoolean());
if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
// maybe schedule one submitted merge
MergeTask mergeTask = randomFrom(submittedMergeTasks);
submittedMergeTasks.remove(mergeTask);
Schedule schedule = mergeTask.schedule();
if (schedule == Schedule.RUN) {
scheduledToRunMergeTasks.add(mergeTask);
}
} else {
if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) {
// maybe run one scheduled merge
MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks);
scheduledToRunMergeTasks.remove(mergeTask);
mergeTask.run();
mergesToRun--;
mergesOutstanding--;
} else {
// submit one merge
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
mergesToSubmit--;
mergesOutstanding++;
}
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
expectIndexThrottling = true;
} else if (mergesOutstanding <= maxMergeCount) {
expectIndexThrottling = false;
}
}
// assert IO throttle state
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
}
// execute all remaining merges (submitted or scheduled)
while (mergesToRun > 0 || submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) {
// simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling
isUsingMaxTargetIORate.set(randomBoolean());
if (submittedMergeTasks.isEmpty() == false && (scheduledToRunMergeTasks.isEmpty() || randomBoolean())) {
// maybe schedule one submitted merge
MergeTask mergeTask = randomFrom(submittedMergeTasks);
submittedMergeTasks.remove(mergeTask);
Schedule schedule = mergeTask.schedule();
if (schedule == Schedule.RUN) {
scheduledToRunMergeTasks.add(mergeTask);
}
} else {
// maybe run one scheduled merge
MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks);
scheduledToRunMergeTasks.remove(mergeTask);
mergeTask.run();
mergesToRun--;
mergesOutstanding--;
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) {
expectIndexThrottling = true;
} else if (mergesOutstanding <= maxMergeCount) {
expectIndexThrottling = false;
}
}
// assert IO throttle state
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
}
// all merges done
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(false));
}

public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception {
// test with min 2 allowed concurrent merges
int mergeExecutorThreadCount = randomIntBetween(2, 5);
Expand Down Expand Up @@ -493,4 +652,49 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) {
return new MergeInfo(randomNonNegativeInt(), estimatedMergeBytes, randomBoolean(), maxNumSegments);
}

static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have to override a class to test it (or for it), in this case, probably what you need is a field you can change in the original class itself to trigger a different behaviour. The reason is a refactor in the code could give unexpected side effects not easy to spot by not testing agains the actual implementation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We typically try to avoid such artificial test pieces in production code. I think such an override as here is quite common so do not mind the current form. Feel free to maybe open a draft PR after this is merged to illustrate your point more (I am not sure I fully understood it).

AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false);

TestThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService);
}

@Override
protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
isIndexingThrottlingEnabled.set(true);
}

@Override
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {
isIndexingThrottlingEnabled.set(false);
}

boolean isIndexingThrottlingEnabled() {
return isIndexingThrottlingEnabled.get();
}
}

static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService(
List<MergeTask> submittedMergeTasks,
AtomicBoolean isUsingMaxTargetIORate
) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
doAnswer(invocation -> {
MergeTask mergeTask = (MergeTask) invocation.getArguments()[0];
submittedMergeTasks.add(mergeTask);
return null;
}).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class));
doAnswer(invocation -> {
MergeTask mergeTask = (MergeTask) invocation.getArguments()[0];
submittedMergeTasks.add(mergeTask);
return null;
}).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class));
doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec();
return threadPoolMergeExecutorService;
}
}