Skip to content

Commit

Permalink
modify based on comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <mariazrr@amazon.com>
  • Loading branch information
ruai0511 committed Sep 23, 2024
1 parent 9714571 commit 2df37a3
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class QueryGroupStatsRequest extends BaseNodesRequest<QueryGroupStatsRequ
private final Set<String> queryGroupIds;
private final Boolean breach;

protected QueryGroupStatsRequest(StreamInput in) throws IOException {
public QueryGroupStatsRequest(StreamInput in) throws IOException {
super(in);
this.queryGroupIds = new HashSet<>(Set.of(in.readStringArray()));
this.breach = in.readOptionalBoolean();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.stats.QueryGroupStats;
Expand All @@ -33,7 +30,7 @@
public class TransportQueryGroupStatsAction extends TransportNodesAction<
QueryGroupStatsRequest,
QueryGroupStatsResponse,
TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest,
QueryGroupStatsRequest,
QueryGroupStats> {

QueryGroupService queryGroupService;
Expand All @@ -53,7 +50,7 @@ public TransportQueryGroupStatsAction(
transportService,
actionFilters,
QueryGroupStatsRequest::new,
NodeQueryGroupStatsRequest::new,
QueryGroupStatsRequest::new,
ThreadPool.Names.MANAGEMENT,
QueryGroupStats.class
);
Expand All @@ -70,8 +67,8 @@ protected QueryGroupStatsResponse newResponse(
}

@Override
protected NodeQueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
return new NodeQueryGroupStatsRequest(request);
protected QueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
return request;
}

@Override
Expand All @@ -80,37 +77,7 @@ protected QueryGroupStats newNodeResponse(StreamInput in) throws IOException {
}

@Override
protected QueryGroupStats nodeOperation(NodeQueryGroupStatsRequest nodeQueryGroupStatsRequest) {
QueryGroupStatsRequest request = nodeQueryGroupStatsRequest.request;
return queryGroupService.nodeStats(request.getQueryGroupIds(), request.isBreach());
}

/**
* Inner QueryGroupStatsRequest
*
* @opensearch.experimental
*/
public static class NodeQueryGroupStatsRequest extends TransportRequest {

protected QueryGroupStatsRequest request;

public NodeQueryGroupStatsRequest(StreamInput in) throws IOException {
super(in);
request = new QueryGroupStatsRequest(in);
}

NodeQueryGroupStatsRequest(QueryGroupStatsRequest request) {
this.request = request;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
protected QueryGroupStats nodeOperation(QueryGroupStatsRequest queryGroupStatsRequest) {
return queryGroupService.nodeStats(queryGroupStatsRequest.getQueryGroupIds(), queryGroupStatsRequest.isBreach());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.io.stream.BytesStreamOutput;
Expand All @@ -35,16 +34,16 @@ public class TransportQueryGroupStatsActionTests extends TransportNodesActionTes
*/
public void testQueryGroupStatsActionWithRetentionOfDiscoveryNodesList() {
QueryGroupStatsRequest request = new QueryGroupStatsRequest();
Map<String, List<NodeQueryGroupStatsRequest>> combinedSentRequest = performQueryGroupStatsAction(request);
Map<String, List<QueryGroupStatsRequest>> combinedSentRequest = performQueryGroupStatsAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.concreteNodes()); });
});
}

private Map<String, List<NodeQueryGroupStatsRequest>> performQueryGroupStatsAction(QueryGroupStatsRequest request) {
private Map<String, List<QueryGroupStatsRequest>> performQueryGroupStatsAction(QueryGroupStatsRequest request) {
TransportNodesAction action = new TransportQueryGroupStatsAction(
THREAD_POOL,
clusterService,
Expand All @@ -55,19 +54,18 @@ private Map<String, List<NodeQueryGroupStatsRequest>> performQueryGroupStatsActi
PlainActionFuture<QueryGroupStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<NodeQueryGroupStatsRequest>> combinedSentRequest = new HashMap<>();
Map<String, List<QueryGroupStatsRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<NodeQueryGroupStatsRequest> sentRequestList = new ArrayList<>();
List<QueryGroupStatsRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest QueryGroupStatsRequestFromCoordinator =
(TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest) preSentRequest.request;
QueryGroupStatsRequest QueryGroupStatsRequestFromCoordinator = (QueryGroupStatsRequest) preSentRequest.request;
QueryGroupStatsRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
NodeQueryGroupStatsRequest QueryGroupStatsRequest = new NodeQueryGroupStatsRequest(in);
QueryGroupStatsRequest QueryGroupStatsRequest = new QueryGroupStatsRequest(in);
sentRequestList.add(QueryGroupStatsRequest);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit 2df37a3

Please sign in to comment.