Skip to content

Chunked encoding for tasks APIs #91935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
Expand Down Expand Up @@ -43,17 +41,7 @@ public CancelTasksResponse(
super(tasks, taskFailures, nodeFailures);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return super.toXContent(builder, params);
}

public static CancelTasksResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,33 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Returns the list of tasks currently running on the nodes
*/
public class ListTasksResponse extends BaseTasksResponse implements ToXContentObject {
public class ListTasksResponse extends BaseTasksResponse {
private static final String TASKS = "tasks";

private final List<TaskInfo> tasks;
Expand Down Expand Up @@ -142,7 +145,7 @@ private void buildTaskGroups() {
topLevelTasks.add(taskGroup);
}
}
this.groups = Collections.unmodifiableList(topLevelTasks.stream().map(TaskGroup.Builder::build).toList());
this.groups = topLevelTasks.stream().map(TaskGroup.Builder::build).toList();
}

/**
Expand All @@ -155,82 +158,98 @@ public List<TaskInfo> getTasks() {
/**
* Convert this task response to XContent grouping by executing nodes.
*/
public XContentBuilder toXContentGroupedByNode(XContentBuilder builder, Params params, DiscoveryNodes discoveryNodes)
throws IOException {
toXContentCommon(builder, params);
builder.startObject("nodes");
for (Map.Entry<String, List<TaskInfo>> entry : getPerNodeTasks().entrySet()) {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip it's useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : node.getRoles()) {
builder.value(role.roleName());
}
builder.endArray();
public ChunkedToXContent groupedByNode(Supplier<DiscoveryNodes> nodesInCluster) {
return ignored -> {
final var discoveryNodes = nodesInCluster.get();
return Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject();
toXContentCommon(builder, params);
builder.startObject("nodes");
return builder;
}), getPerNodeTasks().entrySet().stream().flatMap(entry -> {
DiscoveryNode node = discoveryNodes.get(entry.getKey());
return Stream.<Stream<ToXContent>>of(Stream.of((builder, params) -> {
builder.startObject(entry.getKey());
if (node != null) {
// If the node is no longer part of the cluster, oh well, we'll just skip its useful information.
builder.field("name", node.getName());
builder.field("transport_address", node.getAddress().toString());
builder.field("host", node.getHostName());
builder.field("ip", node.getAddress());

builder.startArray("roles");
for (DiscoveryNodeRole role : node.getRoles()) {
builder.value(role.roleName());
}
builder.endArray();

if (node.getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
if (node.getAttributes().isEmpty() == false) {
builder.startObject("attributes");
for (Map.Entry<String, String> attrEntry : node.getAttributes().entrySet()) {
builder.field(attrEntry.getKey(), attrEntry.getValue());
}
builder.endObject();
}
}
builder.startObject(TASKS);
return builder;
}), entry.getValue().stream().<ToXContent>map(task -> (builder, params) -> {
builder.startObject(task.taskId().toString());
task.toXContent(builder, params);
builder.endObject();
}
}
builder.startObject(TASKS);
for (TaskInfo task : entry.getValue()) {
builder.startObject(task.taskId().toString());
task.toXContent(builder, params);
return builder;
}), Stream.of((builder, params) -> {
builder.endObject();
builder.endObject();
return builder;
})).flatMap(Function.identity());
}).iterator(), Iterators.single((builder, params) -> {
builder.endObject();
}
builder.endObject();
builder.endObject();
}
builder.endObject();
return builder;
builder.endObject();
return builder;
}));
};
}

/**
* Convert this response to XContent grouping by parent tasks.
*/
public XContentBuilder toXContentGroupedByParents(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startObject(TASKS);
for (TaskGroup group : getTaskGroups()) {
public ChunkedToXContent groupedByParent() {
return ignored -> Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject();
toXContentCommon(builder, params);
builder.startObject(TASKS);
return builder;
}), getTaskGroups().stream().<ToXContent>map(group -> (builder, params) -> {
builder.field(group.taskInfo().taskId().toString());
group.toXContent(builder, params);
}
builder.endObject();
return builder;
return builder;
}).iterator(), Iterators.single((builder, params) -> {
builder.endObject();
builder.endObject();
return builder;
}));
}

/**
* Presents a flat list of tasks
*/
public XContentBuilder toXContentGroupedByNone(XContentBuilder builder, Params params) throws IOException {
toXContentCommon(builder, params);
builder.startArray(TASKS);
for (TaskInfo taskInfo : getTasks()) {
public ChunkedToXContent groupedByNone() {
return ignored -> Iterators.concat(Iterators.single((builder, params) -> {
builder.startObject();
toXContentCommon(builder, params);
builder.startArray(TASKS);
return builder;
}), getTasks().stream().<ToXContent>map(taskInfo -> (builder, params) -> {
builder.startObject();
taskInfo.toXContent(builder, params);
builder.endObject();
}
builder.endArray();
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
toXContentGroupedByNone(builder, params);
builder.endObject();
return builder;
return builder;
}).iterator(), Iterators.single((builder, params) -> {
builder.endArray();
builder.endObject();
return builder;
}));
}

public static ListTasksResponse fromXContent(XContentParser parser) {
Expand All @@ -239,6 +258,7 @@ public static ListTasksResponse fromXContent(XContentParser parser) {

@Override
public String toString() {
return Strings.toString(this, true, true);
return Strings.toString(ChunkedToXContent.wrapAsXContentObject(groupedByNone()), true, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -80,33 +76,17 @@ public static ListTasksRequest generateListTasksRequest(RestRequest request) {
public static <T extends ListTasksResponse> ActionListener<T> listTasksResponseListener(
Supplier<DiscoveryNodes> nodesInCluster,
String groupBy,
final RestChannel channel
RestChannel channel
) {
if ("nodes".equals(groupBy)) {
return new RestBuilderListener<T>(channel) {
@Override
public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContentGroupedByNode(builder, channel.request(), nodesInCluster.get());
builder.endObject();
return new RestResponse(RestStatus.OK, builder);
}
};
} else if ("parents".equals(groupBy)) {
return new RestBuilderListener<T>(channel) {
@Override
public RestResponse buildResponse(T response, XContentBuilder builder) throws Exception {
builder.startObject();
response.toXContentGroupedByParents(builder, channel.request());
builder.endObject();
return new RestResponse(RestStatus.OK, builder);
}
};
} else if ("none".equals(groupBy)) {
return new RestToXContentListener<>(channel);
} else {
throw new IllegalArgumentException("[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]");
}
final var listener = new RestChunkedToXContentListener<>(channel);
return switch (groupBy) {
case "nodes" -> listener.map(response -> response.groupedByNode(nodesInCluster));
case "parents" -> listener.map(response -> response.groupedByParent());
case "none" -> listener.map(response -> response.groupedByNone());
default -> throw new IllegalArgumentException(
"[group_by] must be one of [nodes], [parents] or [none] but was [" + groupBy + "]"
);
};
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -889,14 +890,12 @@ public void testTasksToXContentGrouping() throws Exception {

private Map<String, Object> serialize(ListTasksResponse response, boolean byParents) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
builder.startObject();
if (byParents) {
DiscoveryNodes nodes = testNodes[0].clusterService.state().nodes();
response.toXContentGroupedByNode(builder, ToXContent.EMPTY_PARAMS, nodes);
ChunkedToXContent.wrapAsXContentObject(response.groupedByNode(() -> nodes)).toXContent(builder, ToXContent.EMPTY_PARAMS);
} else {
response.toXContentGroupedByParents(builder, ToXContent.EMPTY_PARAMS);
ChunkedToXContent.wrapAsXContentObject(response.groupedByParent()).toXContent(builder, ToXContent.EMPTY_PARAMS);
}
builder.endObject();
builder.flush();
logger.info(Strings.toString(builder));
return XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
Expand Down
Loading