Skip to content

Commit aa60559

Browse files
committed
[ML] addresses preview bug, and adds check to PUT
1 parent 482bd98 commit aa60559

File tree

5 files changed

+208
-16
lines changed

5 files changed

+208
-16
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameMessages.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ public class DataFrameMessages {
2121
"Failed to validate data frame configuration";
2222
public static final String REST_PUT_DATA_FRAME_FAILED_PERSIST_TRANSFORM_CONFIGURATION = "Failed to persist data frame configuration";
2323
public static final String REST_PUT_DATA_FRAME_FAILED_TO_DEDUCE_DEST_MAPPINGS = "Failed to deduce dest mappings";
24-
public static final String REST_PUT_DATA_FRAME_FAILED_TO_CREATE_DEST_INDEX = "Failed to create dest index";
2524
public static final String REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING = "Source index [{0}] does not exist";
25+
public static final String REST_PUT_DATA_FRAME_DEST_IN_SOURCE = "Destination index [{0}] is included in source expression [{1}]";
26+
public static final String REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX = "Destination index [{0}] should refer to a single index";
2627
public static final String REST_PUT_DATA_FRAME_INCONSISTENT_ID =
2728
"Inconsistent id; ''{0}'' specified in the body differs from ''{1}'' specified as a URL argument";
2829

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPreviewDataFrameTransformAction.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,27 @@
66

77
package org.elasticsearch.xpack.dataframe.action;
88

9+
import org.elasticsearch.ElasticsearchStatusException;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.search.SearchAction;
1112
import org.elasticsearch.action.support.ActionFilters;
1213
import org.elasticsearch.action.support.HandledTransportAction;
14+
import org.elasticsearch.action.support.IndicesOptions;
1315
import org.elasticsearch.client.Client;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.service.ClusterService;
1419
import org.elasticsearch.common.inject.Inject;
15-
import org.elasticsearch.common.io.stream.Writeable;
1620
import org.elasticsearch.license.LicenseUtils;
1721
import org.elasticsearch.license.XPackLicenseState;
22+
import org.elasticsearch.rest.RestStatus;
1823
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
1924
import org.elasticsearch.tasks.Task;
2025
import org.elasticsearch.threadpool.ThreadPool;
2126
import org.elasticsearch.transport.TransportService;
2227
import org.elasticsearch.xpack.core.ClientHelper;
2328
import org.elasticsearch.xpack.core.XPackField;
29+
import org.elasticsearch.xpack.core.dataframe.DataFrameMessages;
2430
import org.elasticsearch.xpack.core.dataframe.action.PreviewDataFrameTransformAction;
2531
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
2632
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
@@ -40,15 +46,20 @@ public class TransportPreviewDataFrameTransformAction extends
4046
private final XPackLicenseState licenseState;
4147
private final Client client;
4248
private final ThreadPool threadPool;
49+
private final IndexNameExpressionResolver indexNameExpressionResolver;
50+
private final ClusterService clusterService;
4351

4452
@Inject
4553
public TransportPreviewDataFrameTransformAction(TransportService transportService, ActionFilters actionFilters,
46-
Client client, ThreadPool threadPool, XPackLicenseState licenseState) {
47-
super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters,
48-
(Writeable.Reader<PreviewDataFrameTransformAction.Request>) PreviewDataFrameTransformAction.Request::new);
54+
Client client, ThreadPool threadPool, XPackLicenseState licenseState,
55+
IndexNameExpressionResolver indexNameExpressionResolver,
56+
ClusterService clusterService) {
57+
super(PreviewDataFrameTransformAction.NAME,transportService, actionFilters, PreviewDataFrameTransformAction.Request::new);
4958
this.licenseState = licenseState;
5059
this.client = client;
5160
this.threadPool = threadPool;
61+
this.clusterService = clusterService;
62+
this.indexNameExpressionResolver = indexNameExpressionResolver;
5263
}
5364

5465
@Override
@@ -60,7 +71,18 @@ protected void doExecute(Task task,
6071
return;
6172
}
6273

74+
ClusterState clusterState = clusterService.state();
75+
6376
final DataFrameTransformConfig config = request.getConfig();
77+
for(String src : config.getSource().getIndex()) {
78+
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
79+
if (concreteNames.length == 0) {
80+
listener.onFailure(new ElasticsearchStatusException(
81+
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
82+
RestStatus.BAD_REQUEST));
83+
return;
84+
}
85+
}
6486

6587
Pivot pivot = new Pivot(config.getPivotConfig());
6688

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.cluster.service.ClusterService;
2424
import org.elasticsearch.common.Strings;
2525
import org.elasticsearch.common.inject.Inject;
26+
import org.elasticsearch.common.regex.Regex;
2627
import org.elasticsearch.common.settings.Settings;
2728
import org.elasticsearch.common.xcontent.XContentBuilder;
2829
import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -51,7 +52,12 @@
5152
import org.elasticsearch.xpack.dataframe.transforms.pivot.Pivot;
5253

5354
import java.io.IOException;
55+
import java.util.ArrayList;
56+
import java.util.Arrays;
57+
import java.util.HashSet;
58+
import java.util.List;
5459
import java.util.Map;
60+
import java.util.Set;
5561
import java.util.stream.Collectors;
5662

5763
public class TransportPutDataFrameTransformAction
@@ -114,14 +120,54 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
114120
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_TRANSFORM_EXISTS, transformId)));
115121
return;
116122
}
117-
123+
final String destIndex = config.getDestination().getIndex();
124+
Set<String> concreteSourceIndexNames = new HashSet<>();
118125
for(String src : config.getSource().getIndex()) {
119-
if (indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src).length == 0) {
126+
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
127+
if (concreteNames.length == 0) {
120128
listener.onFailure(new ElasticsearchStatusException(
121129
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
122130
RestStatus.BAD_REQUEST));
123131
return;
124132
}
133+
if (Regex.simpleMatch(src, destIndex)) {
134+
listener.onFailure(new ElasticsearchStatusException(
135+
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
136+
RestStatus.BAD_REQUEST
137+
));
138+
return;
139+
}
140+
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
141+
}
142+
143+
if (concreteSourceIndexNames.contains(destIndex)) {
144+
listener.onFailure(new ElasticsearchStatusException(
145+
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
146+
destIndex,
147+
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
148+
RestStatus.BAD_REQUEST
149+
));
150+
return;
151+
}
152+
153+
final String[] concreteDest =
154+
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
155+
156+
if (concreteDest.length > 1 || Regex.isSimpleMatchPattern(destIndex)) {
157+
listener.onFailure(new ElasticsearchStatusException(
158+
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
159+
RestStatus.BAD_REQUEST
160+
));
161+
return;
162+
}
163+
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
164+
listener.onFailure(new ElasticsearchStatusException(
165+
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
166+
concreteDest[0],
167+
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
168+
RestStatus.BAD_REQUEST
169+
));
170+
return;
125171
}
126172

127173
// Early check to verify that the user can create the destination index and can read from the source
@@ -131,18 +177,16 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
131177
.indices(config.getSource().getIndex())
132178
.privileges("read")
133179
.build();
134-
String[] destPrivileges = new String[3];
135-
destPrivileges[0] = "read";
136-
destPrivileges[1] = "index";
180+
List<String> destPrivileges = new ArrayList<>(3);
181+
destPrivileges.add("read");
182+
destPrivileges.add("index");
137183
// If the destination index does not exist, we can assume that we may have to create it on start.
138184
// We should check that the creating user has the privileges to create the index.
139-
if (indexNameExpressionResolver.concreteIndexNames(clusterState,
140-
IndicesOptions.lenientExpandOpen(),
141-
config.getDestination().getIndex()).length == 0) {
142-
destPrivileges[2] = "create_index";
185+
if (concreteDest.length == 0) {
186+
destPrivileges.add("create_index");
143187
}
144188
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
145-
.indices(config.getDestination().getIndex())
189+
.indices(destIndex)
146190
.privileges(destPrivileges)
147191
.build();
148192

@@ -151,7 +195,6 @@ protected void masterOperation(Request request, ClusterState clusterState, Actio
151195
privRequest.username(username);
152196
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
153197
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
154-
155198
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
156199
r -> handlePrivsResponse(username, config, r, listener),
157200
listener::onFailure);

x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/preview_transforms.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,18 @@ setup:
102102
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
103103
}
104104
}
105+
---
106+
"Test preview with non-existing source index":
107+
- do:
108+
catch: /Source index \[does_not_exist\] does not exist/
109+
data_frame.preview_data_frame_transform:
110+
body: >
111+
{
112+
"source": { "index": ["airline-data", "does_not_exist"] },
113+
"pivot": {
114+
"group_by": {
115+
"airline": {"terms": {"field": "airline"}},
116+
"by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
117+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
118+
}
119+
}

x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,4 +190,115 @@ setup:
190190
transform_id: "_all"
191191
from: 0
192192
size: 10000
193+
---
194+
"Test transform where dest is included in source":
195+
- do:
196+
catch: /Destination index \[airline-data-by-airline\] is included in source expression \[airline-data/
197+
data_frame.put_data_frame_transform:
198+
transform_id: "airline-transform"
199+
body: >
200+
{
201+
"source": {
202+
"index": ["airline-data*"]
203+
},
204+
"dest": { "index": "airline-data-by-airline" },
205+
"pivot": {
206+
"group_by": { "airline": {"terms": {"field": "airline"}}},
207+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
208+
}
209+
}
210+
---
211+
"Test transform where dest is a simple index pattern":
212+
- do:
213+
catch: /Destination index .* should refer to a single index/
214+
data_frame.put_data_frame_transform:
215+
transform_id: "airline-transform"
216+
body: >
217+
{
218+
"source": {
219+
"index": ["airline-data*"]
220+
},
221+
"dest": { "index": "destination*" },
222+
"pivot": {
223+
"group_by": { "airline": {"terms": {"field": "airline"}}},
224+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
225+
}
226+
}
227+
---
228+
"Test alias scenarios":
229+
- do:
230+
indices.create:
231+
index: created-destination-index
232+
- do:
233+
indices.create:
234+
index: second-created-destination-index
235+
- do:
236+
indices.put_alias:
237+
index: airline-data
238+
name: source-index
239+
- do:
240+
indices.put_alias:
241+
index: created-destination-index
242+
name: dest-index
243+
- do:
244+
data_frame.put_data_frame_transform:
245+
transform_id: "transform-from-aliases"
246+
body: >
247+
{
248+
"source": {
249+
"index": "source-index"
250+
},
251+
"dest": { "index": "dest-index" },
252+
"pivot": {
253+
"group_by": { "airline": {"terms": {"field": "airline"}}},
254+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
255+
}
256+
}
257+
- match: { acknowledged: true }
258+
259+
- do:
260+
indices.put_alias:
261+
index: created-destination-index
262+
name: source-index
263+
264+
- do:
265+
catch: /Destination index \[created-destination-index\] is included in source expression \[airline-data,created-destination-index\]/
266+
data_frame.put_data_frame_transform:
267+
transform_id: "transform-from-aliases-failures"
268+
body: >
269+
{
270+
"source": {
271+
"index": "source-index"
272+
},
273+
"dest": { "index": "dest-index" },
274+
"pivot": {
275+
"group_by": { "airline": {"terms": {"field": "airline"}}},
276+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
277+
}
278+
}
279+
280+
- do:
281+
indices.delete_alias:
282+
index: created-destination-index
283+
name: source-index
193284

285+
- do:
286+
indices.put_alias:
287+
index: second-created-destination-index
288+
name: dest-index
289+
290+
- do:
291+
catch: /Destination index \[dest-index\] should refer to a single index/
292+
data_frame.put_data_frame_transform:
293+
transform_id: "airline-transform"
294+
body: >
295+
{
296+
"source": {
297+
"index": ["source-index"]
298+
},
299+
"dest": { "index": "dest-index" },
300+
"pivot": {
301+
"group_by": { "airline": {"terms": {"field": "airline"}}},
302+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
303+
}
304+
}

0 commit comments

Comments
 (0)