
[BACKPORT 7.x] Add data frame feature (#38934) #39029

Merged

@@ -1697,7 +1697,7 @@ public void testCRUDIndexTemplateWithTypes() throws Exception {
assertTrue(template2.mappings().containsKey("custom_doc_type"));

List<String> names = randomBoolean()
? Arrays.asList("*-1", "template-2")
? Arrays.asList("*plate-1", "template-2")
: Arrays.asList("template-*");
GetIndexTemplatesRequest getBothRequest = new GetIndexTemplatesRequest(names);
org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse getBoth = execute(
@@ -1780,7 +1780,7 @@ public void testCRUDIndexTemplate() throws Exception {


List<String> names = randomBoolean()
? Arrays.asList("*-1", "template-2")
? Arrays.asList("*plate-1", "template-2")
: Arrays.asList("template-*");
GetIndexTemplatesRequest getBothRequest = new GetIndexTemplatesRequest(names);
GetIndexTemplatesResponse getBoth = execute(
@@ -1834,7 +1834,7 @@ public void testIndexTemplatesExist() throws Exception {

{
final List<String> templateNames = randomBoolean()
? Arrays.asList("*-1", "template-2")
? Arrays.asList("*plate-1", "template-2")
: Arrays.asList("template-*");

final IndexTemplatesExistRequest bothRequest = new IndexTemplatesExistRequest(templateNames);
5 changes: 5 additions & 0 deletions docs/reference/rest-api/info.asciidoc
@@ -68,6 +68,11 @@ Example response:
"available" : true,
"enabled" : true
},
"data_frame" : {
"description" : "Data Frame for the Elastic Stack",
"available" : true,
"enabled" : true
},
"graph" : {
"description" : "Graph Data Exploration for the Elastic Stack",
"available" : true,
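
The info API (GET /_xpack) now lists the new feature alongside the existing ones, as shown in the response excerpt above. As a quick sanity check from Java, something like the following low-level REST client sketch could be used; the host, port, and string matching are illustrative assumptions, not part of this change.

import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public final class XPackInfoCheck {
    public static void main(String[] args) throws Exception {
        // Illustrative only: the endpoint host and port are assumptions.
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Response response = client.performRequest(new Request("GET", "/_xpack"));
            String body = EntityUtils.toString(response.getEntity());
            // Crude check that the data_frame feature block appears in the info response.
            System.out.println(body.contains("\"data_frame\""));
        }
    }
}
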
@@ -553,6 +553,15 @@ public static boolean isMachineLearningAllowedForOperationMode(final OperationMo
return isPlatinumOrTrialOperationMode(operationMode);
}

/**
* Data Frame is always available as long as there is a valid license
*
* @return true if the license is active
*/
public synchronized boolean isDataFrameAllowed() {
return status.active;
}

/**
* Rollup is always available as long as there is a valid license
*
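
Because data frame availability only requires an active license, transport actions for the feature can gate on this new check up front. Below is a minimal sketch of that guard, assuming the usual X-Pack pattern of failing the listener with a license compliance exception; the helper class itself is hypothetical and not part of this change.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.xpack.core.XPackField;

final class DataFrameLicenseGuard {
    /** Returns true if the request may proceed; otherwise fails the listener with a license exception. */
    static boolean checkLicense(XPackLicenseState licenseState, ActionListener<?> listener) {
        if (licenseState.isDataFrameAllowed() == false) {
            // Hypothetical guard; LicenseUtils.newComplianceException is the common X-Pack helper for this.
            listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
            return false;
        }
        return true;
    }
}
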
@@ -50,6 +50,7 @@ public final class ClientHelper {
public static final String DEPRECATION_ORIGIN = "deprecation";
public static final String PERSISTENT_TASK_ORIGIN = "persistent_tasks";
public static final String ROLLUP_ORIGIN = "rollup";
public static final String DATA_FRAME_ORIGIN = "data_frame";

private ClientHelper() {}

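
DATA_FRAME_ORIGIN follows the existing origin convention in ClientHelper: internal client calls made on behalf of the feature run under this origin so that security attributes them to the feature rather than to the calling user. A hedged sketch of how such a call might look; the indexing action is a placeholder, and the exact executeAsyncWithOrigin generics can vary between branches.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.core.ClientHelper;

final class DataFrameOriginSketch {
    // Hypothetical helper: index a document on behalf of the data frame feature,
    // tagging the call with DATA_FRAME_ORIGIN.
    static void indexAsDataFrame(Client client, IndexRequest request, ActionListener<IndexResponse> listener) {
        ClientHelper.executeAsyncWithOrigin(client, ClientHelper.DATA_FRAME_ORIGIN,
                IndexAction.INSTANCE, request, listener);
    }
}
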
@@ -42,6 +42,7 @@
import org.elasticsearch.xpack.core.beats.BeatsFeatureSetUsage;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.CCRFeatureSet;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.deprecation.DeprecationInfoAction;
import org.elasticsearch.xpack.core.graph.GraphFeatureSetUsage;
import org.elasticsearch.xpack.core.graph.action.GraphExploreAction;
@@ -439,8 +440,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(LifecycleAction.class, DeleteAction.NAME, DeleteAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, FreezeAction.NAME, FreezeAction::new),
new NamedWriteableRegistry.Entry(LifecycleAction.class, SetPriorityAction.NAME, SetPriorityAction::new),
- new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new)
- );
+ new NamedWriteableRegistry.Entry(LifecycleAction.class, UnfollowAction.NAME, UnfollowAction::new),
+ // Data Frame
+ new NamedWriteableRegistry.Entry(XPackFeatureSet.Usage.class, XPackField.DATA_FRAME, DataFrameFeatureSetUsage::new));
}

@Override
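
Registering DataFrameFeatureSetUsage under XPackField.DATA_FRAME is what lets usage responses round-trip over the transport layer: the writer emits the name, and the reader looks up DataFrameFeatureSetUsage::new in the registry. A rough, self-contained sketch of that round trip (illustrative only; "entries" would include the entry added above):

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.core.XPackFeatureSet;

import java.io.IOException;
import java.util.List;

final class UsageRoundTripSketch {
    // Serialize a usage object as a named writeable and read it back through the registry.
    static XPackFeatureSet.Usage roundTrip(XPackFeatureSet.Usage usage,
                                           List<NamedWriteableRegistry.Entry> entries) throws IOException {
        NamedWriteableRegistry registry = new NamedWriteableRegistry(entries);
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeNamedWriteable(usage);
            try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry)) {
                return in.readNamedWriteable(XPackFeatureSet.Usage.class);
            }
        }
    }
}
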
@@ -35,6 +35,8 @@ public final class XPackField {
public static final String INDEX_LIFECYCLE = "ilm";
/** Name constant for the CCR feature. */
public static final String CCR = "ccr";
/** Name constant for the data frame feature. */
public static final String DATA_FRAME = "data_frame";

private XPackField() {}

@@ -44,6 +44,10 @@ private XPackSettings() {
*/
public static final Setting<Boolean> CCR_ENABLED_SETTING = Setting.boolSetting("xpack.ccr.enabled", true, Property.NodeScope);

/** Setting for enabling or disabling data frame. Defaults to true. */
public static final Setting<Boolean> DATA_FRAME_ENABLED = Setting.boolSetting("xpack.data_frame.enabled", true,
Setting.Property.NodeScope);

/** Setting for enabling or disabling security. Defaults to true. */
public static final Setting<Boolean> SECURITY_ENABLED = Setting.boolSetting("xpack.security.enabled", true, Setting.Property.NodeScope);

@@ -209,6 +213,7 @@ public static List<Setting<?>> getAllSettings() {
settings.add(ROLLUP_ENABLED);
settings.add(PASSWORD_HASHING_ALGORITHM);
settings.add(INDEX_LIFECYCLE_ENABLED);
settings.add(DATA_FRAME_ENABLED);
return Collections.unmodifiableList(settings);
}

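
xpack.data_frame.enabled is a plain node-scope boolean setting that defaults to true, so it reads like any other setting. A small illustrative sketch:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.core.XPackSettings;

final class DataFrameEnabledCheck {
    public static void main(String[] args) {
        // Defaults to true when not set explicitly.
        System.out.println(XPackSettings.DATA_FRAME_ENABLED.get(Settings.EMPTY)); // true

        // Equivalent of setting xpack.data_frame.enabled: false in elasticsearch.yml.
        Settings disabled = Settings.builder()
                .put(XPackSettings.DATA_FRAME_ENABLED.getKey(), false)
                .build();
        System.out.println(XPackSettings.DATA_FRAME_ENABLED.get(disabled)); // false
    }
}
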
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.XPackFeatureSet.Usage;
import org.elasticsearch.xpack.core.XPackField;
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats;

import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;

public class DataFrameFeatureSetUsage extends Usage {

private final Map<String, Long> transformCountByState;
private final DataFrameIndexerTransformStats accumulatedStats;

public DataFrameFeatureSetUsage(StreamInput in) throws IOException {
super(in);
this.transformCountByState = in.readMap(StreamInput::readString, StreamInput::readLong);
this.accumulatedStats = new DataFrameIndexerTransformStats(in);
}

public DataFrameFeatureSetUsage(boolean available, boolean enabled, Map<String, Long> transformCountByState,
DataFrameIndexerTransformStats accumulatedStats) {
super(XPackField.DATA_FRAME, available, enabled);
this.transformCountByState = Objects.requireNonNull(transformCountByState);
this.accumulatedStats = Objects.requireNonNull(accumulatedStats);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(transformCountByState, StreamOutput::writeString, StreamOutput::writeLong);
accumulatedStats.writeTo(out);
}

@Override
protected void innerXContent(XContentBuilder builder, Params params) throws IOException {
super.innerXContent(builder, params);
if (transformCountByState.isEmpty() == false) {
builder.startObject(DataFrameField.TRANSFORMS.getPreferredName());
long all = 0L;
for (Entry<String, Long> entry : transformCountByState.entrySet()) {
builder.field(entry.getKey(), entry.getValue());
all+=entry.getValue();
}
builder.field(MetaData.ALL, all);
builder.endObject();

// if there are no transforms, do not show any stats
builder.field(DataFrameField.STATS_FIELD.getPreferredName(), accumulatedStats);
}
}

@Override
public int hashCode() {
return Objects.hash(enabled, available, transformCountByState, accumulatedStats);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
DataFrameFeatureSetUsage other = (DataFrameFeatureSetUsage) obj;
return Objects.equals(name, other.name) && available == other.available && enabled == other.enabled
&& Objects.equals(transformCountByState, other.transformCountByState)
&& Objects.equals(accumulatedStats, other.accumulatedStats);
}
}
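
For orientation: innerXContent nests the per-state transform counts plus an _all total (MetaData.ALL) and the accumulated indexer stats under the usage block, and omits both when no transforms exist. A hedged sketch of rendering an instance to JSON; the stats argument is assumed to come from elsewhere, since DataFrameIndexerTransformStats is not part of this excerpt.

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.xpack.core.dataframe.DataFrameFeatureSetUsage;
import org.elasticsearch.xpack.core.dataframe.transform.DataFrameIndexerTransformStats;

import java.io.IOException;
import java.util.Collections;

final class UsageToJsonSketch {
    static String render(DataFrameIndexerTransformStats stats) throws IOException {
        DataFrameFeatureSetUsage usage = new DataFrameFeatureSetUsage(
                true, true, Collections.singletonMap("started", 1L), stats);
        XContentBuilder builder = XContentFactory.jsonBuilder();
        usage.toXContent(builder, ToXContent.EMPTY_PARAMS);
        // Roughly: {"available":true,"enabled":true,"transforms":{"started":1,"_all":1},"stats":{...}}
        return Strings.toString(builder);
    }
}
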
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import org.elasticsearch.common.ParseField;

/*
* Utility class to hold common fields and strings for data frame.
*/
public final class DataFrameField {

// common parse fields
public static final ParseField AGGREGATIONS = new ParseField("aggregations");
public static final ParseField AGGS = new ParseField("aggs");
public static final ParseField ID = new ParseField("id");
public static final ParseField TRANSFORMS = new ParseField("transforms");
public static final ParseField COUNT = new ParseField("count");
public static final ParseField GROUP_BY = new ParseField("group_by");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ParseField WAIT_FOR_COMPLETION = new ParseField("wait_for_completion");
public static final ParseField STATS_FIELD = new ParseField("stats");

// common strings
public static final String TASK_NAME = "data_frame/transforms";
public static final String REST_BASE_PATH = "/_data_frame/";
public static final String REST_BASE_PATH_TRANSFORMS_BY_ID = REST_BASE_PATH + "transforms/{id}/";

// note: this is used to match tasks
public static final String PERSISTENT_TASK_DESCRIPTION_PREFIX = "data_frame_";

// strings for meta information
public static final String META_FIELDNAME = "_data_frame";
public static final String CREATION_DATE_MILLIS = "creation_date_in_millis";
public static final String VERSION = "version";
public static final String CREATED = "created";
public static final String CREATED_BY = "created_by";
public static final String TRANSFORM = "transform";
public static final String DATA_FRAME_SIGNATURE = "data-frame-transform";

private DataFrameField() {
}
}
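
These ParseField constants are intended to be shared by the parsers for the transform requests and configs. A hypothetical fragment showing the usual ConstructingObjectParser wiring; the parser and its target type are placeholders, not part of this change.

import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;

final class DataFrameParserSketch {
    // Hypothetical: parse an object holding only an id, using the shared field constant.
    static final ConstructingObjectParser<String, Void> ID_ONLY_PARSER =
            new ConstructingObjectParser<>("data_frame_id_only", args -> (String) args[0]);

    static {
        ID_ONLY_PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
    }
}
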
@@ -0,0 +1,73 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.core.dataframe;

import java.text.MessageFormat;
import java.util.Locale;

public class DataFrameMessages {

public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_TIMEOUT =
"Timed out after [{0}] while waiting for data frame transform [{1}] to stop";
public static final String REST_STOP_TRANSFORM_WAIT_FOR_COMPLETION_INTERRUPT =
"Interrupted while waiting for data frame transform [{0}] to stop";
public static final String REST_PUT_DATA_FRAME_TRANSFORM_EXISTS = "Transform with id [{0}] already exists";
public static final String REST_DATA_FRAME_UNKNOWN_TRANSFORM = "Transform with id [{0}] could not be found";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION =
"Failed to validate data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_TARGET_MAPPINGS = "Failed to deduce target mappings";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_TARGET_INDEX = "Failed to create target index";
public static final String REST_PUT_DATA_FRAME_FAILED_TO_START_PERSISTENT_TASK =
"Failed to start persistent task, configuration has been cleaned up: [{0}]";
public static final String REST_DATA_FRAME_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";

public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform[{1}]";
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION =
"Failed to load data frame transform configuration for transform [{0}]";
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION =
"Failed to parse transform configuration for data frame transform [{0}]";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_NO_TRANSFORM =
"Data frame transform configuration must specify exactly 1 function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_GROUP_BY =
"Data frame pivot transform configuration must specify at least 1 group_by";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_PIVOT_NO_AGGREGATION =
"Data frame pivot transform configuration must specify at least 1 aggregation";
public static final String DATA_FRAME_TRANSFORM_PIVOT_FAILED_TO_CREATE_COMPOSITE_AGGREGATION =
"Failed to create composite aggregation from pivot function";
public static final String DATA_FRAME_TRANSFORM_CONFIGURATION_INVALID =
"Data frame transform configuration [{0}] has invalid elements";

public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_QUERY =
"Failed to parse query for data frame transform";
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_GROUP_BY =
"Failed to parse group_by for data frame pivot transform";
public static final String LOG_DATA_FRAME_TRANSFORM_CONFIGURATION_BAD_AGGREGATION =
"Failed to parse aggregation for data frame pivot transform";

private DataFrameMessages() {
}

/**
* Returns the message parameter
*
* @param message Should be one of the statics defined in this class
*/
public static String getMessage(String message) {
return message;
}

/**
* Format the message with the supplied arguments
*
* @param message Should be one of the statics defined in this class
* @param args MessageFormat arguments. See {@linkplain MessageFormat#format(Object)}]
*/
public static String getMessage(String message, Object... args) {
return new MessageFormat(message, Locale.ROOT).format(args);
}
}
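
The MessageFormat placeholders take positional arguments, so callers format messages as in this illustrative example:

import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;

final class DataFrameMessagesSketch {
    public static void main(String[] args) {
        // Prints: Transform with id [my-transform] could not be found
        System.out.println(DataFrameMessages.getMessage(
                DataFrameMessages.REST_DATA_FRAME_UNKNOWN_TRANSFORM, "my-transform"));
    }
}
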