Skip to content

Commit f690b49

Browse files
INGEST: Add Pipeline Processor (#32473)
* INGEST: Add Pipeline Processor * Adds Processor capable of invoking other pipelines * Closes #31842
1 parent 48b388c commit f690b49

File tree

7 files changed

+330
-4
lines changed

7 files changed

+330
-4
lines changed

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
8282
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
8383
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
8484
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
85+
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
8586
processors.put(DissectProcessor.TYPE, new DissectProcessor.Factory());
8687
return Collections.unmodifiableMap(processors);
8788
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.ingest.common;
21+
22+
import java.util.Map;
23+
import org.elasticsearch.ingest.AbstractProcessor;
24+
import org.elasticsearch.ingest.ConfigurationUtils;
25+
import org.elasticsearch.ingest.IngestDocument;
26+
import org.elasticsearch.ingest.IngestService;
27+
import org.elasticsearch.ingest.Pipeline;
28+
import org.elasticsearch.ingest.Processor;
29+
30+
public class PipelineProcessor extends AbstractProcessor {
31+
32+
public static final String TYPE = "pipeline";
33+
34+
private final String pipelineName;
35+
36+
private final IngestService ingestService;
37+
38+
private PipelineProcessor(String tag, String pipelineName, IngestService ingestService) {
39+
super(tag);
40+
this.pipelineName = pipelineName;
41+
this.ingestService = ingestService;
42+
}
43+
44+
@Override
45+
public void execute(IngestDocument ingestDocument) throws Exception {
46+
Pipeline pipeline = ingestService.getPipeline(pipelineName);
47+
if (pipeline == null) {
48+
throw new IllegalStateException("Pipeline processor configured for non-existent pipeline [" + pipelineName + ']');
49+
}
50+
ingestDocument.executePipeline(pipeline);
51+
}
52+
53+
@Override
54+
public String getType() {
55+
return TYPE;
56+
}
57+
58+
public static final class Factory implements Processor.Factory {
59+
60+
private final IngestService ingestService;
61+
62+
public Factory(IngestService ingestService) {
63+
this.ingestService = ingestService;
64+
}
65+
66+
@Override
67+
public PipelineProcessor create(Map<String, Processor.Factory> registry, String processorTag,
68+
Map<String, Object> config) throws Exception {
69+
String pipeline =
70+
ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "pipeline");
71+
return new PipelineProcessor(processorTag, pipeline, ingestService);
72+
}
73+
}
74+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.ingest.common;
20+
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import org.elasticsearch.ElasticsearchException;
26+
import org.elasticsearch.ingest.CompoundProcessor;
27+
import org.elasticsearch.ingest.IngestDocument;
28+
import org.elasticsearch.ingest.IngestService;
29+
import org.elasticsearch.ingest.Pipeline;
30+
import org.elasticsearch.ingest.Processor;
31+
import org.elasticsearch.ingest.RandomDocumentPicks;
32+
import org.elasticsearch.test.ESTestCase;
33+
34+
import static org.mockito.Mockito.mock;
35+
import static org.mockito.Mockito.when;
36+
37+
public class PipelineProcessorTests extends ESTestCase {
38+
39+
public void testExecutesPipeline() throws Exception {
40+
String pipelineId = "pipeline";
41+
IngestService ingestService = mock(IngestService.class);
42+
CompletableFuture<IngestDocument> invoked = new CompletableFuture<>();
43+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
44+
Pipeline pipeline = new Pipeline(
45+
pipelineId, null, null,
46+
new CompoundProcessor(new Processor() {
47+
@Override
48+
public void execute(final IngestDocument ingestDocument) throws Exception {
49+
invoked.complete(ingestDocument);
50+
}
51+
52+
@Override
53+
public String getType() {
54+
return null;
55+
}
56+
57+
@Override
58+
public String getTag() {
59+
return null;
60+
}
61+
})
62+
);
63+
when(ingestService.getPipeline(pipelineId)).thenReturn(pipeline);
64+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
65+
Map<String, Object> config = new HashMap<>();
66+
config.put("pipeline", pipelineId);
67+
factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument);
68+
assertEquals(testIngestDocument, invoked.get());
69+
}
70+
71+
public void testThrowsOnMissingPipeline() throws Exception {
72+
IngestService ingestService = mock(IngestService.class);
73+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
74+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
75+
Map<String, Object> config = new HashMap<>();
76+
config.put("pipeline", "missingPipelineId");
77+
IllegalStateException e = expectThrows(
78+
IllegalStateException.class,
79+
() -> factory.create(Collections.emptyMap(), null, config).execute(testIngestDocument)
80+
);
81+
assertEquals(
82+
"Pipeline processor configured for non-existent pipeline [missingPipelineId]", e.getMessage()
83+
);
84+
}
85+
86+
public void testThrowsOnRecursivePipelineInvocations() throws Exception {
87+
String innerPipelineId = "inner";
88+
String outerPipelineId = "outer";
89+
IngestService ingestService = mock(IngestService.class);
90+
IngestDocument testIngestDocument = RandomDocumentPicks.randomIngestDocument(random(), new HashMap<>());
91+
Map<String, Object> outerConfig = new HashMap<>();
92+
outerConfig.put("pipeline", innerPipelineId);
93+
PipelineProcessor.Factory factory = new PipelineProcessor.Factory(ingestService);
94+
Pipeline outer = new Pipeline(
95+
outerPipelineId, null, null,
96+
new CompoundProcessor(factory.create(Collections.emptyMap(), null, outerConfig))
97+
);
98+
Map<String, Object> innerConfig = new HashMap<>();
99+
innerConfig.put("pipeline", outerPipelineId);
100+
Pipeline inner = new Pipeline(
101+
innerPipelineId, null, null,
102+
new CompoundProcessor(factory.create(Collections.emptyMap(), null, innerConfig))
103+
);
104+
when(ingestService.getPipeline(outerPipelineId)).thenReturn(outer);
105+
when(ingestService.getPipeline(innerPipelineId)).thenReturn(inner);
106+
outerConfig.put("pipeline", innerPipelineId);
107+
ElasticsearchException e = expectThrows(
108+
ElasticsearchException.class,
109+
() -> factory.create(Collections.emptyMap(), null, outerConfig).execute(testIngestDocument)
110+
);
111+
assertEquals(
112+
"Recursive invocation of pipeline [inner] detected.", e.getRootCause().getMessage()
113+
);
114+
}
115+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
---
2+
teardown:
3+
- do:
4+
ingest.delete_pipeline:
5+
id: "inner"
6+
ignore: 404
7+
8+
- do:
9+
ingest.delete_pipeline:
10+
id: "outer"
11+
ignore: 404
12+
13+
---
14+
"Test Pipeline Processor with Simple Inner Pipeline":
15+
- do:
16+
ingest.put_pipeline:
17+
id: "inner"
18+
body: >
19+
{
20+
"description" : "inner pipeline",
21+
"processors" : [
22+
{
23+
"set" : {
24+
"field": "foo",
25+
"value": "bar"
26+
}
27+
},
28+
{
29+
"set" : {
30+
"field": "baz",
31+
"value": "blub"
32+
}
33+
}
34+
]
35+
}
36+
- match: { acknowledged: true }
37+
38+
- do:
39+
ingest.put_pipeline:
40+
id: "outer"
41+
body: >
42+
{
43+
"description" : "outer pipeline",
44+
"processors" : [
45+
{
46+
"pipeline" : {
47+
"pipeline": "inner"
48+
}
49+
}
50+
]
51+
}
52+
- match: { acknowledged: true }
53+
54+
- do:
55+
index:
56+
index: test
57+
type: test
58+
id: 1
59+
pipeline: "outer"
60+
body: {}
61+
62+
- do:
63+
get:
64+
index: test
65+
type: test
66+
id: 1
67+
- match: { _source.foo: "bar" }
68+
- match: { _source.baz: "blub" }
69+
70+
---
71+
"Test Pipeline Processor with Circular Pipelines":
72+
- do:
73+
ingest.put_pipeline:
74+
id: "outer"
75+
body: >
76+
{
77+
"description" : "outer pipeline",
78+
"processors" : [
79+
{
80+
"pipeline" : {
81+
"pipeline": "inner"
82+
}
83+
}
84+
]
85+
}
86+
- match: { acknowledged: true }
87+
88+
- do:
89+
ingest.put_pipeline:
90+
id: "inner"
91+
body: >
92+
{
93+
"description" : "inner pipeline",
94+
"processors" : [
95+
{
96+
"pipeline" : {
97+
"pipeline": "outer"
98+
}
99+
}
100+
]
101+
}
102+
- match: { acknowledged: true }
103+
104+
- do:
105+
catch: /illegal_state_exception/
106+
index:
107+
index: test
108+
type: test
109+
id: 1
110+
pipeline: "outer"
111+
body: {}
112+
- match: { error.root_cause.0.type: "exception" }
113+
- match: { error.root_cause.0.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Recursive invocation of pipeline [inner] detected." }

server/src/main/java/org/elasticsearch/ingest/IngestDocument.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import java.util.Collections;
23+
import java.util.IdentityHashMap;
24+
import java.util.Set;
2225
import org.elasticsearch.common.Strings;
2326
import org.elasticsearch.index.VersionType;
2427
import org.elasticsearch.index.mapper.IdFieldMapper;
@@ -55,6 +58,9 @@ public final class IngestDocument {
5558
private final Map<String, Object> sourceAndMetadata;
5659
private final Map<String, Object> ingestMetadata;
5760

61+
// Contains all pipelines that have been executed for this document
62+
private final Set<Pipeline> executedPipelines = Collections.newSetFromMap(new IdentityHashMap<>());
63+
5864
public IngestDocument(String index, String type, String id, String routing,
5965
Long version, VersionType versionType, Map<String, Object> source) {
6066
this.sourceAndMetadata = new HashMap<>();
@@ -632,6 +638,19 @@ private static Object deepCopy(Object value) {
632638
}
633639
}
634640

641+
/**
642+
* Executes the given pipeline with for this document unless the pipeline has already been executed
643+
* for this document.
644+
* @param pipeline Pipeline to execute
645+
* @throws Exception On exception in pipeline execution
646+
*/
647+
public void executePipeline(Pipeline pipeline) throws Exception {
648+
if (this.executedPipelines.add(pipeline) == false) {
649+
throw new IllegalStateException("Recursive invocation of pipeline [" + pipeline.getId() + "] detected.");
650+
}
651+
pipeline.execute(this);
652+
}
653+
635654
@Override
636655
public boolean equals(Object obj) {
637656
if (obj == this) { return true; }

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public IngestService(ClusterService clusterService, ThreadPool threadPool,
9292
threadPool.getThreadContext(), threadPool::relativeTimeInMillis,
9393
(delay, command) -> threadPool.schedule(
9494
TimeValue.timeValueMillis(delay), ThreadPool.Names.GENERIC, command
95-
)
95+
), this
9696
)
9797
);
9898
this.threadPool = threadPool;

server/src/main/java/org/elasticsearch/ingest/Processor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,22 +97,26 @@ class Parameters {
9797
* instances that have run prior to in ingest.
9898
*/
9999
public final ThreadContext threadContext;
100-
100+
101101
public final LongSupplier relativeTimeSupplier;
102-
102+
103+
public final IngestService ingestService;
104+
103105
/**
104106
* Provides scheduler support
105107
*/
106108
public final BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler;
107109

108110
public Parameters(Environment env, ScriptService scriptService, AnalysisRegistry analysisRegistry, ThreadContext threadContext,
109-
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler) {
111+
LongSupplier relativeTimeSupplier, BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler,
112+
IngestService ingestService) {
110113
this.env = env;
111114
this.scriptService = scriptService;
112115
this.threadContext = threadContext;
113116
this.analysisRegistry = analysisRegistry;
114117
this.relativeTimeSupplier = relativeTimeSupplier;
115118
this.scheduler = scheduler;
119+
this.ingestService = ingestService;
116120
}
117121

118122
}

0 commit comments

Comments
 (0)