Skip to content

Commit

Permalink
Expose dynamic_templates parameter in Ingest (#71716)
Browse files Browse the repository at this point in the history
This change exposes the newly introduced parameter `dynamic_templates`
in ingest. This parameter can be set by a set processor or a script processor.

Relates #69948
  • Loading branch information
dnhatn authored Apr 19, 2021
1 parent 06664d5 commit 46ada22
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 2 deletions.
25 changes: 25 additions & 0 deletions docs/reference/ingest.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@ Processors can access the following metadata fields by name:
* `_index`
* `_id`
* `_routing`
* `_dynamic_templates`

[source,console]
----
Expand All @@ -539,6 +540,30 @@ PUT _ingest/pipeline/my-pipeline
Use a Mustache template snippet to access metadata field values. For example,
`{{{_routing}}}` retrieves a document's routing value.

[source,console]
----
PUT _ingest/pipeline/my-pipeline
{
"processors": [
{
"set": {
"description": "Use geo_point dynamic template for address field",
"field": "_dynamic_templates",
"value": {
"address": "geo_point"
}
}
}
]
}
----

The set processor above tells ES to use the dynamic template named `geo_point`
for the field `address` if this field is not defined in the mapping of the index
yet. This processor overrides the dynamic template for the field `address` if
already defined in the bulk request, but has no effect on other dynamic
templates defined in the bulk request.

WARNING: If you <<create-document-ids-automatically,automatically generate>>
document IDs, you cannot use `{{{_id}}}` in a processor. {es} assigns
auto-generated `_id` values after ingest.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.core.Is.is;

Expand All @@ -45,6 +46,7 @@ Script.DEFAULT_SCRIPT_LANG, new MockScriptEngine(
Integer bytesIn = (Integer) ctx.get("bytes_in");
Integer bytesOut = (Integer) ctx.get("bytes_out");
ctx.put("bytes_total", bytesIn + bytesOut);
ctx.put("_dynamic_templates", Map.of("foo", "bar"));
return null;
}
),
Expand Down Expand Up @@ -84,5 +86,6 @@ private void assertIngestDocument(IngestDocument ingestDocument) {
assertThat(ingestDocument.getSourceAndMetadata(), hasKey("bytes_total"));
int bytesTotal = ingestDocument.getFieldValue("bytes_in", Integer.class) + ingestDocument.getFieldValue("bytes_out", Integer.class);
assertThat(ingestDocument.getSourceAndMetadata().get("bytes_total"), is(bytesTotal));
assertThat(ingestDocument.getSourceAndMetadata().get("_dynamic_templates"), equalTo(Map.of("foo", "bar")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -136,6 +138,18 @@ public void testSetMetadataIfPrimaryTerm() throws Exception {
assertThat(ingestDocument.getFieldValue(Metadata.IF_PRIMARY_TERM.getFieldName(), Long.class), Matchers.equalTo(ifPrimaryTerm));
}

public void testSetDynamicTemplates() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
int iters = between(1, 3);
for (int i = 0; i < iters; i++) {
Map<String, String> dynamicTemplates = IntStream.range(0, between(0, 3)).boxed()
.collect(Collectors.toMap(n -> "field-" + n, n -> randomFrom("int", "geo_point", "keyword")));
Processor processor = createSetProcessor(Metadata.DYNAMIC_TEMPLATES.getFieldName(), dynamicTemplates, null, true, false);
processor.execute(ingestDocument);
assertThat(ingestDocument.getFieldValue(Metadata.DYNAMIC_TEMPLATES.getFieldName(), Map.class), equalTo(dynamicTemplates));
}
}

public void testCopyFromOtherField() throws Exception {
Map<String, Object> document = new HashMap<>();
Object fieldValue = RandomDocumentPicks.randomFieldValue(random());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
---
"Dynamic templates":
- do:
indices.create:
index: test_index
body:
mappings:
dynamic_templates:
- location:
mapping:
type: geo_point

- do:
ingest.put_pipeline:
id: "my_set_pipeline"
body: >
{
"description": "Use location dynamic template for home_address and work_address",
"processors": [
{
"set" : {
"field": "_dynamic_templates",
"value": {
"home_address": "location",
"work_address": "location"
}
}
}
]
}
- match: { acknowledged: true }

- do:
ingest.put_pipeline:
id: "my_script_pipeline"
body: >
{
"description": "Use location dynamic template for home_address and work_address",
"processors": [
{
"script" : {
"source": "ctx._dynamic_templates = [params.f1: params.type, params.f2: params.type]",
"params": {
"f1": "home_address",
"f2": "work_address",
"type": "location"
}
}
}
]
}
- match: { acknowledged: true }

- do:
bulk:
refresh: true
body:
- index:
_index: test_index
_id: id_1
pipeline: my_set_pipeline
- { "home_address": [ -71.34, 41.12 ]}
- index:
_index: test_index
_id: id_2
pipeline: my_script_pipeline
- { "work_address": "41.12,-71.34"}
- match: { errors: false }
- match: { items.0.index.result: created }
- match: { items.1.index.result: created }

- do:
index:
index: test_index
id: id_3
body: { "home_address": [ -71.34, 41.12 ]}
refresh: true
pipeline: my_script_pipeline

- do:
index:
index: test_index
id: id_4
body: { "work_address": [ -71.34, 41.12 ]}
refresh: true
pipeline: my_set_pipeline

- do:
search:
index: test_index
body:
query:
geo_bounding_box:
home_address:
top_left:
lat: 42
lon: -72
bottom_right:
lat: 40
lon: -74
- match: { hits.total.value: 2 }
- match: { hits.hits.0._id: id_1 }
- match: { hits.hits.1._id: id_3 }

- do:
search:
index: test_index
body:
query:
geo_bounding_box:
work_address:
top_left:
lat: 42
lon: -72
bottom_right:
lat: 40
lon: -74
- match: { hits.total.value: 2 }
- match: { hits.hits.0._id: id_2 }
- match: { hits.hits.1._id: id_4 }

- do:
bulk:
refresh: true
body:
- index:
_index: test_index
_id: id_5
pipeline: my_set_pipeline
dynamic_templates:
school_address: location
- { "school_address": [ -71.34, 41.12 ]}
- match: { errors: false }
- match: { items.0.index.result: created }

- do:
search:
index: test_index
body:
query:
geo_bounding_box:
school_address:
top_left:
lat: 42
lon: -72
bottom_right:
lat: 40
lon: -74
- match: { hits.total.value: 1 }
- match: { hits.hits.0._id: id_5 }
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -210,6 +211,15 @@ private static List<IngestDocument> parseDocs(Map<String, Object> config) {
throw new IllegalArgumentException("[_if_primary_term] cannot be null");
}
}
if (dataMap.containsKey(Metadata.DYNAMIC_TEMPLATES.getFieldName())) {
Map<String, String> dynamicTemplates = ConfigurationUtils.readMap(
null, null, dataMap, Metadata.DYNAMIC_TEMPLATES.getFieldName());
if (dynamicTemplates != null) {
ingestDocument.setFieldValue(Metadata.DYNAMIC_TEMPLATES.getFieldName(), new HashMap<>(dynamicTemplates));
} else {
throw new IllegalArgumentException("[_dynamic_templates] cannot be null");
}
}
ingestDocumentList.add(ingestDocument);
}
return ingestDocumentList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public IngestDocument(String index, String id, String routing,
if (versionType != null) {
sourceAndMetadata.put(Metadata.VERSION_TYPE.getFieldName(), VersionType.toString(versionType));
}

this.ingestMetadata = new HashMap<>();
this.ingestMetadata.put(TIMESTAMP, ZonedDateTime.now(ZoneOffset.UTC));
}
Expand Down Expand Up @@ -816,7 +815,8 @@ public enum Metadata {
VERSION(VersionFieldMapper.NAME),
VERSION_TYPE("_version_type"),
IF_SEQ_NO("_if_seq_no"),
IF_PRIMARY_TERM("_if_primary_term");
IF_PRIMARY_TERM("_if_primary_term"),
DYNAMIC_TEMPLATES("_dynamic_templates");

private final String fieldName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,11 @@ private void innerExecute(int slot, IndexRequest indexRequest, Pipeline pipeline
indexRequest.setIfPrimaryTerm(((Number) metadataMap.get(IngestDocument.Metadata.IF_PRIMARY_TERM)).longValue());
}
indexRequest.source(ingestDocument.getSourceAndMetadata(), indexRequest.getContentType());
if (metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES) != null) {
Map<String, String> mergedDynamicTemplates = new HashMap<>(indexRequest.getDynamicTemplates());
mergedDynamicTemplates.putAll((Map<String, String>) metadataMap.get(IngestDocument.Metadata.DYNAMIC_TEMPLATES));
indexRequest.setDynamicTemplates(mergedDynamicTemplates);
}
handler.accept(null);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -720,6 +721,28 @@ public void testExecuteSuccess() {
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
}

public void testDynamicTemplates() throws Exception {
IngestService ingestService = createWithProcessors(Collections.singletonMap(
"set", (factories, tag, description, config) ->
new FakeProcessor("set", "", "", (ingestDocument) -> ingestDocument.setFieldValue("_dynamic_templates",
Map.of("foo", "bar", "foo.bar", "baz")))));
PutPipelineRequest putRequest = new PutPipelineRequest("_id",
new BytesArray("{\"processors\": [{\"set\" : {}}]}"), XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState);
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
final IndexRequest indexRequest =
new IndexRequest("_index").id("_id").source(emptyMap()).setPipeline("_id").setFinalPipeline("_none");
CountDownLatch latch = new CountDownLatch(1);
final BiConsumer<Integer, Exception> failureHandler = (v, e) -> { throw new AssertionError("must never fail", e);};
final BiConsumer<Thread, Exception> completionHandler = (t, e) -> latch.countDown();
ingestService.executeBulkRequest(1, Collections.singletonList(indexRequest), failureHandler, completionHandler,
indexReq -> {}, Names.WRITE);
latch.await();
assertThat(indexRequest.getDynamicTemplates(), equalTo(Map.of("foo", "bar", "foo.bar", "baz")));
}

public void testExecuteEmptyPipeline() throws Exception {
IngestService ingestService = createWithProcessors(emptyMap());
PutPipelineRequest putRequest =
Expand Down Expand Up @@ -765,6 +788,8 @@ public void testExecutePropagateAllMetadataUpdates() throws Exception {
ingestDocument.setFieldValue(metadata.getFieldName(), ifSeqNo);
} else if (metadata == IngestDocument.Metadata.IF_PRIMARY_TERM) {
ingestDocument.setFieldValue(metadata.getFieldName(), ifPrimaryTerm);
} else if (metadata == IngestDocument.Metadata.DYNAMIC_TEMPLATES) {
ingestDocument.setFieldValue(metadata.getFieldName(), Map.of("foo", "bar"));
} else {
ingestDocument.setFieldValue(metadata.getFieldName(), "update" + metadata.getFieldName());
}
Expand Down

0 comments on commit 46ada22

Please sign in to comment.