Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Create QueryGroup API Logic #14680

Merged
merged 20 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
modify based on commments
Signed-off-by: Ruirui Zhang <mariazrr@amazon.com>
  • Loading branch information
ruai0511 committed Aug 8, 2024
commit 8f13d41871b3b7338aea316d7056c0150b084a9b
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.inject.Module;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -25,7 +24,6 @@
import org.opensearch.rest.RestController;
import org.opensearch.rest.RestHandler;

import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -56,9 +54,4 @@ public List<RestHandler> getRestHandlers(
) {
return List.of(new RestCreateQueryGroupAction());
}

@Override
public Collection<Module> createGuiceModules() {
return List.of(new WorkloadManagementPluginModule());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,78 +11,35 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.metadata.QueryGroup.ResiliencyMode;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.XContentParseException;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.search.ResourceType;
import org.joda.time.Instant;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* A request for create QueryGroup
* User input schema:
* {
* "name": "analytics",
* "resiliency_mode": "enforced",
* "resourceLimits": {
* "resource_limits": {
* "cpu" : 0.4,
* "memory" : 0.2
* }
* }
*
* @opensearch.experimental
*/
public class CreateQueryGroupRequest extends ActionRequest implements Writeable.Reader<CreateQueryGroupRequest> {
private String name;
private String _id;
private ResiliencyMode resiliencyMode;
private Map<ResourceType, Object> resourceLimits;
private long updatedAtInMillis;

/**
* Default constructor for CreateQueryGroupRequest
*/
public CreateQueryGroupRequest() {}
public class CreateQueryGroupRequest extends ActionRequest {
final QueryGroup queryGroup;

/**
* Constructor for CreateQueryGroupRequest
* @param queryGroup - A {@link QueryGroup} object
*/
public CreateQueryGroupRequest(QueryGroup queryGroup) {
this.name = queryGroup.getName();
this._id = queryGroup.get_id();
this.resourceLimits = queryGroup.getResourceLimits();
this.resiliencyMode = queryGroup.getResiliencyMode();
this.updatedAtInMillis = queryGroup.getUpdatedAtInMillis();
}

/**
* Constructor for CreateQueryGroupRequest
* @param name - QueryGroup name for CreateQueryGroupRequest
* @param _id - QueryGroup _id for CreateQueryGroupRequest
* @param mode - QueryGroup mode for CreateQueryGroupRequest
* @param resourceLimits - QueryGroup resourceLimits for CreateQueryGroupRequest
* @param updatedAtInMillis - QueryGroup updated time in millis for CreateQueryGroupRequest
*/
public CreateQueryGroupRequest(
String name,
String _id,
ResiliencyMode mode,
Map<ResourceType, Object> resourceLimits,
long updatedAtInMillis
) {
this.name = name;
this._id = _id;
this.resourceLimits = resourceLimits;
this.resiliencyMode = mode;
this.updatedAtInMillis = updatedAtInMillis;
this.queryGroup = queryGroup;
}

/**
Expand All @@ -91,153 +48,35 @@ public CreateQueryGroupRequest(
*/
public CreateQueryGroupRequest(StreamInput in) throws IOException {
super(in);
name = in.readString();
_id = in.readString();
resiliencyMode = ResiliencyMode.fromName(in.readString());
resourceLimits = in.readMap((i) -> ResourceType.fromName(i.readString()), StreamInput::readGenericValue);
updatedAtInMillis = in.readLong();
}

@Override
public CreateQueryGroupRequest read(StreamInput in) throws IOException {
return new CreateQueryGroupRequest(in);
queryGroup = new QueryGroup(in);
}

/**
* Generate a CreateQueryGroupRequest from XContent
* @param parser - A {@link XContentParser} object
*/
public static CreateQueryGroupRequest fromXContent(XContentParser parser) throws IOException {

while (parser.currentToken() != XContentParser.Token.START_OBJECT) {
parser.nextToken();
}

if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
throw new XContentParseException("expected start object but got a " + parser.currentToken());
}

XContentParser.Token token;
String fieldName = "";
String name = null;
ResiliencyMode mode = null;

// Map to hold resources
final Map<ResourceType, Object> resourceLimits = new HashMap<>();
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if (fieldName.equals("name")) {
name = parser.text();
} else if (fieldName.equals("resiliency_mode")) {
mode = ResiliencyMode.fromName(parser.text());
} else {
throw new XContentParseException("unrecognised [field=" + fieldName + " in QueryGroup");
}
} else if (token == XContentParser.Token.START_OBJECT) {
if (!fieldName.equals("resourceLimits")) {
throw new XContentParseException("Invalid field passed. QueryGroup does not support " + fieldName + ".");
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else {
resourceLimits.put(ResourceType.fromName(fieldName), parser.doubleValue());
}
}
}
}
return new CreateQueryGroupRequest(name, UUIDs.randomBase64UUID(), mode, resourceLimits, Instant.now().getMillis());
QueryGroup queryGroup1 = QueryGroup.fromXContent(parser);
// creating this queryGroup to ensure forceful creation of _id and updatedAt
QueryGroup queryGroup = new QueryGroup(queryGroup1.getName(), queryGroup1.getResiliencyMode(), queryGroup1.getResourceLimits());
return new CreateQueryGroupRequest(queryGroup);
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public ActionRequestValidationException validate() {
return null;
}

/**
* name getter
*/
public String getName() {
return name;
}

/**
* name setter
* @param name - name to be set
*/
public void setName(String name) {
this.name = name;
}

/**
* resourceLimits getter
*/
public Map<ResourceType, Object> getResourceLimits() {
return resourceLimits;
}

/**
* resourceLimits setter
* @param resourceLimits - resourceLimit to be set
*/
public void setResourceLimits(Map<ResourceType, Object> resourceLimits) {
this.resourceLimits = resourceLimits;
}

/**
* mode getter
*/
public ResiliencyMode getResiliencyMode() {
return resiliencyMode;
}

/**
* mode setter
* @param resiliencyMode - mode to be set
*/
public void setResiliencyMode(ResiliencyMode resiliencyMode) {
this.resiliencyMode = resiliencyMode;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(_id);
out.writeString(resiliencyMode.getName());
out.writeMap(resourceLimits, ResourceType::writeTo, StreamOutput::writeGenericValue);
out.writeLong(updatedAtInMillis);
}

/**
* _id getter
*/
public String get_id() {
return _id;
}

/**
* UUID setter
* @param _id - _id to be set
*/
public void set_id(String _id) {
this._id = _id;
}

/**
* updatedAtInMillis getter
*/
public long getUpdatedAtInMillis() {
return updatedAtInMillis;
QueryGroup.writeToOutput(out, queryGroup);
}

/**
* updatedAtInMillis setter
* @param updatedAtInMillis - updatedAtInMillis to be set
* QueryGroup getter
*/
public void setUpdatedAtInMillis(long updatedAtInMillis) {
this.updatedAtInMillis = updatedAtInMillis;
public QueryGroup getQueryGroup() {
return queryGroup;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import static org.opensearch.cluster.metadata.QueryGroup.builder;

/**
* Transport action to create QueryGroup
*
Expand Down Expand Up @@ -54,13 +51,7 @@ public TransportCreateQueryGroupAction(

@Override
protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener<CreateQueryGroupResponse> listener) {
QueryGroup queryGroup = builder().name(request.getName())
._id(request.get_id())
.mode(request.getResiliencyMode().getName())
.resourceLimits(request.getResourceLimits())
.updatedAt(request.getUpdatedAtInMillis())
.build();
threadPool.executor(ThreadPool.Names.GENERIC)
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved
.execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(queryGroup, listener));
.execute(() -> queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Package for the action classes for QueryGroup CRUD operations
* Package for the action classes of WorkloadManagementPlugin
*/
package org.opensearch.plugin.wlm.action;
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Base package for CRUD API of QueryGroup
* Base package for WorkloadManagementPlugin
*/
package org.opensearch.plugin.wlm;
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,12 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
CreateQueryGroupRequest createQueryGroupRequest = new CreateQueryGroupRequest();
request.applyContentParser((parser) -> parseRestRequest(createQueryGroupRequest, parser));
return channel -> client.execute(CreateQueryGroupAction.INSTANCE, createQueryGroupRequest, createQueryGroupResponse(channel));
}

private void parseRestRequest(CreateQueryGroupRequest request, XContentParser parser) throws IOException {
final CreateQueryGroupRequest createQueryGroupRequest = CreateQueryGroupRequest.fromXContent(parser);
request.setName(createQueryGroupRequest.getName());
request.set_id(createQueryGroupRequest.get_id());
request.setResiliencyMode(createQueryGroupRequest.getResiliencyMode());
request.setResourceLimits(createQueryGroupRequest.getResourceLimits());
request.setUpdatedAtInMillis(createQueryGroupRequest.getUpdatedAtInMillis());
XContentParser parser = request.contentParser();
ruai0511 marked this conversation as resolved.
Show resolved Hide resolved
return channel -> client.execute(
CreateQueryGroupAction.INSTANCE,
CreateQueryGroupRequest.fromXContent(parser),
createQueryGroupResponse(channel)
);
}

private RestResponseListener<CreateQueryGroupResponse> createQueryGroupResponse(final RestChannel channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Package for the rest classes for QueryGroup CRUD operations
* Package for the rest classes of WorkloadManagementPlugin
*/
package org.opensearch.plugin.wlm.rest;
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ ClusterState saveQueryGroupInClusterState(final QueryGroup queryGroup, final Clu
* @param resourceName - the resourceName we want to get the usage for
* @param resourceLimits - the resource limit from which to get the allocation value for resourceName
*/
private double getResourceLimitValue(String resourceName, final Map<ResourceType, Object> resourceLimits) {
private double getResourceLimitValue(String resourceName, final Map<ResourceType, Double> resourceLimits) {
for (ResourceType resourceType : resourceLimits.keySet()) {
if (resourceType.getName().equals(resourceName)) {
return (double) resourceLimits.get(resourceType);
return resourceLimits.get(resourceType);
}
}
return 0.0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
*/

/**
* Package for the service classes for QueryGroup CRUD operations
* Package for the service classes of WorkloadManagementPlugin
*/
package org.opensearch.plugin.wlm.service;
Loading