Skip to content

Commit 7f0be38

Browse files
authored
Merge pull request #160 from viqtor/space-v-tab
Add support for pipelines
2 parents 8c5c369 + efb8c56 commit 7f0be38

File tree

9 files changed

+338
-1
lines changed

9 files changed

+338
-1
lines changed

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,34 @@ By default, Beyonder will not overwrite a template if it already exists.
204204
This can be overridden by setting `force` to `true` in the expanded factory method
205205
`ElasticsearchBeyonder.start()`.
206206

207+
Managing pipelines
208+
------------------
209+
210+
A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared while
211+
documents are being indexed. Please note that this feature is only supported when you use the REST client, not the Transport client.
212+
213+
For example, to set one field's value based on another field using a Set Processor, you can add a file named `elasticsearch/_pipeline/set_field_processor.json`
214+
in your project:
215+
216+
```javascript
217+
{
218+
"description" : "Twitter pipeline",
219+
"processors" : [
220+
{
221+
"set" : {
222+
"field": "copy",
223+
"value": "{{otherField}}"
224+
}
225+
}
226+
]
227+
}
228+
```
229+
230+
By default, Beyonder will not overwrite a pipeline if it already exists.
231+
This can be overridden by setting `force` to `true` in the expanded factory method
232+
`ElasticsearchBeyonder.start()`.
233+
234+
207235
Why this name?
208236
==============
209237

src/main/java/fr/pilato/elasticsearch/tools/ElasticsearchBeyonder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults;
2323
import fr.pilato.elasticsearch.tools.index.IndexFinder;
2424
import fr.pilato.elasticsearch.tools.template.TemplateFinder;
25+
import fr.pilato.elasticsearch.tools.pipeline.PipelineFinder;
2526
import org.elasticsearch.client.Client;
2627
import org.elasticsearch.client.RestClient;
2728
import org.slf4j.Logger;
@@ -33,6 +34,7 @@
3334
import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.createIndex;
3435
import static fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater.updateSettings;
3536
import static fr.pilato.elasticsearch.tools.template.TemplateElasticsearchUpdater.createTemplate;
37+
import static fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater.createPipeline;
3638

3739
/**
3840
* By default, indexes are created with their default Elasticsearch settings. You can specify
@@ -109,6 +111,12 @@ public static void start(RestClient client, String root, boolean merge, boolean
109111
createTemplate(client, root, templateName, force);
110112
}
111113

114+
// create pipelines
115+
List<String> pipelineNames = PipelineFinder.findPipelines(root);
116+
for (String pipelineName : pipelineNames) {
117+
createPipeline(client, root, pipelineName, force);
118+
}
119+
112120
// create indices
113121
Collection<String> indexNames = IndexFinder.findIndexNames(root);
114122
for (String indexName : indexNames) {

src/main/java/fr/pilato/elasticsearch/tools/SettingsFinder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public static class Defaults {
4646
public static String IndexSettingsFileName = "_settings.json";
4747
public static String UpdateIndexSettingsFileName = "_update_settings.json";
4848
public static String TemplateDir = "_template";
49+
public static String PipelineDir = "_pipeline";
4950

5051
/**
5152
* Default setting of whether or not to merge mappings on start.

src/main/java/fr/pilato/elasticsearch/tools/index/IndexFinder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public static List<String> findIndexNames(final String root) throws IOException,
7070
} else {
7171
key = resource;
7272
}
73-
if (!key.equals(Defaults.TemplateDir) && !keys.contains(key)) {
73+
if (!key.equals(Defaults.TemplateDir) && !key.equals(Defaults.PipelineDir) && !keys.contains(key)) {
7474
logger.trace(" - found [{}].", key);
7575
keys.add(key);
7676
indexNames.add(key);
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to David Pilato (the "Author") under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Author licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package fr.pilato.elasticsearch.tools.pipeline;
21+
22+
import java.io.IOException;
23+
24+
import org.elasticsearch.client.Request;
25+
import org.elasticsearch.client.Response;
26+
import org.elasticsearch.client.ResponseException;
27+
import org.elasticsearch.client.RestClient;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
/**
32+
* Handles ingest pipeline creation.
33+
*
34+
* @author hjk181
35+
*
36+
*/
37+
public class PipelineElasticsearchUpdater {
38+
39+
private static final Logger logger = LoggerFactory.getLogger(PipelineElasticsearchUpdater.class);
40+
41+
/**
42+
* Create a pipeline in Elasticsearch.
43+
*
44+
* @param client Elasticsearch client
45+
* @param root dir within the classpath
46+
* @param pipeline the id of the pipeline
47+
* @param force set it to true if you want to force cleaning pipeline before adding it
48+
* @throws Exception if something goes wrong
49+
*/
50+
public static void createPipeline(RestClient client, String root, String pipeline, boolean force) throws Exception {
51+
String json = PipelineSettingsReader.readPipeline(root, pipeline);
52+
createPipelineWithJson(client, pipeline, json, force);
53+
}
54+
55+
/**
56+
* Create a pipeline in Elasticsearch. Read content from default classpath dir.
57+
*
58+
* @param client Elasticsearch client
59+
* @param pipeline the id of the pipeline
60+
* @param force set it to true if you want to force cleaning pipeline before adding it
61+
* @throws Exception if something goes wrong
62+
*/
63+
public static void createPipeline(RestClient client, String pipeline, boolean force) throws Exception {
64+
String json = PipelineSettingsReader.readPipeline(pipeline);
65+
createPipelineWithJson(client, pipeline, json, force);
66+
}
67+
68+
/**
69+
* Create a new pipeline in Elasticsearch
70+
*
71+
* @param client Elasticsearch client
72+
* @param pipeline the id of the pipeline
73+
* @param json JSon content for the pipeline
74+
* @param force set it to true if you want to force cleaning pipeline before adding it
75+
* @throws Exception if something goes wrong
76+
*/
77+
public static void createPipelineWithJson(RestClient client, String pipeline, String json, boolean force) throws Exception {
78+
if (isPipelineExist(client, pipeline)) {
79+
if (force) {
80+
logger.debug("Pipeline [{}] already exists. Force is set. Overriding it.", pipeline);
81+
createPipelineWithJsonInElasticsearch(client, pipeline, json);
82+
}
83+
else {
84+
logger.debug("Pipeline [{}] already exists.", pipeline);
85+
}
86+
}
87+
88+
if (!isPipelineExist(client, pipeline)) {
89+
logger.debug("Pipeline [{}] doesn't exist. Creating it.", pipeline);
90+
createPipelineWithJsonInElasticsearch(client, pipeline, json);
91+
}
92+
}
93+
94+
/**
95+
* Create a new pipeline in Elasticsearch.
96+
*
97+
* @param client Elasticsearch client
98+
* @param pipeline the id of the pipeline
99+
* @param json JSon content for the pipeline
100+
* @throws Exception if something goes wrong
101+
*/
102+
private static void createPipelineWithJsonInElasticsearch(RestClient client, String pipeline, String json) throws Exception {
103+
logger.trace("createPipeline([{}])", pipeline);
104+
105+
assert client != null;
106+
assert pipeline != null;
107+
108+
Request request = new Request("PUT", "/_ingest/pipeline/" + pipeline);
109+
request.setJsonEntity(json);
110+
Response response = client.performRequest(request);
111+
112+
if (response.getStatusLine().getStatusCode() != 200) {
113+
logger.warn("Could not create pipeline [{}]", pipeline);
114+
throw new Exception("Could not create pipeline [" + pipeline + "].");
115+
}
116+
117+
logger.trace("/createPipeline([{}])", pipeline);
118+
}
119+
120+
/**
121+
* Check if a pipeline exists
122+
*
123+
* @param client Elasticsearch client
124+
* @param pipeline the id of the pipeline
125+
* @return true if the pipeline exists
126+
* @throws IOException if something goes wrong
127+
*/
128+
public static boolean isPipelineExist(RestClient client, String pipeline) throws IOException {
129+
try {
130+
Response response = client.performRequest(new Request("GET", "/_ingest/pipeline/" + pipeline));
131+
return response.getEntity() != null;
132+
}
133+
catch (ResponseException e) {
134+
if (404 != e.getResponse().getStatusLine().getStatusCode()) {
135+
throw e;
136+
}
137+
}
138+
return false;
139+
}
140+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to David Pilato (the "Author") under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Author licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package fr.pilato.elasticsearch.tools.pipeline;
21+
22+
import java.io.IOException;
23+
import java.net.URISyntaxException;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import fr.pilato.elasticsearch.tools.ResourceList;
31+
import fr.pilato.elasticsearch.tools.SettingsFinder;
32+
import fr.pilato.elasticsearch.tools.template.TemplateFinder;
33+
34+
/**
35+
* Finds ingest pipelines on the classpath.
36+
*
37+
* @author hjk181
38+
*
39+
*/
40+
public class PipelineFinder extends SettingsFinder {
41+
42+
private static final Logger logger = LoggerFactory.getLogger(TemplateFinder.class);
43+
44+
/**
45+
* Find all pipelines in default classpath dir
46+
*
47+
* @return a list of pipelines
48+
* @throws IOException if connection with elasticsearch is failing
49+
* @throws URISyntaxException this should not happen
50+
*/
51+
public static List<String> findPipelines() throws IOException, URISyntaxException {
52+
return findPipelines(Defaults.ConfigDir);
53+
}
54+
55+
/**
56+
* Find all pipelines
57+
*
58+
* @param root dir within the classpath
59+
* @return a list of pipelines
60+
* @throws IOException if connection with elasticsearch is failing
61+
* @throws URISyntaxException this should not happen
62+
*/
63+
public static List<String> findPipelines(String root) throws IOException, URISyntaxException {
64+
if (root == null) {
65+
return findPipelines();
66+
}
67+
68+
logger.debug("Looking for pipelines in classpath under [{}].", root);
69+
70+
final List<String> pipelineNames = new ArrayList<>();
71+
String[] resources = ResourceList.getResources(root + "/" + Defaults.PipelineDir + "/"); // "es/_pipeline/"
72+
for (String resource : resources) {
73+
if (!resource.isEmpty()) {
74+
String withoutIndex = resource.substring(resource.indexOf("/") + 1);
75+
String pipeline = withoutIndex.substring(0, withoutIndex.indexOf(Defaults.JsonFileExtension));
76+
logger.trace(" - found [{}].", pipeline);
77+
pipelineNames.add(pipeline);
78+
}
79+
}
80+
81+
return pipelineNames;
82+
}
83+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to David Pilato (the "Author") under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Author licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package fr.pilato.elasticsearch.tools.pipeline;
21+
22+
import java.io.IOException;
23+
24+
import fr.pilato.elasticsearch.tools.SettingsFinder.Defaults;
25+
import fr.pilato.elasticsearch.tools.SettingsReader;
26+
27+
/**
28+
* Manage elasticsearch pipeline files.
29+
*
30+
* @author hjk181
31+
*/
32+
public class PipelineSettingsReader extends SettingsReader {
33+
34+
/**
35+
* Read a pipeline
36+
* @param root dir within the classpath
37+
* @param pipeline the id of the pipeline (.json will be appended)
38+
* @return The pipeline content
39+
* @throws IOException if the connection with elasticsearch is failing
40+
*/
41+
public static String readPipeline(String root, String pipeline) throws IOException {
42+
if (root == null) {
43+
return readPipeline(pipeline);
44+
}
45+
String settingsFile = root + "/" + Defaults.PipelineDir + "/" + pipeline + Defaults.JsonFileExtension;
46+
return readFileFromClasspath(settingsFile);
47+
}
48+
49+
/**
50+
* Read a pipeline in default classpath dir
51+
* @param pipeline the id of the pipeline (.json will be appended)
52+
* @return The pipeline content
53+
* @throws IOException if the connection with elasticsearch is failing
54+
*/
55+
public static String readPipeline(String pipeline) throws IOException {
56+
return readPipeline(Defaults.ConfigDir, pipeline);
57+
}
58+
}

src/test/java/fr/pilato/elasticsearch/tools/BeyonderRestIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import fr.pilato.elasticsearch.tools.alias.AliasElasticsearchUpdater;
2323
import fr.pilato.elasticsearch.tools.index.IndexElasticsearchUpdater;
24+
import fr.pilato.elasticsearch.tools.pipeline.PipelineElasticsearchUpdater;
25+
2426
import org.apache.commons.io.IOUtils;
2527
import org.apache.http.HttpEntity;
2628
import org.elasticsearch.client.Request;
@@ -142,6 +144,12 @@ public void testForceEnabled() throws Exception {
142144
String newMapping = getMapping("twitter");
143145
assertThat(newMapping, is(not(oldMapping)));
144146
}
147+
148+
@Test
149+
public void testPipeline() throws Exception {
150+
ElasticsearchBeyonder.start(client, "models/pipeline");
151+
assertThat(PipelineElasticsearchUpdater.isPipelineExist(client, "twitter_pipeline"), is(true));
152+
}
145153

146154
private String getMapping(String indexName) throws IOException {
147155
HttpEntity response = client.performRequest(new Request("GET", indexName + "/_mapping")).getEntity();

0 commit comments

Comments
 (0)