Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Improve the rejection logic for soft mode query groups during node duress (#16417) #16438

Merged
merged 1 commit into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Improve the rejection logic for soft mode query groups during node du…
…ress (#16417)

* improve the rejection logic for wlm

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add CHANGELOG

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed Oct 22, 2024
commit a8fac13ce01df4d8965fe0ff97459ca9a9d0b603
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
- [Workload Management] Enhance rejection mechanism in workload management ([#16417](https://github.com/opensearch-project/OpenSearch/pull/16417))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,12 @@ public void rejectIfNeeded(String queryGroupId) {
return;
}

// rejections will not happen for SOFT mode QueryGroups
// rejections will not happen for SOFT mode QueryGroups unless node is in duress
Optional<QueryGroup> optionalQueryGroup = activeQueryGroups.stream().filter(x -> x.get_id().equals(queryGroupId)).findFirst();

if (optionalQueryGroup.isPresent() && optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT)
return;
if (optionalQueryGroup.isPresent()
&& (optionalQueryGroup.get().getResiliencyMode() == MutableQueryGroupFragment.ResiliencyMode.SOFT
&& !nodeDuressTrackers.isNodeInDuress())) return;

optionalQueryGroup.ifPresent(queryGroup -> {
boolean reject = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static org.mockito.Mockito.when;

public class QueryGroupServiceTests extends OpenSearchTestCase {
public static final String QUERY_GROUP_ID = "queryGroupId1";
private QueryGroupService queryGroupService;
private QueryGroupTaskCancellationService mockCancellationService;
private ClusterService mockClusterService;
Expand All @@ -68,6 +69,7 @@ public void setUp() throws Exception {
mockNodeDuressTrackers = Mockito.mock(NodeDuressTrackers.class);
mockCancellationService = Mockito.mock(TestQueryGroupCancellationService.class);
mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor();
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(false);

queryGroupService = new QueryGroupService(
mockCancellationService,
Expand Down Expand Up @@ -203,26 +205,52 @@ public void testRejectIfNeeded_whenQueryGroupIdIsNullOrDefaultOne() {
verify(spyMap, never()).get(any());
}

public void testRejectIfNeeded_whenSoftModeQueryGroupIsContendedAndNodeInDuress() {
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
"testQueryGroup",
QUERY_GROUP_ID,
MutableQueryGroupFragment.ResiliencyMode.SOFT,
Map.of(ResourceType.CPU, 0.10)
);
mockQueryGroupStateMap = new HashMap<>();
mockQueryGroupStateMap.put("queryGroupId1", new QueryGroupState());
QueryGroupState state = new QueryGroupState();
QueryGroupState.ResourceTypeState cpuResourceState = new QueryGroupState.ResourceTypeState(ResourceType.CPU);
cpuResourceState.setLastRecordedUsage(0.10);
state.getResourceState().put(ResourceType.CPU, cpuResourceState);
QueryGroupState spyState = spy(state);
mockQueryGroupStateMap.put(QUERY_GROUP_ID, spyState);

mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);

queryGroupService = new QueryGroupService(
mockCancellationService,
mockClusterService,
mockThreadPool,
mockWorkloadManagementSettings,
mockNodeDuressTrackers,
mockQueryGroupsStateAccessor,
activeQueryGroups,
new HashSet<>()
);
when(mockWorkloadManagementSettings.getWlmMode()).thenReturn(WlmMode.ENABLED);
when(mockNodeDuressTrackers.isNodeInDuress()).thenReturn(true);
assertThrows(OpenSearchRejectedExecutionException.class, () -> queryGroupService.rejectIfNeeded("queryGroupId1"));
}

public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
QueryGroup testQueryGroup = new QueryGroup(
Set<QueryGroup> activeQueryGroups = getActiveQueryGroups(
"testQueryGroup",
"queryGroupId1",
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.SOFT, Map.of(ResourceType.CPU, 0.10)),
1L
QUERY_GROUP_ID,
MutableQueryGroupFragment.ResiliencyMode.SOFT,
Map.of(ResourceType.CPU, 0.10)
);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
{
add(testQueryGroup);
}
};
mockQueryGroupStateMap = new HashMap<>();
QueryGroupState spyState = spy(new QueryGroupState());
mockQueryGroupStateMap.put("queryGroupId1", spyState);

mockQueryGroupsStateAccessor = new QueryGroupsStateAccessor(mockQueryGroupStateMap);

Map<String, QueryGroupState> spyMap = spy(mockQueryGroupStateMap);

queryGroupService = new QueryGroupService(
mockCancellationService,
mockClusterService,
Expand All @@ -239,11 +267,11 @@ public void testRejectIfNeeded_whenQueryGroupIsSoftMode() {
}

public void testRejectIfNeeded_whenQueryGroupIsEnforcedMode_andNotBreaching() {
QueryGroup testQueryGroup = new QueryGroup(
QueryGroup testQueryGroup = getQueryGroup(
"testQueryGroup",
"queryGroupId1",
new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.ENFORCED, Map.of(ResourceType.CPU, 0.10)),
1L
MutableQueryGroupFragment.ResiliencyMode.ENFORCED,
Map.of(ResourceType.CPU, 0.10)
);
QueryGroup spuQueryGroup = spy(testQueryGroup);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
Expand Down Expand Up @@ -464,6 +492,31 @@ public void testShouldSBPHandle() {

}

private static Set<QueryGroup> getActiveQueryGroups(
String name,
String id,
MutableQueryGroupFragment.ResiliencyMode mode,
Map<ResourceType, Double> resourceLimits
) {
QueryGroup testQueryGroup = getQueryGroup(name, id, mode, resourceLimits);
Set<QueryGroup> activeQueryGroups = new HashSet<>() {
{
add(testQueryGroup);
}
};
return activeQueryGroups;
}

private static QueryGroup getQueryGroup(
String name,
String id,
MutableQueryGroupFragment.ResiliencyMode mode,
Map<ResourceType, Double> resourceLimits
) {
QueryGroup testQueryGroup = new QueryGroup(name, id, new MutableQueryGroupFragment(mode, resourceLimits), 1L);
return testQueryGroup;
}

// This is needed to test the behavior of QueryGroupService#doRun method
static class TestQueryGroupCancellationService extends QueryGroupTaskCancellationService {
public TestQueryGroupCancellationService(
Expand Down
Loading