Skip to content

Moved rollup metadata from cluster state inside the datastream metadata #68465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.elasticsearch.cluster.metadata.MetadataMappingService;
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RollupMetadata;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
Expand Down Expand Up @@ -65,7 +64,6 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.persistent.PersistentTasksNodeService;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.script.ScriptMetadata;
import org.elasticsearch.snapshots.SnapshotsInfoService;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -132,9 +130,6 @@ public static List<Entry> getNamedWriteables() {
ComposableIndexTemplateMetadata::readDiffFrom);
registerMetadataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);

if (RollupV2.isEnabled()) {
registerMetadataCustom(entries, RollupMetadata.TYPE, RollupMetadata::new, RollupMetadata::readDiffFrom);
}
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
Expand All @@ -159,10 +154,6 @@ public static List<NamedXContentRegistry.Entry> getNamedXWriteables() {
ComposableIndexTemplateMetadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(DataStreamMetadata.TYPE),
DataStreamMetadata::fromXContent));
if (RollupV2.isEnabled()) {
entries.add(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(RollupMetadata.TYPE),
RollupMetadata::fromXContent));
}
return entries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,113 +22,79 @@
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* Object representing information about rollup-v2 indices and their respective original-indexes. These objects
* also include information about their capabilities, like which date-intervals and date-timezones they are configured
* with. Used by {@link RollupMetadata}.
* Object representing a group of rollup-v2 indices that have been computed on their respective original-indexes.
* Used by {@link RollupMetadata}. The rollup group is based on a map with the rollup-index name as a key and
* its rollup information object as value.
*
* The information in this class will be used to decide which index within the <code>group</code> will be chosen
* for a specific aggregation. For example, if there are two indices with different intervals (`1h`, `1d`) and
* a date-histogram aggregation request is sent for daily intervals, then the index with the associated `1d` interval
* will be chosen.
*/
public class RollupGroup extends AbstractDiffable<RollupGroup> implements ToXContentObject {
private static final ParseField GROUP_FIELD = new ParseField("group");
private static final ParseField DATE_INTERVAL_FIELD = new ParseField("interval");
private static final ParseField DATE_TIMEZONE_FIELD = new ParseField("timezone");

// the list of indices part of this rollup group
private List<String> group;
// a map from index-name to the date interval used in the associated index
private Map<String, DateHistogramInterval> dateInterval;
// a map from index-name to timezone used in the associated index
private Map<String, WriteableZoneId> dateTimezone;
/** a map from rollup-index name to its rollup configuration */
private final Map<String, RollupIndexMetadata> group;

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<RollupGroup, Void> PARSER =
new ConstructingObjectParser<>("rollup_group", false,
a -> new RollupGroup((List<String>) a[0], (Map<String, DateHistogramInterval>) a[1], (Map<String, WriteableZoneId>) a[2]));
a -> new RollupGroup((Map<String, RollupIndexMetadata>) a[0]));

static {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), GROUP_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
Map<String, DateHistogramInterval> intervalMap = new HashMap<>();

while (p.nextToken() != XContentParser.Token.END_OBJECT) {
String name = p.currentName();
p.nextToken();
String expression = p.text();
intervalMap.put(name, new DateHistogramInterval(expression));
}
return intervalMap;
}, DATE_INTERVAL_FIELD);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
Map<String, WriteableZoneId> zoneMap = new HashMap<>();
Map<String, RollupIndexMetadata> rollupGroups = new HashMap<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
String name = p.currentName();
p.nextToken();
String timezone = p.text();
zoneMap.put(name, WriteableZoneId.of(timezone));
rollupGroups.put(name, RollupIndexMetadata.parse(p));
}
return zoneMap;
}, DATE_TIMEZONE_FIELD);
return rollupGroups;
}, GROUP_FIELD);
}

public RollupGroup(List<String> group, Map<String, DateHistogramInterval> dateInterval, Map<String, WriteableZoneId> dateTimezone) {
public RollupGroup(Map<String, RollupIndexMetadata> group) {
this.group = group;
this.dateInterval = dateInterval;
this.dateTimezone = dateTimezone;
}

public RollupGroup() {
this.group = new ArrayList<>();
this.dateInterval = new HashMap<>();
this.dateTimezone = new HashMap<>();
this.group = new HashMap<>();
}

public RollupGroup(StreamInput in) throws IOException {
this.group = in.readStringList();
this.dateInterval = in.readMap(StreamInput::readString, DateHistogramInterval::new);
this.dateTimezone = in.readMap(StreamInput::readString, WriteableZoneId::new);
this.group = in.readMap(StreamInput::readString, RollupIndexMetadata::new);
}


public static RollupGroup fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

public void add(String name, DateHistogramInterval interval, WriteableZoneId timezone) {
group.add(name);
dateInterval.put(name, interval);
dateTimezone.put(name, timezone);
public void add(String name, RollupIndexMetadata rollupIndexMetadata) {
group.put(name, rollupIndexMetadata);
}

public void remove(String name) {
group.remove(name);
dateInterval.remove(name);
dateTimezone.remove(name);
}

public boolean contains(String name) {
return group.contains(name);
return group.containsKey(name);
}

public DateHistogramInterval getDateInterval(String name) {
return dateInterval.get(name);
RollupIndexMetadata rollupIndex = group.get(name);
return rollupIndex != null ? rollupIndex.getDateInterval() : null;
}

public WriteableZoneId getDateTimezone(String name) {
return dateTimezone.get(name);
RollupIndexMetadata rollupIndex = group.get(name);
return rollupIndex != null ? rollupIndex.getDateTimezone() : null;
}

public List<String> getIndices() {
return group;
public Set<String> getIndices() {
return group.keySet();
}

static Diff<RollupGroup> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -146,18 +112,14 @@ public String toString() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(group);
out.writeMap(dateInterval, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
out.writeMap(dateTimezone, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
out.writeMap(group, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder
.startObject()
.field(GROUP_FIELD.getPreferredName(), group)
.field(DATE_INTERVAL_FIELD.getPreferredName(), dateInterval)
.field(DATE_TIMEZONE_FIELD.getPreferredName(), dateTimezone)
.endObject();
}

Expand All @@ -166,13 +128,11 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RollupGroup that = (RollupGroup) o;
return group.equals(that.group) &&
dateInterval.equals(that.dateInterval) &&
dateTimezone.equals(that.dateTimezone);
return Objects.equals(group, that.group);
}

@Override
public int hashCode() {
return Objects.hash(group, dateInterval, dateTimezone);
return Objects.hash(group);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.time.WriteableZoneId;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Metadata about rollup indices that includes information about their capabilities,
* such as which date-intervals and date-timezones they are configured with and what metric aggregations
* do they support.
*
* The information in this class will be used to decide which index within the {@link RollupGroup} will be chosen
* for a specific aggregation. For example, if there are two indices with different intervals (`1h`, `1d`) and
* a date-histogram aggregation request is sent for daily intervals, then the index with the associated `1d` interval
* will be chosen.
*/
public class RollupIndexMetadata extends AbstractDiffable<RollupIndexMetadata> implements ToXContentObject {

private static final ParseField DATE_INTERVAL_FIELD = new ParseField("interval");
private static final ParseField DATE_TIMEZONE_FIELD = new ParseField("timezone");
private static final ParseField METRICS_FIELD = new ParseField("metrics");

// the date interval used for rolling up data
private final DateHistogramInterval dateInterval;
// the timezone used for the date_histogram
private final WriteableZoneId dateTimezone;
// a map from field name to metrics supported by the rollup for this field
private final Map<String, List<String>> metrics;

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<RollupIndexMetadata, Void> PARSER =
new ConstructingObjectParser<>("rollup_index", false,
a -> new RollupIndexMetadata((DateHistogramInterval) a[0], (WriteableZoneId) a[1], (Map<String, List<String>>) a[2]));

static {
PARSER.declareField(optionalConstructorArg(),
p -> new DateHistogramInterval(p.text()), DATE_INTERVAL_FIELD, ObjectParser.ValueType.STRING);
PARSER.declareField(optionalConstructorArg(),
p -> WriteableZoneId.of(p.text()), DATE_TIMEZONE_FIELD, ObjectParser.ValueType.STRING);

PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
Map<String, List<String>> metricsMap = new HashMap<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = p.currentName();
p.nextToken();
List<Object> metricObj = p.list();
List<String> metrics = new ArrayList<>(metricObj.size());
for (Object o: metricObj) {
metrics.add((String) o);
}
metricsMap.put(fieldName, metrics);
}
return metricsMap;
}, METRICS_FIELD);
}

public RollupIndexMetadata(DateHistogramInterval dateInterval,
WriteableZoneId dateTimezone,
Map<String, List<String>> metrics) {
this.dateInterval = dateInterval;
this.dateTimezone = dateTimezone;
this.metrics = metrics;
}

public RollupIndexMetadata(StreamInput in) throws IOException {
this.dateInterval = new DateHistogramInterval(in);
this.dateTimezone = new WriteableZoneId(in);
this.metrics = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
dateInterval.writeTo(out);
dateTimezone.writeTo(out);
out.writeMapOfLists(metrics, StreamOutput::writeString, StreamOutput::writeString);
}

public static RollupIndexMetadata parse(XContentParser parser) {
return PARSER.apply(parser, null);
}

static RollupIndexMetadata fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

public DateHistogramInterval getDateInterval() {
return dateInterval;
}

public WriteableZoneId getDateTimezone() {
return dateTimezone;
}

public Map<String, List<String>> getMetrics() {
return metrics;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder
.startObject()
.field(DATE_INTERVAL_FIELD.getPreferredName(), dateInterval)
.field(DATE_TIMEZONE_FIELD.getPreferredName(), dateTimezone)
.field(METRICS_FIELD.getPreferredName(), metrics)
.endObject();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RollupIndexMetadata that = (RollupIndexMetadata) o;
return dateInterval.equals(that.dateInterval) &&
dateTimezone.equals(that.dateTimezone) &&
Objects.equals(this.metrics, that.metrics);
}

@Override
public int hashCode() {
return Objects.hash(dateInterval, dateTimezone, metrics);
}

}
Loading