Skip to content

Commit bd77626

Browse files
committed
Add the ability to require an ingest pipeline (#46847)
This commit adds the ability to require an ingest pipeline on an index. Today we can have a default pipeline, but that could be overridden by a request pipeline parameter. This commit introduces a new index setting index.required_pipeline that acts similarly to index.default_pipeline, except that it can not be overridden by a request pipeline parameter. Additionally, a default pipeline and a required pipeline can not both be set. The required pipeline can be set to _none to ensure that no pipeline ever runs for index requests on that index.
1 parent 251dbd8 commit bd77626

File tree

9 files changed

+513
-32
lines changed

9 files changed

+513
-32
lines changed

docs/reference/index-modules.asciidoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,13 +234,20 @@ specific index module:
234234
The length of time that a <<delete-versioning,deleted document's version number>> remains available for <<index-versioning,further versioned operations>>.
235235
Defaults to `60s`.
236236

237-
`index.default_pipeline`::
237+
`index.default_pipeline`::
238238

239239
The default <<ingest,ingest node>> pipeline for this index. Index requests will fail
240240
if the default pipeline is set and the pipeline does not exist. The default may be
241241
overridden using the `pipeline` parameter. The special pipeline name `_none` indicates
242242
no ingest pipeline should be run.
243243

244+
`index.required_pipeline`::
245+
The required <<ingest,ingest node>> pipeline for this index. Index requests
246+
will fail if the required pipeline is set and the pipeline does not exist.
247+
The required pipeline can not be overridden with the `pipeline` parameter. A
248+
default pipeline and a required pipeline can not both be set. The special
249+
pipeline name `_none` indicates no ingest pipeline will run.
250+
244251
[float]
245252
=== Settings in other index modules
246253

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "my_pipeline"
6+
ignore: 404
7+
8+
---
9+
"Test index with required pipeline":
10+
- do:
11+
ingest.put_pipeline:
12+
id: "my_pipeline"
13+
body: >
14+
{
15+
"description": "_description",
16+
"processors": [
17+
{
18+
"bytes" : {
19+
"field" : "bytes_source_field",
20+
"target_field" : "bytes_target_field"
21+
}
22+
}
23+
]
24+
}
25+
- match: { acknowledged: true }
26+
# required pipeline via index
27+
- do:
28+
indices.create:
29+
index: test
30+
body:
31+
settings:
32+
index:
33+
required_pipeline: "my_pipeline"
34+
aliases:
35+
test_alias: {}
36+
37+
- do:
38+
index:
39+
index: test
40+
id: 1
41+
body: {bytes_source_field: "1kb"}
42+
43+
- do:
44+
get:
45+
index: test
46+
id: 1
47+
- match: { _source.bytes_source_field: "1kb" }
48+
- match: { _source.bytes_target_field: 1024 }
49+
# required pipeline via alias
50+
- do:
51+
index:
52+
index: test_alias
53+
id: 2
54+
body: {bytes_source_field: "1kb"}
55+
56+
- do:
57+
get:
58+
index: test
59+
id: 2
60+
- match: { _source.bytes_source_field: "1kb" }
61+
- match: { _source.bytes_target_field: 1024 }
62+
# required pipeline via upsert
63+
- do:
64+
update:
65+
index: test
66+
id: 3
67+
body:
68+
script:
69+
source: "ctx._source.ran_script = true"
70+
lang: "painless"
71+
upsert: { "bytes_source_field":"1kb" }
72+
- do:
73+
get:
74+
index: test
75+
id: 3
76+
- match: { _source.bytes_source_field: "1kb" }
77+
- match: { _source.bytes_target_field: 1024 }
78+
# required pipeline via scripted upsert
79+
- do:
80+
update:
81+
index: test
82+
id: 4
83+
body:
84+
script:
85+
source: "ctx._source.bytes_source_field = '1kb'"
86+
lang: "painless"
87+
upsert : {}
88+
scripted_upsert: true
89+
- do:
90+
get:
91+
index: test
92+
id: 4
93+
- match: { _source.bytes_source_field: "1kb" }
94+
- match: { _source.bytes_target_field: 1024 }
95+
# required pipeline via doc_as_upsert
96+
- do:
97+
update:
98+
index: test
99+
id: 5
100+
body:
101+
doc: { "bytes_source_field":"1kb" }
102+
doc_as_upsert: true
103+
- do:
104+
get:
105+
index: test
106+
id: 5
107+
- match: { _source.bytes_source_field: "1kb" }
108+
- match: { _source.bytes_target_field: 1024 }
109+
# required pipeline via bulk upsert
110+
# note - bulk scripted upserts execute the pipeline before the script, so any data referenced by the pipeline
111+
# needs to be in the upsert, not the script
112+
- do:
113+
bulk:
114+
refresh: true
115+
body: |
116+
{"update":{"_id":"6","_index":"test"}}
117+
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
118+
{"update":{"_id":"7","_index":"test"}}
119+
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
120+
{"update":{"_id":"8","_index":"test"}}
121+
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
122+
{"update":{"_id":"6_alias","_index":"test_alias"}}
123+
{"script":"ctx._source.ran_script = true","upsert":{"bytes_source_field":"1kb"}}
124+
{"update":{"_id":"7_alias","_index":"test_alias"}}
125+
{"doc":{"bytes_source_field":"2kb"}, "doc_as_upsert":true}
126+
{"update":{"_id":"8_alias","_index":"test_alias"}}
127+
{"script": "ctx._source.ran_script = true","upsert":{"bytes_source_field":"3kb"}, "scripted_upsert" : true}
128+
129+
- do:
130+
mget:
131+
body:
132+
docs:
133+
- { _index: "test", _id: "6" }
134+
- { _index: "test", _id: "7" }
135+
- { _index: "test", _id: "8" }
136+
- { _index: "test", _id: "6_alias" }
137+
- { _index: "test", _id: "7_alias" }
138+
- { _index: "test", _id: "8_alias" }
139+
- match: { docs.0._index: "test" }
140+
- match: { docs.0._id: "6" }
141+
- match: { docs.0._source.bytes_source_field: "1kb" }
142+
- match: { docs.0._source.bytes_target_field: 1024 }
143+
- is_false: docs.0._source.ran_script
144+
- match: { docs.1._index: "test" }
145+
- match: { docs.1._id: "7" }
146+
- match: { docs.1._source.bytes_source_field: "2kb" }
147+
- match: { docs.1._source.bytes_target_field: 2048 }
148+
- match: { docs.2._index: "test" }
149+
- match: { docs.2._id: "8" }
150+
- match: { docs.2._source.bytes_source_field: "3kb" }
151+
- match: { docs.2._source.bytes_target_field: 3072 }
152+
- match: { docs.2._source.ran_script: true }
153+
- match: { docs.3._index: "test" }
154+
- match: { docs.3._id: "6_alias" }
155+
- match: { docs.3._source.bytes_source_field: "1kb" }
156+
- match: { docs.3._source.bytes_target_field: 1024 }
157+
- is_false: docs.3._source.ran_script
158+
- match: { docs.4._index: "test" }
159+
- match: { docs.4._id: "7_alias" }
160+
- match: { docs.4._source.bytes_source_field: "2kb" }
161+
- match: { docs.4._source.bytes_target_field: 2048 }
162+
- match: { docs.5._index: "test" }
163+
- match: { docs.5._id: "8_alias" }
164+
- match: { docs.5._source.bytes_source_field: "3kb" }
165+
- match: { docs.5._source.bytes_target_field: 3072 }
166+
- match: { docs.5._source.ran_script: true }
167+
168+
# bad request, request pipeline can not be specified
169+
- do:
170+
catch: /illegal_argument_exception.*request pipeline \[pipeline\] can not override required pipeline \[my_pipeline\]/
171+
index:
172+
index: test
173+
id: 9
174+
pipeline: "pipeline"
175+
body: {bytes_source_field: "1kb"}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 83 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.logging.log4j.message.ParameterizedMessage;
2323
import org.apache.lucene.util.SparseFixedBitSet;
24+
import org.elasticsearch.Assertions;
2425
import org.elasticsearch.ElasticsearchParseException;
2526
import org.elasticsearch.ExceptionsHelper;
2627
import org.elasticsearch.ResourceAlreadyExistsException;
@@ -76,6 +77,7 @@
7677
import java.util.HashSet;
7778
import java.util.Iterator;
7879
import java.util.List;
80+
import java.util.Locale;
7981
import java.util.Map;
8082
import java.util.Objects;
8183
import java.util.Set;
@@ -160,11 +162,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
160162
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = metaData.indices();
161163
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
162164
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
165+
163166
if (indexRequest != null) {
164-
// get pipeline from request
165-
String pipeline = indexRequest.getPipeline();
166-
if (pipeline == null) {
167-
// start to look for default pipeline via settings found in the index meta data
167+
if (indexRequest.isPipelineResolved() == false) {
168+
final String requestPipeline = indexRequest.getPipeline();
169+
indexRequest.setPipeline(IngestService.NOOP_PIPELINE_NAME);
170+
boolean requestCanOverridePipeline = true;
171+
String requiredPipeline = null;
172+
// start to look for default or required pipelines via settings found in the index meta data
168173
IndexMetaData indexMetaData = indicesMetaData.get(actionRequest.index());
169174
// check the alias for the index request (this is how normal index requests are modeled)
170175
if (indexMetaData == null && indexRequest.index() != null) {
@@ -183,34 +188,86 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
183188
}
184189
}
185190
if (indexMetaData != null) {
186-
// Find the default pipeline if one is defined from and existing index.
187-
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetaData.getSettings());
188-
indexRequest.setPipeline(defaultPipeline);
189-
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
190-
hasIndexRequestsWithPipelines = true;
191+
final Settings indexSettings = indexMetaData.getSettings();
192+
if (IndexSettings.REQUIRED_PIPELINE.exists(indexSettings)) {
193+
// find the required pipeline if one is defined from an existing index
194+
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(indexSettings);
195+
assert IndexSettings.DEFAULT_PIPELINE.get(indexSettings).equals(IngestService.NOOP_PIPELINE_NAME) :
196+
IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
197+
indexRequest.setPipeline(requiredPipeline);
198+
requestCanOverridePipeline = false;
199+
} else {
200+
// find the default pipeline if one is defined from an existing index
201+
String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexSettings);
202+
indexRequest.setPipeline(defaultPipeline);
191203
}
192204
} else if (indexRequest.index() != null) {
193-
// No index exists yet (and is valid request), so matching index templates to look for a default pipeline
205+
// the index does not exist yet (and this is a valid request), so match index templates to look for a default or required pipeline
194206
List<IndexTemplateMetaData> templates = MetaDataIndexTemplateService.findTemplates(metaData, indexRequest.index());
195207
assert (templates != null);
196-
String defaultPipeline = IngestService.NOOP_PIPELINE_NAME;
197-
// order of templates are highest order first, break if we find a default_pipeline
208+
// templates are ordered highest order first; we still have to iterate through all of them
209+
String defaultPipeline = null;
198210
for (IndexTemplateMetaData template : templates) {
199211
final Settings settings = template.settings();
200-
if (IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
212+
if (requiredPipeline == null && IndexSettings.REQUIRED_PIPELINE.exists(settings)) {
213+
requiredPipeline = IndexSettings.REQUIRED_PIPELINE.get(settings);
214+
requestCanOverridePipeline = false;
215+
// we can not break in case a lower-order template has a default pipeline that we need to reject
216+
} else if (defaultPipeline == null && IndexSettings.DEFAULT_PIPELINE.exists(settings)) {
201217
defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(settings);
202-
break;
218+
// we can not break in case a lower-order template has a required pipeline that we need to reject
203219
}
204220
}
205-
indexRequest.setPipeline(defaultPipeline);
206-
if (IngestService.NOOP_PIPELINE_NAME.equals(defaultPipeline) == false) {
207-
hasIndexRequestsWithPipelines = true;
221+
if (requiredPipeline != null && defaultPipeline != null) {
222+
// we can not have picked up a required and a default pipeline from applying templates
223+
final String message = String.format(
224+
Locale.ROOT,
225+
"required pipeline [%s] and default pipeline [%s] can not both be set",
226+
requiredPipeline,
227+
defaultPipeline);
228+
throw new IllegalArgumentException(message);
229+
}
230+
final String pipeline;
231+
if (requiredPipeline != null) {
232+
pipeline = requiredPipeline;
233+
} else {
234+
pipeline = defaultPipeline != null ? defaultPipeline : IngestService.NOOP_PIPELINE_NAME;
235+
}
236+
indexRequest.setPipeline(pipeline);
237+
}
238+
239+
if (requestPipeline != null) {
240+
if (requestCanOverridePipeline == false) {
241+
final String message = String.format(
242+
Locale.ROOT,
243+
"request pipeline [%s] can not override required pipeline [%s]",
244+
requestPipeline,
245+
requiredPipeline);
246+
throw new IllegalArgumentException(message);
247+
} else {
248+
indexRequest.setPipeline(requestPipeline);
208249
}
209250
}
210-
} else if (IngestService.NOOP_PIPELINE_NAME.equals(pipeline) == false) {
251+
252+
if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
253+
hasIndexRequestsWithPipelines = true;
254+
}
255+
/*
256+
* We have to track whether or not the pipeline for this request has already been resolved. It can happen that the
257+
* pipeline for this request has already been derived yet we execute this loop again. That occurs if the bulk request
258+
* has been forwarded by a non-ingest coordinating node to an ingest node. In this case, the coordinating node will have
259+
* already resolved the pipeline for this request. It is important that we are able to distinguish this situation as we
260+
* can not double-resolve the pipeline because we will not be able to distinguish the case of the pipeline having been
261+
* set from a request pipeline parameter versus having been set by the resolution. We need to be able to distinguish
262+
* these cases as we need to reject the request if the pipeline was set by a required pipeline and there is a request
263+
* pipeline parameter too.
264+
*/
265+
indexRequest.isPipelineResolved(true);
266+
} else if (IngestService.NOOP_PIPELINE_NAME.equals(indexRequest.getPipeline()) == false) {
211267
hasIndexRequestsWithPipelines = true;
212268
}
213269
}
270+
214271
}
215272

216273
if (hasIndexRequestsWithPipelines) {
@@ -221,6 +278,14 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
221278
if (clusterService.localNode().isIngestNode()) {
222279
processBulkIndexIngestRequest(task, bulkRequest, listener);
223280
} else {
281+
if (Assertions.ENABLED) {
282+
final boolean allAreForwardedRequests = bulkRequest.requests()
283+
.stream()
284+
.map(TransportBulkAction::getIndexWriteRequest)
285+
.filter(Objects::nonNull)
286+
.allMatch(IndexRequest::isPipelineResolved);
287+
assert allAreForwardedRequests : bulkRequest;
288+
}
224289
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
225290
}
226291
} catch (Exception e) {

0 commit comments

Comments
 (0)