Skip to content

Commit

Permalink
Wlm create/update REST API bug fix (opensearch-project#16422)
Browse files Browse the repository at this point in the history
* test changes

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* fix the create/update queryGroup REST APIs

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* undo gradle change

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add PR link in CHANGELOG

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* fix javadoc issues

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* remove redundant name param

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* Update CHANGELOG.md

Signed-off-by: Ankit Jain <akjain@amazon.com>

* fix action name in transport class for update query group

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
Signed-off-by: Ankit Jain <akjain@amazon.com>
Co-authored-by: Ankit Jain <akjain@amazon.com>
  • Loading branch information
kaushalmahi12 and jainankitk authored Oct 23, 2024
1 parent 6891267 commit 760e676
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- [Workload Management] Fixing Create/Update QueryGroup TransportActions to execute from non-cluster manager nodes ([16422](https://github.com/opensearch-project/OpenSearch/pull/16422))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.common.UUIDs;
import org.opensearch.core.common.io.stream.StreamInput;
Expand All @@ -33,7 +33,7 @@
*
* @opensearch.experimental
*/
public class CreateQueryGroupRequest extends ActionRequest {
public class CreateQueryGroupRequest extends ClusterManagerNodeRequest<CreateQueryGroupRequest> {
private final QueryGroup queryGroup;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,82 @@
package org.opensearch.plugin.wlm.action;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.threadpool.ThreadPool.Names.SAME;

/**
* Transport action to create QueryGroup
*
* @opensearch.experimental
*/
public class TransportCreateQueryGroupAction extends HandledTransportAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {
public class TransportCreateQueryGroupAction extends TransportClusterManagerNodeAction<CreateQueryGroupRequest, CreateQueryGroupResponse> {

private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportCreateQueryGroupAction
*
* @param actionName - action name
* @param threadPool - {@link ThreadPool} object
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param indexNameExpressionResolver - {@link IndexNameExpressionResolver} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportCreateQueryGroupAction(
String actionName,
ThreadPool threadPool,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(CreateQueryGroupAction.NAME, transportService, actionFilters, CreateQueryGroupRequest::new);
super(
CreateQueryGroupAction.NAME,
transportService,
queryGroupPersistenceService.getClusterService(),
threadPool,
actionFilters,
CreateQueryGroupRequest::new,
indexNameExpressionResolver
);
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, CreateQueryGroupRequest request, ActionListener<CreateQueryGroupResponse> listener) {
protected void clusterManagerOperation(
CreateQueryGroupRequest request,
ClusterState clusterState,
ActionListener<CreateQueryGroupResponse> listener
) {
queryGroupPersistenceService.persistInClusterStateMetadata(request.getQueryGroup(), listener);
}

@Override
protected String executor() {
return SAME;
}

@Override
protected CreateQueryGroupResponse read(StreamInput in) throws IOException {
return new CreateQueryGroupResponse(in);
}

@Override
protected ClusterBlockException checkBlock(CreateQueryGroupRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,43 +9,81 @@
package org.opensearch.plugin.wlm.action;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.plugin.wlm.service.QueryGroupPersistenceService;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;

import static org.opensearch.threadpool.ThreadPool.Names.SAME;

/**
* Transport action to update QueryGroup
*
* @opensearch.experimental
*/
public class TransportUpdateQueryGroupAction extends HandledTransportAction<UpdateQueryGroupRequest, UpdateQueryGroupResponse> {
public class TransportUpdateQueryGroupAction extends TransportClusterManagerNodeAction<UpdateQueryGroupRequest, UpdateQueryGroupResponse> {

private final QueryGroupPersistenceService queryGroupPersistenceService;

/**
* Constructor for TransportUpdateQueryGroupAction
*
* @param actionName - action name
* @param threadPool - {@link ThreadPool} object
* @param transportService - a {@link TransportService} object
* @param actionFilters - a {@link ActionFilters} object
* @param indexNameExpressionResolver - {@link IndexNameExpressionResolver} object
* @param queryGroupPersistenceService - a {@link QueryGroupPersistenceService} object
*/
@Inject
public TransportUpdateQueryGroupAction(
String actionName,
ThreadPool threadPool,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
QueryGroupPersistenceService queryGroupPersistenceService
) {
super(UpdateQueryGroupAction.NAME, transportService, actionFilters, UpdateQueryGroupRequest::new);
super(
UpdateQueryGroupAction.NAME,
transportService,
queryGroupPersistenceService.getClusterService(),
threadPool,
actionFilters,
UpdateQueryGroupRequest::new,
indexNameExpressionResolver
);
this.queryGroupPersistenceService = queryGroupPersistenceService;
}

@Override
protected void doExecute(Task task, UpdateQueryGroupRequest request, ActionListener<UpdateQueryGroupResponse> listener) {
protected void clusterManagerOperation(
UpdateQueryGroupRequest request,
ClusterState clusterState,
ActionListener<UpdateQueryGroupResponse> listener
) {
queryGroupPersistenceService.updateInClusterStateMetadata(request, listener);
}

@Override
protected String executor() {
return SAME;
}

@Override
protected UpdateQueryGroupResponse read(StreamInput in) throws IOException {
return new UpdateQueryGroupResponse(in);
}

@Override
protected ClusterBlockException checkBlock(UpdateQueryGroupRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.opensearch.plugin.wlm.action;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -23,7 +23,7 @@
*
* @opensearch.experimental
*/
public class UpdateQueryGroupRequest extends ActionRequest {
public class UpdateQueryGroupRequest extends ClusterManagerNodeRequest<UpdateQueryGroupRequest> {
private final String name;
private final MutableQueryGroupFragment mutableQueryGroupFragment;

Expand Down

0 comments on commit 760e676

Please sign in to comment.