-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deletepit API #3
base: createpit
Are you sure you want to change the base?
Changes from 6 commits
34da6ea
30cb6c0
43fed74
57232fb
ece4c00
d67e389
65c123a
a938e26
6789468
98451da
975b871
2a8e4ff
77be351
849d1d3
51ce82f
68e210d
81ff93d
2acb465
648402e
480bdc2
04532f4
6ceaf61
fe4e41a
1dd62b4
af9b143
dc111f5
87cb8a5
76b5ea6
892e984
f8b102c
248f188
829d79e
4a1cbd9
45a897f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.search; | ||
|
||
import org.opensearch.action.ActionType; | ||
|
||
public class DeletePITAction extends ActionType<DeletePITResponse> { | ||
|
||
public static final DeletePITAction INSTANCE = new DeletePITAction(); | ||
public static final String NAME = "indices:admin/delete/pit"; | ||
|
||
private DeletePITAction() { | ||
super(NAME, DeletePITResponse::new); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.search; | ||
|
||
import org.opensearch.action.ActionRequest; | ||
import org.opensearch.action.ActionRequestValidationException; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
import org.opensearch.common.io.stream.StreamOutput; | ||
import org.opensearch.common.xcontent.ToXContent; | ||
import org.opensearch.common.xcontent.ToXContentObject; | ||
import org.opensearch.common.xcontent.XContentBuilder; | ||
import org.opensearch.common.xcontent.XContentParser; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
|
||
import static org.opensearch.action.ValidateActions.addValidationError; | ||
|
||
/** | ||
* Request to delete one or more PIT contexts based on IDs. | ||
*/ | ||
public class DeletePITRequest extends ActionRequest implements ToXContentObject { | ||
|
||
/** | ||
* List of PIT IDs to be deleted , and use "_all" to delete all PIT reader contexts | ||
*/ | ||
private List<String> pitIds; | ||
|
||
public DeletePITRequest(StreamInput in) throws IOException { | ||
super(in); | ||
pitIds = Arrays.asList(in.readStringArray()); | ||
} | ||
|
||
public DeletePITRequest(String... pitIds) { | ||
if (pitIds != null) { | ||
this.pitIds = Arrays.asList(pitIds); | ||
} | ||
} | ||
|
||
public DeletePITRequest(List<String> pitIds) { | ||
if (pitIds != null) { | ||
this.pitIds = pitIds; | ||
} | ||
} | ||
|
||
public DeletePITRequest() {} | ||
|
||
public List<String> getPitIds() { | ||
return pitIds; | ||
} | ||
|
||
public void setPitIds(List<String> pitIds) { | ||
this.pitIds = pitIds; | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
if (pitIds == null || pitIds.isEmpty()) { | ||
validationException = addValidationError("no pit ids specified", validationException); | ||
} | ||
return validationException; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
if (pitIds == null) { | ||
out.writeVInt(0); | ||
} else { | ||
out.writeStringArray(pitIds.toArray(new String[pitIds.size()])); | ||
} | ||
} | ||
|
||
public void addPitId(String pitId) { | ||
if (pitIds == null) { | ||
pitIds = new ArrayList<>(); | ||
} | ||
pitIds.add(pitId); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { | ||
builder.startObject(); | ||
builder.startArray("pit_id"); | ||
for (String pitId : pitIds) { | ||
builder.value(pitId); | ||
} | ||
builder.endArray(); | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
public void fromXContent(XContentParser parser) throws IOException { | ||
pitIds = null; | ||
if (parser.nextToken() != XContentParser.Token.START_OBJECT) { | ||
throw new IllegalArgumentException("Malformed content, must start with an object"); | ||
} else { | ||
XContentParser.Token token; | ||
String currentFieldName = null; | ||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { | ||
if (token == XContentParser.Token.FIELD_NAME) { | ||
currentFieldName = parser.currentName(); | ||
} else if ("pit_id".equals(currentFieldName)) { | ||
if (token == XContentParser.Token.START_ARRAY) { | ||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { | ||
if (token.isValue() == false) { | ||
throw new IllegalArgumentException("pit_id array element should only contain pit_id"); | ||
} | ||
addPitId(parser.text()); | ||
} | ||
} else { | ||
if (token.isValue() == false) { | ||
throw new IllegalArgumentException("pit_id element should only contain pit_id"); | ||
} | ||
addPitId(parser.text()); | ||
} | ||
} else { | ||
throw new IllegalArgumentException( | ||
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] " | ||
); | ||
} | ||
} | ||
} | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
package org.opensearch.action.search; | ||
|
||
import org.opensearch.action.ActionResponse; | ||
import org.opensearch.common.ParseField; | ||
import org.opensearch.common.io.stream.StreamInput; | ||
import org.opensearch.common.io.stream.StreamOutput; | ||
import org.opensearch.common.xcontent.ConstructingObjectParser; | ||
import org.opensearch.common.xcontent.ObjectParser; | ||
import org.opensearch.common.xcontent.StatusToXContentObject; | ||
import org.opensearch.common.xcontent.ToXContent; | ||
import org.opensearch.common.xcontent.XContentBuilder; | ||
import org.opensearch.common.xcontent.XContentParser; | ||
import org.opensearch.rest.RestStatus; | ||
|
||
import java.io.IOException; | ||
|
||
import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; | ||
import static org.opensearch.rest.RestStatus.NOT_FOUND; | ||
import static org.opensearch.rest.RestStatus.OK; | ||
|
||
public class DeletePITResponse extends ActionResponse implements StatusToXContentObject { | ||
|
||
/** | ||
* This will be true if all PIT reader contexts are deleted. | ||
*/ | ||
private final boolean succeeded; | ||
|
||
public DeletePITResponse(boolean succeeded) { | ||
this.succeeded = succeeded; | ||
} | ||
|
||
public DeletePITResponse(StreamInput in) throws IOException { | ||
super(in); | ||
succeeded = in.readBoolean(); | ||
} | ||
|
||
/** | ||
* @return Whether the attempt to delete PIT was successful. | ||
*/ | ||
public boolean isSucceeded() { | ||
return succeeded; | ||
} | ||
|
||
@Override | ||
public RestStatus status() { | ||
return succeeded ? OK : NOT_FOUND; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeBoolean(succeeded); | ||
} | ||
|
||
private static final ParseField SUCCEEDED = new ParseField("succeeded"); | ||
|
||
private static final ConstructingObjectParser<DeletePITResponse, Void> PARSER = new ConstructingObjectParser<>( | ||
"delete_pit", | ||
true, | ||
a -> new DeletePITResponse((boolean) a[0]) | ||
); | ||
static { | ||
PARSER.declareField(constructorArg(), (parser, context) -> parser.booleanValue(), SUCCEEDED, ObjectParser.ValueType.BOOLEAN); | ||
} | ||
|
||
@Override | ||
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { | ||
builder.startObject(); | ||
builder.field(SUCCEEDED.getPreferredName(), succeeded); | ||
builder.endObject(); | ||
return builder; | ||
} | ||
|
||
/** | ||
* Parse the delete PIT response body into a new {@link DeletePITResponse} object | ||
*/ | ||
public static DeletePITResponse fromXContent(XContentParser parser) throws IOException { | ||
return PARSER.apply(parser, null); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,8 +83,10 @@ | |
public class SearchTransportService { | ||
|
||
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]"; | ||
public static final String FREE_CONTEXT_PIT_ACTION_NAME = "indices:data/read/search[free_context/pit]"; | ||
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]"; | ||
public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]"; | ||
public static final String DELETE_ALL_PIT_CONTEXTS_ACTION_NAME = "indices:data/read/search[delete_pit_contexts]"; | ||
public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]"; | ||
public static final String QUERY_ACTION_NAME = "indices:data/read/search[phase/query]"; | ||
public static final String QUERY_ID_ACTION_NAME = "indices:data/read/search[phase/query/id]"; | ||
|
@@ -142,6 +144,20 @@ public void sendFreeContext( | |
); | ||
} | ||
|
||
public void sendPitFreeContext( | ||
Transport.Connection connection, | ||
ShardSearchContextId contextId, | ||
ActionListener<SearchFreeContextResponse> listener | ||
) { | ||
transportService.sendRequest( | ||
connection, | ||
FREE_CONTEXT_PIT_ACTION_NAME, | ||
new ScrollFreeContextRequest(contextId), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this |
||
TransportRequestOptions.EMPTY, | ||
new ActionListenerResponseHandler<>(listener, SearchFreeContextResponse::new) | ||
); | ||
} | ||
|
||
public void updatePitContext( | ||
Transport.Connection connection, | ||
UpdatePITContextRequest request, | ||
|
@@ -198,6 +214,16 @@ public void sendClearAllScrollContexts(Transport.Connection connection, final Ac | |
); | ||
} | ||
|
||
public void sendDeleteAllPitContexts(Transport.Connection connection, final ActionListener<TransportResponse> listener) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's make it uniform and call it freeAllPitContexts maybe |
||
transportService.sendRequest( | ||
connection, | ||
DELETE_ALL_PIT_CONTEXTS_ACTION_NAME, | ||
TransportRequest.Empty.INSTANCE, | ||
TransportRequestOptions.EMPTY, | ||
new ActionListenerResponseHandler<>(listener, (in) -> TransportResponse.Empty.INSTANCE) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this returning an empty transport response? |
||
); | ||
} | ||
|
||
public void sendExecuteDfs( | ||
Transport.Connection connection, | ||
final ShardSearchRequest request, | ||
|
@@ -437,6 +463,18 @@ public static void registerRequestHandler(TransportService transportService, Sea | |
} | ||
); | ||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new); | ||
|
||
transportService.registerRequestHandler( | ||
FREE_CONTEXT_PIT_ACTION_NAME, | ||
ThreadPool.Names.SAME, | ||
ScrollFreeContextRequest::new, | ||
(request, channel, task) -> { | ||
boolean freed = searchService.freeReaderContextIfFound(request.id()); | ||
channel.sendResponse(new SearchFreeContextResponse(freed)); | ||
} | ||
); | ||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_PIT_ACTION_NAME, SearchFreeContextResponse::new); | ||
|
||
transportService.registerRequestHandler( | ||
FREE_CONTEXT_ACTION_NAME, | ||
ThreadPool.Names.SAME, | ||
|
@@ -620,6 +658,21 @@ public static void registerRequestHandler(TransportService transportService, Sea | |
); | ||
TransportActionProxy.registerProxyAction(transportService, UPDATE_READER_CONTEXT_ACTION_NAME, UpdatePitContextResponse::new); | ||
|
||
transportService.registerRequestHandler( | ||
DELETE_ALL_PIT_CONTEXTS_ACTION_NAME, | ||
ThreadPool.Names.SAME, | ||
TransportRequest.Empty::new, | ||
(request, channel, task) -> { | ||
searchService.freeAllPitContexts(); | ||
channel.sendResponse(TransportResponse.Empty.INSTANCE); | ||
} | ||
); | ||
TransportActionProxy.registerProxyAction( | ||
transportService, | ||
DELETE_ALL_PIT_CONTEXTS_ACTION_NAME, | ||
(in) -> TransportResponse.Empty.INSTANCE | ||
); | ||
|
||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
variable name and action name should be consistent for delete all should be consistent with
public static final String FREE_CONTEXT_PIT_ACTION_NAME = "indices:data/read/search[free_context/pit]";