-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Start indexing throttling only after disk IO unthrottling does not keep up with the merge load #125654
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Start indexing throttling only after disk IO unthrottling does not keep up with the merge load #125654
Changes from all commits
7e7e624
5c3ef42
54cc5ab
d1b8193
67fe301
774389d
0c8c149
7864c46
948da81
e381634
f41cb27
0a91ab6
5472e6a
a419917
ea84548
c4cf8b0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -173,6 +173,165 @@ public void testSimpleMergeTaskReEnqueueingBySize() { | |
} | ||
} | ||
|
||
public void testIndexingThrottlingWhenSubmittingMerges() { | ||
final int maxThreadCount = randomIntBetween(1, 5); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of testing with random values, I would recommend running a parametrized test with different parameters set so that we can have reproducible results. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not so much about parameterized testing as it is about just not hard-coding a specific value. It is quite common to do this in our code-base, so I'd prefer to keep this as is. |
||
// settings validation requires maxMergeCount >= maxThreadCount | ||
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); | ||
List<MergeTask> submittedMergeTasks = new ArrayList<>(); | ||
AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false); | ||
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService( | ||
submittedMergeTasks, | ||
isUsingMaxTargetIORate | ||
); | ||
Settings mergeSchedulerSettings = Settings.builder() | ||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) | ||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) | ||
.build(); | ||
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( | ||
new ShardId("index", "_na_", 1), | ||
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), | ||
threadPoolMergeExecutorService | ||
); | ||
// make sure there are more merges submitted than the max merge count limit (which triggers IO throttling) | ||
int excessMerges = randomIntBetween(1, 10); | ||
int mergesToSubmit = maxMergeCount + excessMerges; | ||
boolean expectIndexThrottling = false; | ||
int submittedMerges = 0; | ||
// merges are submitted, while some are also scheduled (but none is run) | ||
while (submittedMerges < mergesToSubmit - 1) { | ||
isUsingMaxTargetIORate.set(randomBoolean()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same as above. I think having success with random number in place could give a false sense of healthy state, that could later end up in failure. I would instead suggest having a parametrized test case with all the combination of parameters sets considered to be interesting for testing. I would suggest some reading regarding table driven unit tests [1] There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is ok here, since the test ends up verifying that throttling does kick in when true and the loop here is expected to run a few times in most cases. |
||
if (submittedMergeTasks.isEmpty() == false && randomBoolean()) { | ||
// maybe schedule one submitted merge | ||
MergeTask mergeTask = randomFrom(submittedMergeTasks); | ||
submittedMergeTasks.remove(mergeTask); | ||
mergeTask.schedule(); | ||
} else { | ||
// submit one merge | ||
MergeSource mergeSource = mock(MergeSource.class); | ||
OneMerge oneMerge = mock(OneMerge.class); | ||
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); | ||
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); | ||
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); | ||
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); | ||
submittedMerges++; | ||
if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) { | ||
expectIndexThrottling = true; | ||
} else if (submittedMerges <= maxMergeCount) { | ||
expectIndexThrottling = false; | ||
} | ||
} | ||
// assert IO throttle state | ||
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); | ||
} | ||
// submit one last merge when IO throttling is at max value | ||
isUsingMaxTargetIORate.set(true); | ||
MergeSource mergeSource = mock(MergeSource.class); | ||
OneMerge oneMerge = mock(OneMerge.class); | ||
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); | ||
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); | ||
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); | ||
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); | ||
// assert index throttling because IO throttling is at max value | ||
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true)); | ||
} | ||
|
||
public void testIndexingThrottlingWhileMergesAreRunning() { | ||
final int maxThreadCount = randomIntBetween(1, 5); | ||
// settings validation requires maxMergeCount >= maxThreadCount | ||
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5); | ||
List<MergeTask> submittedMergeTasks = new ArrayList<>(); | ||
List<MergeTask> scheduledToRunMergeTasks = new ArrayList<>(); | ||
AtomicBoolean isUsingMaxTargetIORate = new AtomicBoolean(false); | ||
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mockThreadPoolMergeExecutorService( | ||
submittedMergeTasks, | ||
isUsingMaxTargetIORate | ||
); | ||
Settings mergeSchedulerSettings = Settings.builder() | ||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount) | ||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount) | ||
.build(); | ||
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler( | ||
new ShardId("index", "_na_", 1), | ||
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), | ||
threadPoolMergeExecutorService | ||
); | ||
int mergesToRun = randomIntBetween(0, 5); | ||
// make sure there are more merges submitted and not run | ||
int excessMerges = randomIntBetween(1, 10); | ||
int mergesToSubmit = maxMergeCount + mergesToRun + excessMerges; | ||
int mergesOutstanding = 0; | ||
boolean expectIndexThrottling = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not clear to me that every test run invokes index throttling. I wonder if we are overcomplicating the test a little here and whether we should simply make it have X merges submitted and then verify that index-throttling kicks in (twice, one with the max-io-rate-simulation set to false first, then one where it is set to true)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, it's complicated and it not always tests that index throttling kicks in when IO throttling is at max level. I've added another test, that asserts indexing throttling kicks in when more than Let me know if this now looks OK with you. |
||
// merges are submitted, while some are also scheduled and run | ||
while (mergesToSubmit > 0) { | ||
isUsingMaxTargetIORate.set(randomBoolean()); | ||
if (submittedMergeTasks.isEmpty() == false && randomBoolean()) { | ||
// maybe schedule one submitted merge | ||
MergeTask mergeTask = randomFrom(submittedMergeTasks); | ||
submittedMergeTasks.remove(mergeTask); | ||
Schedule schedule = mergeTask.schedule(); | ||
if (schedule == Schedule.RUN) { | ||
scheduledToRunMergeTasks.add(mergeTask); | ||
} | ||
} else { | ||
if (mergesToRun > 0 && scheduledToRunMergeTasks.isEmpty() == false && randomBoolean()) { | ||
// maybe run one scheduled merge | ||
MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); | ||
scheduledToRunMergeTasks.remove(mergeTask); | ||
mergeTask.run(); | ||
mergesToRun--; | ||
mergesOutstanding--; | ||
} else { | ||
// submit one merge | ||
MergeSource mergeSource = mock(MergeSource.class); | ||
OneMerge oneMerge = mock(OneMerge.class); | ||
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); | ||
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); | ||
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); | ||
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); | ||
mergesToSubmit--; | ||
mergesOutstanding++; | ||
} | ||
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { | ||
expectIndexThrottling = true; | ||
} else if (mergesOutstanding <= maxMergeCount) { | ||
expectIndexThrottling = false; | ||
} | ||
} | ||
// assert IO throttle state | ||
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); | ||
} | ||
// execute all remaining merges (submitted or scheduled) | ||
while (mergesToRun > 0 || submittedMergeTasks.isEmpty() == false || scheduledToRunMergeTasks.isEmpty() == false) { | ||
// simulate that the {@link ThreadPoolMergeExecutorService} maybe peaked IO un-throttling | ||
isUsingMaxTargetIORate.set(randomBoolean()); | ||
if (submittedMergeTasks.isEmpty() == false && (scheduledToRunMergeTasks.isEmpty() || randomBoolean())) { | ||
// maybe schedule one submitted merge | ||
MergeTask mergeTask = randomFrom(submittedMergeTasks); | ||
submittedMergeTasks.remove(mergeTask); | ||
Schedule schedule = mergeTask.schedule(); | ||
if (schedule == Schedule.RUN) { | ||
scheduledToRunMergeTasks.add(mergeTask); | ||
} | ||
} else { | ||
// maybe run one scheduled merge | ||
MergeTask mergeTask = randomFrom(scheduledToRunMergeTasks); | ||
scheduledToRunMergeTasks.remove(mergeTask); | ||
mergeTask.run(); | ||
mergesToRun--; | ||
mergesOutstanding--; | ||
if (isUsingMaxTargetIORate.get() && mergesOutstanding > maxMergeCount) { | ||
expectIndexThrottling = true; | ||
} else if (mergesOutstanding <= maxMergeCount) { | ||
expectIndexThrottling = false; | ||
} | ||
} | ||
// assert IO throttle state | ||
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling)); | ||
} | ||
// all merges done | ||
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(false)); | ||
} | ||
|
||
public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception { | ||
// test with min 2 allowed concurrent merges | ||
int mergeExecutorThreadCount = randomIntBetween(2, 5); | ||
|
@@ -493,4 +652,49 @@ private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) { | |
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) { | ||
return new MergeInfo(randomNonNegativeInt(), estimatedMergeBytes, randomBoolean(), maxNumSegments); | ||
} | ||
|
||
static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you have to override a class to test it (or for it), in this case, probably what you need is a field you can change in the original class itself to trigger a different behaviour. The reason is a refactor in the code could give unexpected side effects not easy to spot by not testing agains the actual implementation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We typically try to avoid such artificial test pieces in production code. I think such an override as here is quite common so do not mind the current form. Feel free to maybe open a draft PR after this is merged to illustrate your point more (I am not sure I fully understood it). |
||
AtomicBoolean isIndexingThrottlingEnabled = new AtomicBoolean(false); | ||
|
||
TestThreadPoolMergeScheduler( | ||
ShardId shardId, | ||
IndexSettings indexSettings, | ||
ThreadPoolMergeExecutorService threadPoolMergeExecutorService | ||
) { | ||
super(shardId, indexSettings, threadPoolMergeExecutorService); | ||
} | ||
|
||
@Override | ||
protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { | ||
isIndexingThrottlingEnabled.set(true); | ||
} | ||
|
||
@Override | ||
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { | ||
isIndexingThrottlingEnabled.set(false); | ||
} | ||
|
||
boolean isIndexingThrottlingEnabled() { | ||
return isIndexingThrottlingEnabled.get(); | ||
} | ||
} | ||
|
||
static ThreadPoolMergeExecutorService mockThreadPoolMergeExecutorService( | ||
List<MergeTask> submittedMergeTasks, | ||
AtomicBoolean isUsingMaxTargetIORate | ||
) { | ||
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); | ||
doAnswer(invocation -> { | ||
MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; | ||
submittedMergeTasks.add(mergeTask); | ||
return null; | ||
}).when(threadPoolMergeExecutorService).submitMergeTask(any(MergeTask.class)); | ||
doAnswer(invocation -> { | ||
MergeTask mergeTask = (MergeTask) invocation.getArguments()[0]; | ||
submittedMergeTasks.add(mergeTask); | ||
return null; | ||
}).when(threadPoolMergeExecutorService).reEnqueueBackloggedMergeTask(any(MergeTask.class)); | ||
doAnswer(invocation -> isUsingMaxTargetIORate.get()).when(threadPoolMergeExecutorService).usingMaxTargetIORateBytesPerSec(); | ||
return threadPoolMergeExecutorService; | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unrelated change, in order to let these log messages go to the
ThreadPoolMergeScheduler
's logger instead of theInternalEngine
's.