Skip to content

Script: Reindex & UpdateByQuery Metadata #88665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
01e89dd
Script: Reindex & UpdateByQuery Metadata
stu-elastic Jul 20, 2022
3787226
fix typo receiver
stu-elastic Jul 20, 2022
3c24b3e
add _now to ReindexMetadata and UdpateByQueryMetadata javadoc
stu-elastic Jul 20, 2022
74a49c2
Add comment about version in TransportUpdateByQueryAction
stu-elastic Jul 20, 2022
c19b552
Validate source
stu-elastic Jul 21, 2022
77a4694
Merge branch 'master' of github.com:elastic/elasticsearch into 220720…
stu-elastic Jul 21, 2022
d8aea8d
Avoid double sourcing
stu-elastic Jul 21, 2022
f448639
spotless
stu-elastic Jul 21, 2022
70b8360
Make v7yamlRest tests work
stu-elastic Jul 21, 2022
3da6a36
Fix skips, IllegalArgumentException for junk
stu-elastic Jul 21, 2022
5ee360d
Fix error message for invalid keys
stu-elastic Jul 21, 2022
93d708a
Catch error in Set unsupported operation type - 8.4
stu-elastic Jul 21, 2022
d93bc4b
Update docs/changelog/88665.yaml
stu-elastic Jul 22, 2022
a7d30ca
Remove dupe skip test in reindex build.gradle
stu-elastic Jul 25, 2022
ed86e89
Pull out FieldProps, address other comments
stu-elastic Jul 25, 2022
9007a92
comment index op, move updateRequest
stu-elastic Jul 26, 2022
27fa2df
Merge branch 'main' of github.com:elastic/elasticsearch into 220720-b…
stu-elastic Jul 26, 2022
d1702ba
Merge branch 'main' of github.com:elastic/elasticsearch into 220720-b…
stu-elastic Jul 28, 2022
4e456c4
Fix type params in CtxMap, fix NOW usage
stu-elastic Jul 28, 2022
315e5af
unused import
stu-elastic Jul 28, 2022
c217f3d
Update yamlRestTests with 8.5 and error message updates
stu-elastic Jul 28, 2022
864442d
Merge branch 'main' of github.com:elastic/elasticsearch into 220720-b…
stu-elastic Jul 28, 2022
1920efe
Add skipTest
stu-elastic Jul 28, 2022
90d6407
Cleanup yamlRestTests
stu-elastic Jul 28, 2022
0568250
Fix up blacklist
stu-elastic Jul 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/88665.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 88665
summary: "Script: Reindex & `UpdateByQuery` Metadata"
area: Infra/Scripting
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,22 @@ class org.elasticsearch.painless.api.Json {
String dump(def)
String dump(def, boolean)
}

class org.elasticsearch.script.Metadata {
String getIndex()
void setIndex(String)
String getId()
void setId(String)
String getRouting()
void setRouting(String)
long getVersion()
void setVersion(long)
boolean org.elasticsearch.script.ReindexMetadata isVersionInternal()
void org.elasticsearch.script.ReindexMetadata setVersionToInternal()
String getOp()
void setOp(String)
}

class org.elasticsearch.script.ReindexScript {
Metadata metadata()
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,16 @@ class org.elasticsearch.painless.api.Json {
String dump(def)
String dump(def, boolean)
}

class org.elasticsearch.script.Metadata {
String getIndex()
String getId()
String getRouting()
long getVersion()
String getOp()
void setOp(String)
}

class org.elasticsearch.script.UpdateByQueryScript {
Metadata metadata()
}
9 changes: 9 additions & 0 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,12 @@ tasks.named("yamlRestTestV7CompatTransform").configure { task ->

task.addAllowedWarningRegex("\\[types removal\\].*")
}

tasks.named("yamlRestTestV7CompatTest").configure {
systemProperty 'tests.rest.blacklist', [
'update_by_query/80_scripting/Can\'t change _id',
'update_by_query/80_scripting/Set unsupported operation type',
'update_by_query/80_scripting/Setting bogus context is an error',

].join(',')
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,15 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.IndexFieldMapper;
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.BulkByScrollTask;
import org.elasticsearch.index.reindex.ClientScrollableHitSource;
import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.index.reindex.WorkerBulkByScrollTaskState;
import org.elasticsearch.script.CtxMap;
import org.elasticsearch.script.Metadata;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.Scroll;
Expand All @@ -50,20 +47,18 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.LongSupplier;

import static java.lang.Math.max;
import static java.lang.Math.min;
Expand Down Expand Up @@ -819,147 +814,73 @@ public static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
/**
* Apply a {@link Script} to a {@link RequestWrapper}
*/
public abstract static class ScriptApplier implements BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> {
public abstract static class ScriptApplier<T extends Metadata>
implements
BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> {

// "index" is the default operation
protected static final String INDEX = "index";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you please add a comment here on what this particular INDEX represents. I believe it's the op, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


private final WorkerBulkByScrollTaskState taskWorker;
protected final ScriptService scriptService;
protected final Script script;
protected final Map<String, Object> params;
protected final LongSupplier nowInMillisSupplier;

public ScriptApplier(
WorkerBulkByScrollTaskState taskWorker,
ScriptService scriptService,
Script script,
Map<String, Object> params
Map<String, Object> params,
LongSupplier nowInMillisSupplier
) {
this.taskWorker = taskWorker;
this.scriptService = scriptService;
this.script = script;
this.params = params;
this.nowInMillisSupplier = nowInMillisSupplier;
}

@Override
@SuppressWarnings("unchecked")
public RequestWrapper<?> apply(RequestWrapper<?> request, ScrollableHitSource.Hit doc) {
if (script == null) {
return request;
}

Map<String, Object> context = new HashMap<>();
context.put(IndexFieldMapper.NAME, doc.getIndex());
context.put(IdFieldMapper.NAME, doc.getId());
Long oldVersion = doc.getVersion();
context.put(VersionFieldMapper.NAME, oldVersion);
String oldRouting = doc.getRouting();
context.put(RoutingFieldMapper.NAME, oldRouting);
context.put(SourceFieldMapper.NAME, request.getSource());

OpType oldOpType = OpType.INDEX;
context.put("op", oldOpType.toString());
CtxMap<T> ctxMap = execute(doc, request.getSource());

execute(context);
T metadata = ctxMap.getMetadata();

String newOp = (String) context.remove("op");
if (newOp == null) {
throw new IllegalArgumentException("Script cleared operation type");
}
request.setSource(ctxMap.getSource());

/*
* It'd be lovely to only set the source if we know its been modified
* but it isn't worth keeping two copies of it around just to check!
*/
request.setSource((Map<String, Object>) context.remove(SourceFieldMapper.NAME));
updateRequest(request, metadata);

Object newValue = context.remove(IndexFieldMapper.NAME);
if (false == doc.getIndex().equals(newValue)) {
scriptChangedIndex(request, newValue);
}
newValue = context.remove(IdFieldMapper.NAME);
if (false == doc.getId().equals(newValue)) {
scriptChangedId(request, newValue);
}
newValue = context.remove(VersionFieldMapper.NAME);
if (false == Objects.equals(oldVersion, newValue)) {
scriptChangedVersion(request, newValue);
}
/*
* Its important that routing comes after parent in case you want to
* change them both.
*/
newValue = context.remove(RoutingFieldMapper.NAME);
if (false == Objects.equals(oldRouting, newValue)) {
scriptChangedRouting(request, newValue);
}
return requestFromOp(request, metadata.getOp());
}

OpType newOpType = OpType.fromString(newOp);
if (newOpType != oldOpType) {
return scriptChangedOpType(request, oldOpType, newOpType);
}
protected abstract CtxMap<T> execute(ScrollableHitSource.Hit doc, Map<String, Object> source);

if (false == context.isEmpty()) {
throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", context.keySet()) + ']');
}
return request;
}
protected abstract void updateRequest(RequestWrapper<?> request, T metadata);

protected RequestWrapper<?> scriptChangedOpType(RequestWrapper<?> request, OpType oldOpType, OpType newOpType) {
switch (newOpType) {
case NOOP -> {
protected RequestWrapper<?> requestFromOp(RequestWrapper<?> request, String op) {
switch (op) {
case "noop" -> {
taskWorker.countNoop();
return null;
}
case DELETE -> {
case "delete" -> {
RequestWrapper<DeleteRequest> delete = wrap(new DeleteRequest(request.getIndex(), request.getId()));
delete.setVersion(request.getVersion());
delete.setVersionType(VersionType.INTERNAL);
delete.setRouting(request.getRouting());
return delete;
}
default -> throw new IllegalArgumentException(
"Unsupported operation type change from [" + oldOpType + "] to [" + newOpType + "]"
);
case INDEX -> {
return request;
}
default -> throw new IllegalArgumentException("Unsupported operation type change from [" + INDEX + "] to [" + op + "]");
}
}

protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to);

protected abstract void scriptChangedId(RequestWrapper<?> request, Object to);

protected abstract void scriptChangedVersion(RequestWrapper<?> request, Object to);

protected abstract void scriptChangedRouting(RequestWrapper<?> request, Object to);

protected abstract void execute(Map<String, Object> ctx);
}

public enum OpType {

NOOP("noop"),
INDEX("index"),
DELETE("delete");

private final String id;

OpType(String id) {
this.id = id;
}

public static OpType fromString(String opType) {
String lowerOpType = opType.toLowerCase(Locale.ROOT);
return switch (lowerOpType) {
case "noop" -> OpType.NOOP;
case "index" -> OpType.INDEX;
case "delete" -> OpType.DELETE;
default -> throw new IllegalArgumentException(
"Operation type [" + lowerOpType + "] not allowed, only " + Arrays.toString(values()) + " are allowed"
);
};
}

@Override
public String toString() {
return id.toLowerCase(Locale.ROOT);
}
}

static class ScrollConsumableHitsResponse {
Expand Down
Loading