diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java index 8fa958a97634f..16da94c742da5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponse.java @@ -49,6 +49,7 @@ protected void writeNodesTo(StreamOutput out, List nodes) throw @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { for (QueryGroupStats queryGroupStats : getNodes()) { + System.out.println(queryGroupStats.getNode().getId()); builder.startObject(queryGroupStats.getNode().getId()); queryGroupStats.toXContent(builder, params); builder.endObject(); diff --git a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java index b4905508e134d..c37ae6cd4c04a 100644 --- a/server/src/main/java/org/opensearch/wlm/QueryGroupService.java +++ b/server/src/main/java/org/opensearch/wlm/QueryGroupService.java @@ -89,7 +89,7 @@ public boolean resourceLimitBreached(String id, QueryGroupState currentState) { .anyMatch( entry -> entry.getValue().getLastRecordedUsage() > queryGroup.getMutableQueryGroupFragment() .getResourceLimits() - .get(entry.getKey()) + .getOrDefault(entry.getKey(), 0.0) ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponseTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponseTests.java new file mode 100644 index 0000000000000..b09f05e839ac3 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/admin/cluster/wlm/QueryGroupStatsResponseTests.java @@ -0,0 +1,108 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.wlm; + +import org.opensearch.Version; +import org.opensearch.action.FailedNodeException; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.wlm.ResourceType; +import org.opensearch.wlm.stats.QueryGroupStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class QueryGroupStatsResponseTests extends OpenSearchTestCase { + ClusterName clusterName = new ClusterName("test-cluster"); + String testQueryGroupId = "safjgagnaeekg-3r3fads"; + DiscoveryNode node = new DiscoveryNode( + "node-1", + buildNewFakeTransportAddress(), + new HashMap<>(), + Set.of(DiscoveryNodeRole.DATA_ROLE), + Version.CURRENT + ); + Map statsHolderMap = new HashMap<>(); + QueryGroupStats queryGroupStats = new QueryGroupStats( + node, + Map.of( + testQueryGroupId, + new QueryGroupStats.QueryGroupStatsHolder( + 0, + 0, + 1, + 0, + Map.of( + ResourceType.CPU, + new QueryGroupStats.ResourceStats(0, 0, 0), + ResourceType.MEMORY, + new QueryGroupStats.ResourceStats(0, 0, 0) + ) + ) + ) + ); + List queryGroupStatsList = List.of(queryGroupStats); + List failedNodeExceptionList = new ArrayList<>(); + + public void testSerializationAndDeserialization() throws IOException { + QueryGroupStatsResponse queryGroupStatsResponse = new QueryGroupStatsResponse( + clusterName, + queryGroupStatsList, + failedNodeExceptionList + ); + BytesStreamOutput out = new BytesStreamOutput(); + queryGroupStatsResponse.writeTo(out); + StreamInput in = out.bytes().streamInput(); + QueryGroupStatsResponse deserializedResponse = new QueryGroupStatsResponse(in); + assertEquals(queryGroupStatsResponse.getClusterName(), deserializedResponse.getClusterName()); + assertEquals(queryGroupStatsResponse.getNodes().size(), deserializedResponse.getNodes().size()); + } + + public void testToString() { + QueryGroupStatsResponse queryGroupStatsResponse = new QueryGroupStatsResponse( + clusterName, + queryGroupStatsList, + failedNodeExceptionList + ); + String responseString = queryGroupStatsResponse.toString(); + assertEquals( + "{\n" + + " \"node-1\" : {\n" + + " \"query_groups\" : {\n" + + " \"safjgagnaeekg-3r3fads\" : {\n" + + " \"completions\" : 0,\n" + + " \"rejections\" : 0,\n" + + " \"failures\" : 1,\n" + + " \"total_cancellations\" : 0,\n" + + " \"cpu\" : {\n" + + " \"current_usage\" : 0.0,\n" + + " \"cancellations\" : 0,\n" + + " \"rejections\" : 0\n" + + " },\n" + + " \"memory\" : {\n" + + " \"current_usage\" : 0.0,\n" + + " \"cancellations\" : 0,\n" + + " \"rejections\" : 0\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}", + responseString + ); + } +} diff --git a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java index b24a4834a1d76..ecea043d09e4b 100644 --- a/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java +++ b/server/src/test/java/org/opensearch/wlm/listeners/QueryGroupRequestOperationListenerTests.java @@ -8,6 +8,9 @@ package org.opensearch.wlm.listeners; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ThreadContext; @@ -16,6 +19,7 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; +import org.opensearch.wlm.MutableQueryGroupFragment; import org.opensearch.wlm.QueryGroupService; import org.opensearch.wlm.QueryGroupTask; import org.opensearch.wlm.ResourceType; @@ -97,6 +101,25 @@ public void testValidQueryGroupRequestFailure() throws IOException { assertSuccess(testQueryGroupId, queryGroupStateMap, expectedStats, testQueryGroupId); } + public void testResourceLimitBreached() throws IOException { + QueryGroup queryGroup = new QueryGroup( + "testgroup", + new MutableQueryGroupFragment(MutableQueryGroupFragment.ResiliencyMode.MONITOR, Map.of(ResourceType.MEMORY, 0.5)) + ); + queryGroupStateMap.put(testQueryGroupId, new QueryGroupState()); + TransportService mockTransportService = mock(TransportService.class); + DiscoveryNode mockDiscoveryNode = mock(DiscoveryNode.class); + when(mockTransportService.getLocalNode()).thenReturn(mockDiscoveryNode); + ClusterService clusterService = mock(ClusterService.class); + ClusterState clusterState = mock(ClusterState.class); + when(clusterService.state()).thenReturn(clusterState); + Metadata metadata = mock(Metadata.class); + when(clusterState.metadata()).thenReturn(metadata); + when(metadata.queryGroups()).thenReturn(Map.of(testQueryGroupId, queryGroup)); + queryGroupService = new QueryGroupService(mockTransportService.getLocalNode(), clusterService, queryGroupStateMap); + assertFalse(queryGroupService.resourceLimitBreached(testQueryGroupId, new QueryGroupState())); + } + public void testMultiThreadedValidQueryGroupRequestFailures() { queryGroupStateMap.put(testQueryGroupId, new QueryGroupState());