Skip to content

[Rest Api Compatibility] Type metadata for docs used in simulate request #74222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -33,17 +35,24 @@
import java.util.Objects;

public class SimulatePipelineRequest extends ActionRequest implements ToXContentObject {
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class);
private String id;
private boolean verbose;
private BytesReference source;
private XContentType xContentType;
private RestApiVersion restApiVersion;

/**
* Creates a new request with the given source and its content type
*/
public SimulatePipelineRequest(BytesReference source, XContentType xContentType) {
this(source, xContentType, RestApiVersion.current());
}

public SimulatePipelineRequest(BytesReference source, XContentType xContentType, RestApiVersion restApiVersion) {
this.source = Objects.requireNonNull(source);
this.xContentType = Objects.requireNonNull(xContentType);
this.restApiVersion = restApiVersion;
}

SimulatePipelineRequest() {
Expand Down Expand Up @@ -133,28 +142,30 @@ public boolean isVerbose() {

static final String SIMULATED_PIPELINE_ID = "_simulate_pipeline";

static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService) {
static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config, boolean verbose, IngestService ingestService,
RestApiVersion restApiVersion) {
if (pipelineId == null) {
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = ingestService.getPipeline(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
}
List<IngestDocument> ingestDocumentList = parseDocs(config);
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService) throws Exception {
static Parsed parse(Map<String, Object> config, boolean verbose, IngestService ingestService, RestApiVersion restApiVersion)
throws Exception {
Map<String, Object> pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE);
Pipeline pipeline = Pipeline.create(
SIMULATED_PIPELINE_ID, pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService()
);
List<IngestDocument> ingestDocumentList = parseDocs(config);
List<IngestDocument> ingestDocumentList = parseDocs(config, restApiVersion);
return new Parsed(pipeline, ingestDocumentList, verbose);
}

private static List<IngestDocument> parseDocs(Map<String, Object> config) {
private static List<IngestDocument> parseDocs(Map<String, Object> config, RestApiVersion restApiVersion) {
List<Map<String, Object>> docs =
ConfigurationUtils.readList(null, null, config, Fields.DOCS);
if (docs.isEmpty()) {
Expand All @@ -174,6 +185,10 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
dataMap, Metadata.ID.getFieldName(), "_id");
String routing = ConfigurationUtils.readOptionalStringOrIntProperty(null, null,
dataMap, Metadata.ROUTING.getFieldName());
if (restApiVersion == RestApiVersion.V_7 && dataMap.containsKey(Metadata.TYPE.getFieldName())) {
deprecationLogger.compatibleApiWarning("simulate_pipeline_with_types",
"[types removal] specifying _type in pipeline simulation requests is deprecated");
}
Long version = null;
if (dataMap.containsKey(Metadata.VERSION.getFieldName())) {
String versionValue = ConfigurationUtils.readOptionalStringOrLongProperty(null, null,
Expand Down Expand Up @@ -224,4 +239,8 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
}
return ingestDocumentList;
}

public RestApiVersion getRestApiVersion() {
return restApiVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ protected void doExecute(Task task, SimulatePipelineRequest request, ActionListe
final SimulatePipelineRequest.Parsed simulateRequest;
try {
if (request.getId() != null) {
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService);
simulateRequest = SimulatePipelineRequest.parseWithPipelineId(request.getId(), source, request.isVerbose(), ingestService,
request.getRestApiVersion());
} else {
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService);
simulateRequest = SimulatePipelineRequest.parse(source, request.isVerbose(), ingestService, request.getRestApiVersion());
}
} catch (Exception e) {
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.IngestDocument.Metadata;

Expand Down Expand Up @@ -118,6 +120,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(metadata.getKey().getFieldName(), metadata.getValue().toString());
}
}
if(builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.field(MapperService.TYPE_FIELD_NAME, MapperService.SINGLE_MAPPING_NAME);
}
Map<String, Object> source = IngestDocument.deepCopyMap(ingestDocument.getSourceAndMetadata());
metadataMap.keySet().forEach(mD -> source.remove(mD.getFieldName()));
builder.field(SOURCE_FIELD, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,7 @@ public String toString() {

public enum Metadata {
INDEX(IndexFieldMapper.NAME),
TYPE("_type"),
ID(IdFieldMapper.NAME),
ROUTING(RoutingFieldMapper.NAME),
VERSION(VersionFieldMapper.NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.ingest.CompoundProcessor;
import org.elasticsearch.ingest.IngestDocument;
Expand All @@ -31,12 +32,13 @@
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID;
import static org.elasticsearch.ingest.IngestDocument.Metadata.ID;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO;
import static org.elasticsearch.ingest.IngestDocument.Metadata.INDEX;
import static org.elasticsearch.ingest.IngestDocument.Metadata.ROUTING;
import static org.elasticsearch.ingest.IngestDocument.Metadata.TYPE;
import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION;
import static org.elasticsearch.ingest.IngestDocument.Metadata.VERSION_TYPE;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_SEQ_NO;
import static org.elasticsearch.ingest.IngestDocument.Metadata.IF_PRIMARY_TERM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -49,7 +51,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {

@Before
public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
TestProcessor processor = new TestProcessor(ingestDocument -> {
});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, null, pipelineCompoundProcessor);
Map<String, Processor.Factory> registry =
Expand Down Expand Up @@ -84,7 +87,8 @@ public void testParseUsingPipelineStore() throws Exception {
}

SimulatePipelineRequest.Parsed actualRequest =
SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService);
SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, ingestService,
RestApiVersion.current());
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
Expand Down Expand Up @@ -112,7 +116,7 @@ public void testParseWithProvidedPipeline() throws Exception {
Map<String, Object> doc = new HashMap<>();
Map<String, Object> expectedDoc = new HashMap<>();
List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, ID, ROUTING, VERSION, VERSION_TYPE, IF_SEQ_NO, IF_PRIMARY_TERM);
for(IngestDocument.Metadata field : fields) {
for (IngestDocument.Metadata field : fields) {
if (field == VERSION) {
Object value = randomBoolean() ? randomLong() : randomInt();
doc.put(field.getFieldName(), randomBoolean() ? value : value.toString());
Expand Down Expand Up @@ -177,7 +181,8 @@ public void testParseWithProvidedPipeline() throws Exception {

requestContent.put(Fields.PIPELINE, pipelineConfig);

SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService,
RestApiVersion.current());
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
Expand All @@ -204,7 +209,7 @@ public void testNullPipelineId() {
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Exception e = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService));
() -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e.getMessage(), equalTo("param [pipeline] is null"));
}

Expand All @@ -214,7 +219,7 @@ public void testNonExistentPipelineId() {
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Exception e = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService));
() -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist"));
}

Expand All @@ -227,7 +232,7 @@ public void testNotValidDocs() {
requestContent.put(Fields.DOCS, docs);
requestContent.put(Fields.PIPELINE, pipelineConfig);
Exception e1 = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService));
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]"));

List<String> stringList = new ArrayList<>();
Expand All @@ -236,14 +241,107 @@ public void testNotValidDocs() {
requestContent.put(Fields.DOCS, stringList);
requestContent.put(Fields.PIPELINE, pipelineConfig);
Exception e2 = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService));
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object"));

docs.add(new HashMap<>());
requestContent.put(Fields.DOCS, docs);
requestContent.put(Fields.PIPELINE, pipelineConfig);
Exception e3 = expectThrows(ElasticsearchParseException.class,
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService));
() -> SimulatePipelineRequest.parse(requestContent, false, ingestService, RestApiVersion.current()));
assertThat(e3.getMessage(), containsString("required property is missing"));
}

public void testIngestPipelineWithDocumentsWithType() throws Exception {
int numDocs = randomIntBetween(1, 10);

Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
List<Map<String, Object>> expectedDocs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
for (int i = 0; i < numDocs; i++) {
Map<String, Object> doc = new HashMap<>();
Map<String, Object> expectedDoc = new HashMap<>();
List<IngestDocument.Metadata> fields = Arrays.asList(INDEX, TYPE, ID, ROUTING, VERSION, VERSION_TYPE);
for (IngestDocument.Metadata field : fields) {
if (field == VERSION) {
Long value = randomLong();
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == VERSION_TYPE) {
String value = VersionType.toString(
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE)
);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else if (field == TYPE) {
String value = randomAlphaOfLengthBetween(1, 10);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
if (randomBoolean()) {
String value = randomAlphaOfLengthBetween(1, 10);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), value);
} else {
Integer value = randomIntBetween(1, 1000000);
doc.put(field.getFieldName(), value);
expectedDoc.put(field.getFieldName(), String.valueOf(value));
}
}
}
String fieldName = randomAlphaOfLengthBetween(1, 10);
String fieldValue = randomAlphaOfLengthBetween(1, 10);
doc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
docs.add(doc);
expectedDoc.put(Fields.SOURCE, Collections.singletonMap(fieldName, fieldValue));
expectedDocs.add(expectedDoc);
}
Map<String, Object> pipelineConfig = new HashMap<>();
List<Map<String, Object>> processors = new ArrayList<>();
int numProcessors = randomIntBetween(1, 10);
for (int i = 0; i < numProcessors; i++) {
Map<String, Object> processorConfig = new HashMap<>();
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int j = 0; j < numOnFailureProcessors; j++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
processorConfig.put("on_failure", onFailureProcessors);
}
processors.add(Collections.singletonMap("mock_processor", processorConfig));
}
pipelineConfig.put("processors", processors);
List<Map<String, Object>> onFailureProcessors = new ArrayList<>();
int numOnFailureProcessors = randomIntBetween(0, 1);
for (int i = 0; i < numOnFailureProcessors; i++) {
onFailureProcessors.add(Collections.singletonMap("mock_processor", Collections.emptyMap()));
}
if (numOnFailureProcessors > 0) {
pipelineConfig.put("on_failure", onFailureProcessors);
}
requestContent.put(Fields.PIPELINE, pipelineConfig);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parse(requestContent, false, ingestService,
RestApiVersion.V_7);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
for (IngestDocument ingestDocument : actualRequest.getDocuments()) {
Map<String, Object> expectedDocument = expectedDocsIterator.next();
Map<IngestDocument.Metadata, Object> metadataMap = ingestDocument.extractMetadata();
assertThat(metadataMap.get(INDEX), equalTo(expectedDocument.get(INDEX.getFieldName())));
assertThat(metadataMap.get(ID), equalTo(expectedDocument.get(ID.getFieldName())));
assertThat(metadataMap.get(ROUTING), equalTo(expectedDocument.get(ROUTING.getFieldName())));
assertThat(metadataMap.get(VERSION), equalTo(expectedDocument.get(VERSION.getFieldName())));
assertThat(metadataMap.get(VERSION_TYPE), equalTo(expectedDocument.get(VERSION_TYPE.getFieldName())));
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
}
assertThat(actualRequest.getPipeline().getId(), equalTo(SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(numProcessors));

assertWarnings("[types removal] specifying _type in pipeline simulation requests is deprecated");

}
}