Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow specify dynamic templates in bulk request #69948

Merged
merged 49 commits into from
Apr 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
40e6a57
Add dynamic mapping type hint to bulk request
dnhatn Mar 3, 2021
eed16ef
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 8, 2021
27a538b
Remove TODO
dnhatn Mar 8, 2021
6ee5210
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 9, 2021
69a3849
match_mapping_hint
dnhatn Mar 9, 2021
02b2e89
fix doc
dnhatn Mar 9, 2021
96795af
fix test
dnhatn Mar 9, 2021
8f2c6c4
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 14, 2021
3c43e98
dynamicMappingHints -> dynamicMatchMappingHints
dnhatn Mar 14, 2021
de97281
javadocs
dnhatn Mar 14, 2021
635c395
stronger yaml test
dnhatn Mar 14, 2021
12a87ed
match mapping hint -> mapping hint
dnhatn Mar 14, 2021
f99abcd
add index request test
dnhatn Mar 14, 2021
d126606
revert tests
dnhatn Mar 14, 2021
f2e6c0f
Remove left over
dnhatn Mar 15, 2021
00c6454
use existing methods for test
dnhatn Mar 15, 2021
5679e5e
add test for bulk request
dnhatn Mar 15, 2021
7922692
more rename
dnhatn Mar 15, 2021
ea394f1
stylecheck
dnhatn Mar 15, 2021
d77290c
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 18, 2021
290ed5c
feedback
dnhatn Mar 19, 2021
c6468fd
revert
dnhatn Mar 19, 2021
58af624
Use template name
dnhatn Mar 19, 2021
6cea974
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 22, 2021
286ce59
fix docs
dnhatn Mar 22, 2021
8c9e402
fix message
dnhatn Mar 22, 2021
d2dfda1
Remove "hint" from the parameter
dnhatn Mar 22, 2021
d5a9231
wording docs
dnhatn Mar 22, 2021
a6f9971
Add a test for dynamic template with a wrong type
dnhatn Mar 22, 2021
9c3ecba
simplify match_mapping_type
dnhatn Mar 22, 2021
00b92c1
Simplify the validation
dnhatn Mar 22, 2021
a179238
minimize changes in serialization
dnhatn Mar 22, 2021
dd84c8a
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 22, 2021
315c8da
more yaml tests
dnhatn Mar 23, 2021
0126070
remove hint
dnhatn Mar 23, 2021
b341fcc
Reject dynamic_templates on old nodes
dnhatn Mar 23, 2021
fd87800
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 23, 2021
aef927e
Add min node version
dnhatn Mar 23, 2021
8a80841
stylecheck
dnhatn Mar 23, 2021
18d7b39
Revert "stylecheck"
dnhatn Mar 24, 2021
dccfb8e
Revert "Add min node version"
dnhatn Mar 24, 2021
5a22dff
better error message
dnhatn Mar 24, 2021
a69d244
Merge branch 'master' into dynamic-mapping-hint
dnhatn Mar 24, 2021
5900952
Merge 'master' into dynamic-mapping-hint
dnhatn Mar 30, 2021
07029f0
Merge branch 'master' into dynamic-mapping-hint
dnhatn Apr 7, 2021
18bcb64
move yaml test
dnhatn Apr 7, 2021
bf94a83
Update docs/reference/docs/bulk.asciidoc
dnhatn Apr 7, 2021
60d11a4
Allow new dynamic template format with old indices
dnhatn Apr 7, 2021
d0010bd
fix mapping syntax
dnhatn Apr 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
"Match mapping type":
- skip:
version: " - 7.99.99"
reason: "introduced in 8.0"

- do:
indices.create:
index: test_index
body:
mappings:
dynamic_templates:
- locations:
custom_mapping_type: location_type
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
mapping:
type: geo_point
- do:
bulk:
refresh: true
body:
- index:
_index: test_index
_id: id_1
match_mapping_types:
location: location_type
- location: [ -71.34, 41.12 ]
- index:
_index: test_index
_id: id_2
match_mapping_types:
location: location_type
- location: "41.12,-71.34"
- do:
search:
index: test_index
body:
query:
geo_bounding_box:
location:
top_left:
lat: 42
lon: -72
bottom_right:
lat: 40
lon: -74
- match: { hits.total.value: 2 }
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,46 @@

import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.GeoBoundingBoxQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.hamcrest.Matchers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class DynamicMappingIT extends ESIntegTestCase {

Expand Down Expand Up @@ -77,6 +95,14 @@ private static void assertMappingsHaveField(GetMappingsResponse mappings, String
assertTrue("Could not find [" + field + "] in " + typeMappingsMap.toString(), properties.containsKey(field));
}

private static void assertFieldMappingType(String index, String field, String expectedType) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
final GetMappingsResponse resp = client().admin().indices().prepareGetMappings(index).get();
final Map<String, Object> mappings = resp.mappings().get(index).sourceAsMap();
final String path = "properties." + String.join(".properties.", field.split("\\.")) + ".type";
Object actualType = ObjectPath.eval(path, mappings);
assertThat(Strings.toString(resp), actualType, equalTo(expectedType));
}

public void testConcurrentDynamicUpdates() throws Throwable {
createIndex("index");
final Thread[] indexThreads = new Thread[32];
Expand Down Expand Up @@ -156,4 +182,76 @@ public void testMappingVersionAfterDynamicMappingUpdate() throws Exception {
client().prepareIndex("test").setId("1").setSource("field", "text").get();
assertBusy(() -> assertThat(clusterService.state().metadata().index("test").getMappingVersion(), equalTo(1 + previousVersion)));
}

public void testSimpleTypeHint() throws Exception {
final XContentBuilder mappings = XContentFactory.jsonBuilder();
mappings.startObject();
{
mappings.startArray("dynamic_templates");
{
mappings.startObject();
mappings.startObject("locations");
{
mappings.field("custom_mapping_type", "location");
mappings.startObject("mapping");
{
mappings.field("type", "geo_point");
}
mappings.endObject();
}
mappings.endObject();
mappings.endObject();
}
mappings.endArray();
}
mappings.endObject();
assertAcked(client().admin().indices().prepareCreate("test").setMapping(mappings));
List<IndexRequest> requests = new ArrayList<>();
requests.add(new IndexRequest("test").id("1").source("location", "41.12,-71.34")
.setDynamicMappingTypeHints(Map.of("location", "location")));
requests.add(new IndexRequest("test").id("2").source(
XContentFactory.jsonBuilder()
.startObject()
.startObject("location").field("lat", 41.12).field("lon", -71.34).endObject()
.endObject())
.setDynamicMappingTypeHints(Map.of("location", "location")));
requests.add(new IndexRequest("test").id("3").source("address.location", "41.12,-71.34")
.setDynamicMappingTypeHints(Map.of("address.location", "location")));
requests.add(new IndexRequest("test").id("4").source("location", new double[]{-71.34, 41.12})
.setDynamicMappingTypeHints(Map.of("location", "location")));
requests.add(new IndexRequest("test").id("5").source("array_of_numbers", new double[]{-71.34, 41.12}));

Randomness.shuffle(requests);
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
requests.forEach(bulkRequest::add);
client().bulk(bulkRequest).actionGet();

SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(new GeoBoundingBoxQueryBuilder("location").setCorners(new GeoPoint(42, -72), new GeoPoint(40, -74)))
.get();
assertSearchHits(searchResponse, "1", "2", "4");
searchResponse = client().prepareSearch("test")
.setQuery(new GeoBoundingBoxQueryBuilder("address.location").setCorners(new GeoPoint(42, -72), new GeoPoint(40, -74)))
.get();
assertSearchHits(searchResponse, "3");
}

public void testTypeHintNotFound() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test"));
BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
bulkRequest.add(
new IndexRequest("test").id("1").source(
XContentFactory.jsonBuilder()
.startObject()
.field("my_location", "41.12,-71.34")
.endObject())
.setDynamicMappingTypeHints(Map.of("my_location", "location"))
);
final BulkResponse bulkItemResponses = client().bulk(bulkRequest).actionGet();
assertTrue(bulkItemResponses.hasFailures());
for (BulkItemResponse resp : bulkItemResponses.getItems()) {
assertThat(resp.getFailure().getCause(), instanceOf(MapperParsingException.class));
assertThat(resp.getFailureMessage(), containsString("Can't find template for matching type [location] of field [my_location]"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class BulkRequestParser {
private static final ParseField IF_SEQ_NO = new ParseField("if_seq_no");
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);
private static final ParseField MATCH_MAPPING_TYPES = new ParseField("match_mapping_types");

// TODO: Remove this parameter once the BulkMonitoring endpoint has been removed
private final boolean errorOnType;
Expand Down Expand Up @@ -156,6 +157,7 @@ public void parse(
int retryOnConflict = 0;
String pipeline = defaultPipeline;
boolean requireAlias = defaultRequireAlias != null && defaultRequireAlias;
Map<String, String> matchMappingTypes = Map.of();

// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
Expand Down Expand Up @@ -206,7 +208,10 @@ public void parse(
}
} else if (token == XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line +
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
} else if (token == XContentParser.Token.START_OBJECT &&
MATCH_MAPPING_TYPES.match(currentFieldName, parser.getDeprecationHandler())) {
matchMappingTypes = parser.mapStrings();
} else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName,
parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
Expand Down Expand Up @@ -238,6 +243,7 @@ public void parse(
.version(version).versionType(versionType)
.setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicMappingTypeHints(matchMappingTypes)
.setRequireAlias(requireAlias), type);
} else {
indexRequestConsumer.accept(new IndexRequest(index).id(id).routing(routing)
Expand All @@ -252,6 +258,7 @@ public void parse(
.version(version).versionType(versionType)
.create(true).setPipeline(pipeline).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm)
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setDynamicMappingTypeHints(matchMappingTypes)
.setRequireAlias(requireAlias), type);
} else if ("update".equals(action)) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,9 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
request.ifSeqNo(), request.ifPrimaryTerm());
} else {
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
final SourceToParse sourceToParse = new SourceToParse(request.index(), request.id(), request.source(),
request.getContentType(), request.routing(), request.getDynamicMappingTypeHints());
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), sourceToParse,
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
Expand Down Expand Up @@ -479,8 +480,8 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse
case INDEX:
final IndexRequest indexRequest = (IndexRequest) docWriteRequest;
final ShardId shardId = replica.shardId();
final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.id(),
indexRequest.source(), indexRequest.getContentType(), indexRequest.routing());
final SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), indexRequest.id(), indexRequest.source(),
indexRequest.getContentType(), indexRequest.routing(), Map.of());
result = replica.applyIndexOperationOnReplica(primaryResponse.getSeqNo(), primaryResponse.getPrimaryTerm(),
primaryResponse.getVersion(), indexRequest.getAutoGeneratedTimestamp(), indexRequest.isRetry(), sourceToParse);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
private long ifSeqNo = UNASSIGNED_SEQ_NO;
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;

private Map<String, String> dynamicMappingTypeHints = Map.of();

public IndexRequest(StreamInput in) throws IOException {
this(null, in);
}
Expand Down Expand Up @@ -146,6 +148,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
} else {
requireAlias = false;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
dynamicMappingTypeHints = in.readMap(StreamInput::readString, StreamInput::readString);
} else {
dynamicMappingTypeHints = Map.of();
}
}

public IndexRequest() {
Expand Down Expand Up @@ -655,6 +662,13 @@ private void writeBody(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_10_0)) {
out.writeBoolean(requireAlias);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
out.writeMap(dynamicMappingTypeHints, StreamOutput::writeString, StreamOutput::writeString);
} else {
if (dynamicMappingTypeHints.isEmpty() == false) {
throw new IllegalStateException("Dynamic mapping type hints requires all nodes in the cluster on 8.0 or later");
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

@Override
Expand Down Expand Up @@ -712,4 +726,13 @@ public IndexRequest setRequireAlias(boolean requireAlias) {
this.requireAlias = requireAlias;
return this;
}

public IndexRequest setDynamicMappingTypeHints(Map<String, String> dynamicMappingTypeHints) {
this.dynamicMappingTypeHints = Objects.requireNonNull(dynamicMappingTypeHints);
return this;
}

dnhatn marked this conversation as resolved.
Show resolved Hide resolved
public Map<String, String> getDynamicMappingTypeHints() {
return dynamicMappingTypeHints;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -140,7 +141,8 @@ private LeafReader getDelegate() {
private LeafReader createInMemoryLeafReader() {
assert Thread.holdsLock(this);
final ParsedDocument parsedDocs = mapper.parse(new SourceToParse(shardId.getIndexName(), operation.id(),
operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()));
operation.source(), XContentHelper.xContentType(operation.source()), operation.routing(), Map.of()));

parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
parsedDocs.version().setLongValue(operation.version());
final IndexWriterConfig writeConfig = new IndexWriterConfig(analyzer).setOpenMode(IndexWriterConfig.OpenMode.CREATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields,
// Slow path: recreate stored fields from original source
assert source != null : "original source in translog must exist";
SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source, XContentHelper.xContentType(source),
fieldVisitor.routing());
fieldVisitor.routing(), Map.of());
ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
// update special fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,8 @@ private static void parseArray(ParseContext context, ObjectMapper parentMapper,
// TODO: shouldn't this skip, not parse?
parseNonDynamicArray(context, parentMapper, lastFieldName, arrayFieldName);
} else {
Mapper objectMapperFromTemplate = dynamic.getDynamicFieldsBuilder().createObjectMapperFromTemplate(context, arrayFieldName);
Mapper objectMapperFromTemplate =
dynamic.getDynamicFieldsBuilder().createFieldOrObjectMapperFromTemplate(context, arrayFieldName);
if (objectMapperFromTemplate == null) {
parseNonDynamicArray(context, parentMapper, lastFieldName, arrayFieldName);
} else {
Expand Down Expand Up @@ -711,7 +712,12 @@ private static Tuple<Integer, ObjectMapper> getDynamicParentMapper(ParseContext
return new Tuple<>(pathsAdded, parent);
} else {
//objects are created under properties even with dynamic: runtime, as the runtime section only holds leaf fields
mapper = (ObjectMapper) dynamic.getDynamicFieldsBuilder().createDynamicObjectMapper(context, paths[i]);
final Mapper fieldMapper = dynamic.getDynamicFieldsBuilder().createDynamicObjectMapper(context, paths[i]);
if (fieldMapper instanceof ObjectMapper == false) {
throw new MapperParsingException("Field [" + context.path().pathAsText(paths[i]) + "] must be an object; " +
"but it's configured as [" + fieldMapper.typeName() + "] in dynamic templates");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: can this happen in reality? If so do we have a test for it? I am wondering if this became relevant with the previous iteration but it may no longer be the case now that hints are separate from types. Maybe we should rather have an assertion in createDynamicObjectMapper and cast there so that its return type can be the right one? This is unrelated to your PR though :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still wonder about this added conditional ;)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test for this in a6f9971.

}
mapper = (ObjectMapper) fieldMapper;
if (mapper.nested() != ObjectMapper.Nested.NO) {
throw new MapperParsingException("It is forbidden to create dynamic nested objects (["
+ context.path().pathAsText(paths[i]) + "]) through `copy_to` or dots in field names");
Expand Down
Loading