Skip to content

[Transform] implement retention policy to delete data from a transform #67832

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Feb 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9118761
create config classes for retention policy configuration
Jan 20, 2021
efd718c
apply spotless
Jan 21, 2021
a732dfd
fix license headers
Jan 21, 2021
35e7c9a
add missing headers
Jan 22, 2021
04bc8c8
checkstyle
Jan 22, 2021
59acf31
fix compile error
Jan 22, 2021
4daf884
more compile errors
Jan 22, 2021
ea5e638
fix test randomization
Jan 25, 2021
4923da4
implement delete by query based on retention policy
Jan 26, 2021
156b7af
add file header
Jan 26, 2021
de13212
refresh the destination index before executing the retention policy
Jan 28, 2021
3b5ded7
add validation hook
Jan 28, 2021
c1aa7e7
checkstyle
Jan 28, 2021
f2407b8
add some basic validation
Jan 29, 2021
68d84f3
checkstyle
Feb 1, 2021
0c6732e
remove transform dependency from async search tests
Feb 1, 2021
eb37fbe
validate retention config
Feb 1, 2021
7101dad
delete accidentaly added file
Feb 1, 2021
4ed9bb5
add counters for delete to stats
Feb 2, 2021
e7ea5e7
fix test compile errors
Feb 2, 2021
f2d0db8
rename indexer tests
Feb 3, 2021
91fc37f
add test to ensure retention policy gets executed
Feb 3, 2021
ad233b5
add tests for retention policy to DBQ conversion
Feb 3, 2021
3880d88
update license headers
Feb 3, 2021
7e63ad3
make retention policy updateable
Feb 3, 2021
32af5fe
connect stats to measure deletes
Feb 3, 2021
01b7769
spotless
Feb 3, 2021
ed3f242
remove unnecessary new line
Feb 4, 2021
c653b76
use declareNamedObject
Feb 5, 2021
9d6389a
add delete by query paranoia
Feb 5, 2021
94878ab
wrap logger.trace as parameterized message
Feb 5, 2021
9a4bdf6
apply review comments
Feb 5, 2021
dac1df1
remove unnecessary if clause
Feb 5, 2021
e3980e9
add tests for retention policy classes
Feb 8, 2021
79a0d0a
limit refresh to the destination index
Feb 8, 2021
b03b9e6
add new stats to cat and documentation
Feb 8, 2021
69aec68
fix merge conflict
Feb 8, 2021
2b5c051
fail (with retry) if delete by query fails or returns with unexpected…
Feb 8, 2021
90681b3
fix code formating
Feb 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

package org.elasticsearch.client.transform;

import org.elasticsearch.client.transform.transforms.RetentionPolicyConfig;
import org.elasticsearch.client.transform.transforms.SyncConfig;
import org.elasticsearch.client.transform.transforms.TimeRetentionPolicyConfig;
import org.elasticsearch.client.transform.transforms.TimeSyncConfig;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -22,9 +24,13 @@ public class TransformNamedXContentProvider implements NamedXContentProvider {
@Override
public List<NamedXContentRegistry.Entry> getNamedXContentParsers() {
return Arrays.asList(
new NamedXContentRegistry.Entry(SyncConfig.class,
new ParseField(TimeSyncConfig.NAME),
TimeSyncConfig::fromXContent));
new NamedXContentRegistry.Entry(SyncConfig.class, new ParseField(TimeSyncConfig.NAME), TimeSyncConfig::fromXContent),
new NamedXContentRegistry.Entry(
RetentionPolicyConfig.class,
new ParseField(TimeRetentionPolicyConfig.NAME),
TimeRetentionPolicyConfig::fromXContent
)
);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.client.transform.transforms;

import org.elasticsearch.common.xcontent.ToXContentObject;

public interface RetentionPolicyConfig extends ToXContentObject {

/**
* Returns the name of the writeable object
*/
String getName();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.client.transform.transforms;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class TimeRetentionPolicyConfig implements RetentionPolicyConfig {

public static final String NAME = "time";

private static final ParseField FIELD = new ParseField("field");
private static final ParseField MAX_AGE = new ParseField("max_age");

private final String field;
private final TimeValue maxAge;

private static final ConstructingObjectParser<TimeRetentionPolicyConfig, Void> PARSER = new ConstructingObjectParser<>(
"time_retention_policy_config",
true,
args -> new TimeRetentionPolicyConfig((String) args[0], args[1] != null ? (TimeValue) args[1] : TimeValue.ZERO)
);

static {
PARSER.declareString(constructorArg(), FIELD);
PARSER.declareField(
constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_AGE.getPreferredName()),
MAX_AGE,
ObjectParser.ValueType.STRING
);
}

public static TimeRetentionPolicyConfig fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

public TimeRetentionPolicyConfig(String field, TimeValue maxAge) {
this.field = field;
this.maxAge = maxAge;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(FIELD.getPreferredName(), field);
builder.field(MAX_AGE.getPreferredName(), maxAge.getStringRep());
builder.endObject();
return builder;
}

public String getField() {
return field;
}

public TimeValue getMaxAge() {
return maxAge;
}

@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}

if (other == null || getClass() != other.getClass()) {
return false;
}

final TimeRetentionPolicyConfig that = (TimeRetentionPolicyConfig) other;

return Objects.equals(this.field, that.field) && Objects.equals(this.maxAge, that.maxAge);
}

@Override
public int hashCode() {
return Objects.hash(field, maxAge);
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;

import java.io.IOException;
import java.time.Instant;
Expand All @@ -41,6 +40,7 @@ public class TransformConfig implements ToXContentObject {
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
// types of transforms
public static final ParseField PIVOT_TRANSFORM = new ParseField("pivot");
public static final ParseField LATEST_TRANSFORM = new ParseField("latest");
Expand All @@ -54,6 +54,7 @@ public class TransformConfig implements ToXContentObject {
private final PivotConfig pivotConfig;
private final LatestConfig latestConfig;
private final String description;
private final RetentionPolicyConfig retentionPolicyConfig;
private final Version transformVersion;
private final Instant createTime;

Expand All @@ -70,8 +71,9 @@ public class TransformConfig implements ToXContentObject {
LatestConfig latestConfig = (LatestConfig) args[6];
String description = (String) args[7];
SettingsConfig settings = (SettingsConfig) args[8];
Instant createTime = (Instant) args[9];
String transformVersion = (String) args[10];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[9];
Instant createTime = (Instant) args[10];
String transformVersion = (String) args[11];
return new TransformConfig(
id,
source,
Expand All @@ -82,6 +84,7 @@ public class TransformConfig implements ToXContentObject {
latestConfig,
description,
settings,
retentionPolicyConfig,
createTime,
transformVersion
);
Expand All @@ -98,11 +101,16 @@ public class TransformConfig implements ToXContentObject {
FREQUENCY,
ObjectParser.ValueType.STRING
);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
PARSER.declareNamedObject(optionalConstructorArg(), (p, c, n) -> p.namedObject(SyncConfig.class, n, c), SYNC);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p), LATEST_TRANSFORM);
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), SETTINGS);
PARSER.declareNamedObject(
optionalConstructorArg(),
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
RETENTION_POLICY
);
PARSER.declareField(
optionalConstructorArg(),
p -> TimeUtil.parseTimeFieldToInstant(p, CREATE_TIME.getPreferredName()),
Expand All @@ -112,14 +120,6 @@ public class TransformConfig implements ToXContentObject {
PARSER.declareString(optionalConstructorArg(), VERSION);
}

private static SyncConfig parseSyncConfig(XContentParser parser) throws IOException {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
SyncConfig syncConfig = parser.namedObject(SyncConfig.class, parser.currentName(), true);
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.nextToken(), parser);
return syncConfig;
}

public static TransformConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
Expand All @@ -136,7 +136,7 @@ public static TransformConfig fromXContent(final XContentParser parser) {
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null);
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null, null);
}

/**
Expand All @@ -151,7 +151,7 @@ public static TransformConfig forPreview(final SourceConfig source, final PivotC
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final LatestConfig latestConfig) {
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null);
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null, null);
}

TransformConfig(
Expand All @@ -164,6 +164,7 @@ public static TransformConfig forPreview(final SourceConfig source, final Latest
final LatestConfig latestConfig,
final String description,
final SettingsConfig settings,
final RetentionPolicyConfig retentionPolicyConfig,
final Instant createTime,
final String version
) {
Expand All @@ -176,6 +177,7 @@ public static TransformConfig forPreview(final SourceConfig source, final Latest
this.latestConfig = latestConfig;
this.description = description;
this.settings = settings;
this.retentionPolicyConfig = retentionPolicyConfig;
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.transformVersion = version == null ? null : Version.fromString(version);
}
Expand Down Expand Up @@ -226,6 +228,11 @@ public SettingsConfig getSettings() {
return settings;
}

@Nullable
public RetentionPolicyConfig getRetentionPolicyConfig() {
return retentionPolicyConfig;
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
Expand Down Expand Up @@ -258,6 +265,11 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
if (settings != null) {
builder.field(SETTINGS.getPreferredName(), settings);
}
if (retentionPolicyConfig != null) {
builder.startObject(RETENTION_POLICY.getPreferredName());
builder.field(retentionPolicyConfig.getName(), retentionPolicyConfig);
builder.endObject();
}
if (createTime != null) {
builder.timeField(CREATE_TIME.getPreferredName(), CREATE_TIME.getPreferredName() + "_string", createTime.toEpochMilli());
}
Expand Down Expand Up @@ -290,13 +302,26 @@ public boolean equals(Object other) {
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.pivotConfig, that.pivotConfig)
&& Objects.equals(this.latestConfig, that.latestConfig);
&& Objects.equals(this.latestConfig, that.latestConfig)
&& Objects.equals(this.retentionPolicyConfig, that.retentionPolicyConfig);
}

@Override
public int hashCode() {
return Objects.hash(
id, source, dest, frequency, syncConfig, settings, createTime, transformVersion, pivotConfig, latestConfig, description);
id,
source,
dest,
frequency,
syncConfig,
settings,
createTime,
transformVersion,
pivotConfig,
latestConfig,
description,
retentionPolicyConfig
);
}

@Override
Expand All @@ -319,6 +344,7 @@ public static class Builder {
private LatestConfig latestConfig;
private SettingsConfig settings;
private String description;
private RetentionPolicyConfig retentionPolicyConfig;

public Builder setId(String id) {
this.id = id;
Expand Down Expand Up @@ -365,9 +391,26 @@ public Builder setSettings(SettingsConfig settings) {
return this;
}

public Builder setRetentionPolicyConfig(RetentionPolicyConfig retentionPolicyConfig) {
this.retentionPolicyConfig = retentionPolicyConfig;
return this;
}

public TransformConfig build() {
return new TransformConfig(
id, source, dest, frequency, syncConfig, pivotConfig, latestConfig, description, settings, null, null);
id,
source,
dest,
frequency,
syncConfig,
pivotConfig,
latestConfig,
description,
settings,
retentionPolicyConfig,
null,
null
);
}
}
}
Loading