Skip to content

[WIP] Add DownSampleAction to downsampling time_series index #83771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{
"rollup.downsample":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-rollup.html",
"description":"Downsample an index"
},
"stability":"experimental",
"visibility":"public",
"headers":{
"accept": [ "application/json"],
"content_type": ["application/json"]
},
"url": {
"paths": [
{
"path": "/{index}/_downsample/{downsample_index}",
"methods": [
"POST"
],
"parts": {
"index": {
"type": "string",
"description": "The index to down sample",
"required": true
},
"downsample_index": {
"type": "string",
"description": "The name of the down sample index to create",
"required": true
}
}
}
]
},
"params":{},
"body":{
"description":"The downsample configuration",
"required":true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.downsample;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser.ValueType;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;

import java.io.IOException;
import java.time.ZoneId;
import java.util.Objects;

import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class DownsampleDateHistogramConfig implements Writeable, ToXContentObject {
static final String NAME = "date_histogram";
public static final String FIXED_INTERVAL = "fixed_interval";
public static final String CALENDAR_INTERVAL = "calendar_interval";
public static final String TIME_ZONE = "time_zone";
public static final String DEFAULT_TIMEZONE = ZoneId.of("UTC").getId();

private static final ConstructingObjectParser<DownsampleDateHistogramConfig, Void> PARSER;
static {
PARSER = new ConstructingObjectParser<>(
NAME,
a -> new DownsampleDateHistogramConfig((DateHistogramInterval) a[0], (DateHistogramInterval) a[1], (String) a[2])
);
PARSER.declareField(
optionalConstructorArg(),
p -> new DateHistogramInterval(p.text()),
new ParseField(CALENDAR_INTERVAL),
ValueType.STRING
);
PARSER.declareField(
optionalConstructorArg(),
p -> new DateHistogramInterval(p.text()),
new ParseField(FIXED_INTERVAL),
ValueType.STRING
);
PARSER.declareString(optionalConstructorArg(), new ParseField(TIME_ZONE));
}

private final DateHistogramInterval calendarInterval;
private final DateHistogramInterval fixedInterval;
private final String timeZone;

public DownsampleDateHistogramConfig(
final @Nullable DateHistogramInterval calendarInterval,
final @Nullable DateHistogramInterval fixedInterval,
final @Nullable String timeZone
) {
this.calendarInterval = calendarInterval;
this.fixedInterval = fixedInterval;
this.timeZone = (timeZone != null && timeZone.isEmpty() == false) ? timeZone : DEFAULT_TIMEZONE;
}

public DownsampleDateHistogramConfig(final StreamInput in) throws IOException {
this.calendarInterval = new DateHistogramInterval(in);
this.fixedInterval = new DateHistogramInterval(in);
this.timeZone = in.readString();
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeOptionalWriteable(calendarInterval);
out.writeOptionalWriteable(fixedInterval);
out.writeString(timeZone);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(CALENDAR_INTERVAL, calendarInterval);
builder.field(FIXED_INTERVAL, fixedInterval);
builder.field(TIME_ZONE, timeZone);
}
return builder.endObject();
}

public static DownsampleDateHistogramConfig fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

public DateHistogramInterval getCalendarInterval() {
return calendarInterval;
}

public DateHistogramInterval getFixedInterval() {
return fixedInterval;
}

public String getTimeZone() {
return timeZone;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DownsampleDateHistogramConfig that = (DownsampleDateHistogramConfig) o;
return Objects.equals(calendarInterval, that.calendarInterval)
&& Objects.equals(fixedInterval, that.fixedInterval)
&& Objects.equals(timeZone, that.timeZone);
}

@Override
public int hashCode() {
return Objects.hash(calendarInterval, fixedInterval, timeZone);
}

@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.downsample.action;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig;

import java.io.IOException;
import java.util.Objects;

public class DownSampleAction extends ActionType<AcknowledgedResponse> {
public static final DownSampleAction INSTANCE = new DownSampleAction();
public static final String NAME = "indices:admin/xpack/downsample";

private DownSampleAction() {
super(NAME, AcknowledgedResponse::readFrom);
}

public static class Request extends MasterNodeRequest<DownSampleAction.Request> implements IndicesRequest, ToXContentObject {
private String sourceIndex;
private String downsampleIndex;
private DownsampleDateHistogramConfig downsampleConfig;

public Request(String sourceIndex, String downsampleIndex, DownsampleDateHistogramConfig downsampleConfig) {
this.sourceIndex = sourceIndex;
this.downsampleIndex = downsampleIndex;
this.downsampleConfig = downsampleConfig;
}

public Request() {}

public Request(StreamInput in) throws IOException {
super(in);
sourceIndex = in.readString();
downsampleIndex = in.readString();
downsampleConfig = new DownsampleDateHistogramConfig(in);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("source_index", sourceIndex);
builder.field("downsample_index", downsampleIndex);
downsampleConfig.toXContent(builder, params);
builder.endObject();
return builder;
}

@Override
public String[] indices() {
return new String[] { sourceIndex };
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED;
}

public String getSourceIndex() {
return sourceIndex;
}

public String getDownsampleIndex() {
return downsampleIndex;
}

public DownsampleDateHistogramConfig getDownsampleConfig() {
return downsampleConfig;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Request request = (Request) o;
return Objects.equals(sourceIndex, request.sourceIndex)
&& Objects.equals(downsampleIndex, request.downsampleIndex)
&& Objects.equals(downsampleConfig, request.downsampleConfig);
}

@Override
public int hashCode() {
return Objects.hash(sourceIndex, downsampleIndex, downsampleConfig);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
import org.elasticsearch.xpack.core.downsample.action.DownSampleAction;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction;
import org.elasticsearch.xpack.core.rollup.action.GetRollupCapsAction;
Expand Down Expand Up @@ -66,7 +67,9 @@
import org.elasticsearch.xpack.rollup.rest.RestRollupSearchAction;
import org.elasticsearch.xpack.rollup.rest.RestStartRollupJobAction;
import org.elasticsearch.xpack.rollup.rest.RestStopRollupJobAction;
import org.elasticsearch.xpack.rollup.v2.RestDownSampleAction;
import org.elasticsearch.xpack.rollup.v2.RestRollupAction;
import org.elasticsearch.xpack.rollup.v2.TransportDownSampleAction;
import org.elasticsearch.xpack.rollup.v2.TransportRollupAction;
import org.elasticsearch.xpack.rollup.v2.TransportRollupIndexerAction;

Expand Down Expand Up @@ -142,6 +145,7 @@ public List<RestHandler> getRestHandlers(

if (RollupV2.isEnabled()) {
handlers.add(new RestRollupAction());
handlers.add(new RestDownSampleAction());
}

return handlers;
Expand All @@ -167,6 +171,7 @@ public List<RestHandler> getRestHandlers(
if (RollupV2.isEnabled()) {
actions.add(new ActionHandler<>(RollupIndexerAction.INSTANCE, TransportRollupIndexerAction.class));
actions.add(new ActionHandler<>(RollupAction.INSTANCE, TransportRollupAction.class));
actions.add(new ActionHandler<>(DownSampleAction.INSTANCE, TransportDownSampleAction.class));
}

return actions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.rollup.v2;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.downsample.DownsampleDateHistogramConfig;
import org.elasticsearch.xpack.core.downsample.action.DownSampleAction;

import java.io.IOException;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.POST;

public class RestDownSampleAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return List.of(new Route(POST, "/{index}/_downsample/{downsample_index}"));
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String index = restRequest.param("index");
String downsampleIndex = restRequest.param("downsample_index");
DownsampleDateHistogramConfig config = DownsampleDateHistogramConfig.fromXContent(restRequest.contentParser());
DownSampleAction.Request request = new DownSampleAction.Request(index, downsampleIndex, config);
return channel -> client.execute(DownSampleAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

@Override
public String getName() {
return "downsample_action";
}

}
Loading