Skip to content

Commit d52997b

Browse files
INGEST: Add Pipeline Processor (#32473) (#33368)
* INGEST: Add Pipeline Processor (#32473) * Adds Processor capable of invoking other pipelines * Closes #31842
1 parent 8c9b90a commit d52997b

File tree

8 files changed

+339
-12
lines changed

8 files changed

+339
-12
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
8282
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
8383
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
8484
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
85+
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
8586
return Collections.unmodifiableMap(processors);
8687
}
8788

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import java.util.Map;
23+
import org.elasticsearch.ingest.AbstractProcessor;
24+
import org.elasticsearch.ingest.ConfigurationUtils;
25+
import org.elasticsearch.ingest.IngestDocument;
26+
import org.elasticsearch.ingest.IngestService;
27+
import org.elasticsearch.ingest.Pipeline;
28+
import org.elasticsearch.ingest.Processor;
29+
30+
public class PipelineProcessor extends AbstractProcessor {
31+
32+
public static final String TYPE = "pipeline";
33+
34+
private final String pipelineName;
35+
36+
private final IngestService ingestService;
37+
38+
private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
39+
super(tag);
40+
this.pipelineName = pipelineName;
41+
this.ingestService = ingestService;
42+
}
43+
44+
@Override
45+
public void execute(IngestDocument ingestDocument) throws Exception {
46+
Pipeline pipeline = ingestService.getPipeline(pipelineName);
47+
if (pipeline == null) {
48+
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
49+
}
50+
ingestDocument.executePipeline(pipeline);
51+
}
52+
53+
@Override
54+
public String getType() {
55+
return TYPE;
56+
}
57+
58+
public static final class Factory implements Processor.Factory {
59+
60+
private final IngestService ingestService;
61+
62+
public Factory(IngestService ingestService) {
63+
this.ingestService = ingestService;
64+
}
65+
66+
@Override
67+
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
68+
Map<String, Object> config) throws Exception {
69+
String pipeline =
70+
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pipeline");
71+
return new PipelineProcessor(processorTag, pipeline, ingestService);
72+
}
73+
}
74+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.ingest.common;
20+
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.ingest.CompoundProcessor;
27+
import org.elasticsearch.ingest.IngestDocument;
28+
import org.elasticsearch.ingest.IngestService;
29+
import org.elasticsearch.ingest.Pipeline;
30+
import org.elasticsearch.ingest.Processor;
31+
import org.elasticsearch.ingest.RandomDocumentPicks;
32+
import org.elasticsearch.test.ESTestCase;
33+
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.when;
36+
37+
public class PipelineProcessorTests extends ESTestCase {
38+
39+
public void testExecutesPipeline() throws Exception {
40+
String pipelineId = "pipeline";
41+
IngestService ingestService = mock(IngestService.class);
42+
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
43+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
44+
Pipeline pipeline = new Pipeline(
45+
pipelineId, null, null,
46+
new CompoundProcessor(new Processor() {
47+
@Override
48+
public void execute(final IngestDocument ingestDocument) throws Exception {
49+
invoked.complete(ingestDocument);
50+
}
51+
52+
@Override
53+
public String getType() {
54+
return null;
55+
}
56+
57+
@Override
58+
public String getTag() {
59+
return null;
60+
}
61+
})
62+
);
63+
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
64+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
65+
Map<String, Object> config = new HashMap<>();
66+
config.put("pipeline", pipelineId);
67+
factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument);
68+
assertEquals(testIngestDocument, invoked.get());
69+
}
70+
71+
public void testThrowsOnMissingPipeline() throws Exception {
72+
IngestService ingestService = mock(IngestService.class);
73+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
74+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
75+
Map<String, Object> config = new HashMap<>();
76+
config.put("pipeline", "missingPipelineId");
77+
IllegalStateException e = expectThrows(
78+
IllegalStateException.class,
79+
() -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument)
80+
);
81+
assertEquals(
82+
"Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage()
83+
);
84+
}
85+
86+
public void testThrowsOnRecursivePipelineInvocations() throws Exception {
87+
String innerPipelineId = "inner";
88+
String outerPipelineId = "outer";
89+
IngestService ingestService = mock(IngestService.class);
90+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
91+
Map<String, Object> outerConfig = new HashMap<>();
92+
outerConfig.put("pipeline", innerPipelineId);
93+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
94+
Pipeline outer = new Pipeline(
95+
outerPipelineId, null, null,
96+
new CompoundProcessor(factory.create(Collections.emptyMap(), null, outerConfig))
97+
);
98+
Map<String, Object> innerConfig = new HashMap<>();
99+
innerConfig.put("pipeline", outerPipelineId);
100+
Pipeline inner = new Pipeline(
101+
innerPipelineId, null, null,
102+
new CompoundProcessor(factory.create(Collections.emptyMap(), null, innerConfig))
103+
);
104+
when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer);
105+
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
106+
outerConfig.put("pipeline", innerPipelineId);
107+
ElasticsearchException e = expectThrows(
108+
ElasticsearchException.class,
109+
() -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument)
110+
);
111+
assertEquals(
112+
"Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
113+
);
114+
}
115+
}

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/10_basic.yml

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@
2323
- match: { nodes.$master.ingest.processors.11.type: json }
2424
- match: { nodes.$master.ingest.processors.12.type: kv }
2525
- match: { nodes.$master.ingest.processors.13.type: lowercase }
26-
- match: { nodes.$master.ingest.processors.14.type: remove }
27-
- match: { nodes.$master.ingest.processors.15.type: rename }
28-
- match: { nodes.$master.ingest.processors.16.type: script }
29-
- match: { nodes.$master.ingest.processors.17.type: set }
30-
- match: { nodes.$master.ingest.processors.18.type: sort }
31-
- match: { nodes.$master.ingest.processors.19.type: split }
32-
- match: { nodes.$master.ingest.processors.20.type: trim }
33-
- match: { nodes.$master.ingest.processors.21.type: uppercase }
26+
- match: { nodes.$master.ingest.processors.14.type: pipeline }
27+
- match: { nodes.$master.ingest.processors.15.type: remove }
28+
- match: { nodes.$master.ingest.processors.16.type: rename }
29+
- match: { nodes.$master.ingest.processors.17.type: script }
30+
- match: { nodes.$master.ingest.processors.18.type: set }
31+
- match: { nodes.$master.ingest.processors.19.type: sort }
32+
- match: { nodes.$master.ingest.processors.20.type: split }
33+
- match: { nodes.$master.ingest.processors.21.type: trim }
34+
- match: { nodes.$master.ingest.processors.22.type: uppercase }
3435

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "inner"
6+
ignore: 404
7+
8+
- do:
9+
ingest.delete_pipeline:
10+
id: "outer"
11+
ignore: 404
12+
13+
---
14+
"Test Pipeline Processor with Simple Inner Pipeline":
15+
- do:
16+
ingest.put_pipeline:
17+
id: "inner"
18+
body: >
19+
{
20+
"description" : "inner pipeline",
21+
"processors" : [
22+
{
23+
"set" : {
24+
"field": "foo",
25+
"value": "bar"
26+
}
27+
},
28+
{
29+
"set" : {
30+
"field": "baz",
31+
"value": "blub"
32+
}
33+
}
34+
]
35+
}
36+
- match: { acknowledged: true }
37+
38+
- do:
39+
ingest.put_pipeline:
40+
id: "outer"
41+
body: >
42+
{
43+
"description" : "outer pipeline",
44+
"processors" : [
45+
{
46+
"pipeline" : {
47+
"pipeline": "inner"
48+
}
49+
}
50+
]
51+
}
52+
- match: { acknowledged: true }
53+
54+
- do:
55+
index:
56+
index: test
57+
type: test
58+
id: 1
59+
pipeline: "outer"
60+
body: {}
61+
62+
- do:
63+
get:
64+
index: test
65+
type: test
66+
id: 1
67+
- match: { _source.foo: "bar" }
68+
- match: { _source.baz: "blub" }
69+
70+
---
71+
"Test Pipeline Processor with Circular Pipelines":
72+
- do:
73+
ingest.put_pipeline:
74+
id: "outer"
75+
body: >
76+
{
77+
"description" : "outer pipeline",
78+
"processors" : [
79+
{
80+
"pipeline" : {
81+
"pipeline": "inner"
82+
}
83+
}
84+
]
85+
}
86+
- match: { acknowledged: true }
87+
88+
- do:
89+
ingest.put_pipeline:
90+
id: "inner"
91+
body: >
92+
{
93+
"description" : "inner pipeline",
94+
"processors" : [
95+
{
96+
"pipeline" : {
97+
"pipeline": "outer"
98+
}
99+
}
100+
]
101+
}
102+
- match: { acknowledged: true }
103+
104+
- do:
105+
catch: /illegal_state_exception/
106+
index:
107+
index: test
108+
type: test
109+
id: 1
110+
pipeline: "outer"
111+
body: {}
112+
- match: { error.root_cause.0.type: "exception" }
113+
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." }

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import java.util.Collections;
23+
import java.util.IdentityHashMap;
24+
import java.util.Set;
2225
import org.elasticsearch.common.Strings;
2326
import org.elasticsearch.index.VersionType;
2427
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -56,6 +59,9 @@ public final class IngestDocument {
5659
private final Map<String, Object> sourceAndMetadata;
5760
private final Map<String, Object> ingestMetadata;
5861

62+
// Contains all pipelines that have been executed for this document
63+
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
64+
5965
public IngestDocument(String index, String type, String id, String routing, String parent,
6066
Long version, VersionType versionType, Map<String, Object> source) {
6167
this.sourceAndMetadata = new HashMap<>();
@@ -636,6 +642,19 @@ private static Object deepCopy(Object value) {
636642
}
637643
}
638644

645+
/**
646+
* Executes the given pipeline with for this document unless the pipeline has already been executed
647+
* for this document.
648+
* @param pipeline Pipeline to execute
649+
* @throws Exception On exception in pipeline execution
650+
*/
651+
public void executePipeline(Pipeline pipeline) throws Exception {
652+
if (this.executedPipelines.add(pipeline) == false) {
653+
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
654+
}
655+
pipeline.execute(this);
656+
}
657+
639658
@Override
640659
public boolean equals(Object obj) {
641660
if (obj == this) { return true; }

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
9393
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
9494
(delay, command) -> threadPool.schedule(
9595
TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command
96-
)
96+
), this
9797
)
9898
);
9999
this.threadPool = threadPool;

0 commit comments

Comments
 (0)