|
7 | 7 |
|
8 | 8 | import org.elasticsearch.action.ActionListener;
|
9 | 9 | import org.elasticsearch.action.DocWriteResponse;
|
| 10 | +import org.elasticsearch.action.index.IndexRequest; |
| 11 | +import org.elasticsearch.action.index.IndexResponse; |
10 | 12 | import org.elasticsearch.action.support.ActionFilters;
|
11 | 13 | import org.elasticsearch.action.support.WriteRequest;
|
12 | 14 | import org.elasticsearch.action.update.UpdateRequest;
|
@@ -97,24 +99,44 @@ protected void masterOperation(PutWatchRequest request, ClusterState state,
|
97 | 99 | try (XContentBuilder builder = jsonBuilder()) {
|
98 | 100 | watch.toXContent(builder, DEFAULT_PARAMS);
|
99 | 101 |
|
100 |
| - UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); |
101 |
| - updateRequest.docAsUpsert(isUpdate == false); |
102 |
| - updateRequest.version(request.getVersion()); |
103 |
| - updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
104 |
| - updateRequest.doc(builder); |
| 102 | + if (isUpdate) { |
| 103 | + UpdateRequest updateRequest = new UpdateRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); |
| 104 | + updateRequest.version(request.getVersion()); |
| 105 | + updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 106 | + updateRequest.doc(builder); |
105 | 107 |
|
106 |
| - executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, |
107 |
| - ActionListener.<UpdateResponse>wrap(response -> { |
| 108 | + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, updateRequest, |
| 109 | + ActionListener.<UpdateResponse>wrap(response -> { |
| 110 | + boolean created = response.getResult() == DocWriteResponse.Result.CREATED; |
| 111 | + if (shouldBeTriggeredLocally(request, watch)) { |
| 112 | + triggerService.add(watch); |
| 113 | + } |
| 114 | + listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created)); |
| 115 | + }, listener::onFailure), |
| 116 | + client::update); |
| 117 | + } else { |
| 118 | + IndexRequest indexRequest = new IndexRequest(Watch.INDEX, Watch.DOC_TYPE, request.getId()); |
| 119 | + indexRequest.source(builder); |
| 120 | + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); |
| 121 | + executeAsyncWithOrigin(client.threadPool().getThreadContext(), WATCHER_ORIGIN, indexRequest, |
| 122 | + ActionListener.<IndexResponse>wrap(response -> { |
108 | 123 | boolean created = response.getResult() == DocWriteResponse.Result.CREATED;
|
109 |
| - if (localExecute(request) == false && watch.status().state().isActive()) { |
| 124 | + // if not yet in distributed mode (mixed 5/6 version in cluster), only trigger on the master node |
| 125 | + if (shouldBeTriggeredLocally(request, watch)) { |
110 | 126 | triggerService.add(watch);
|
111 | 127 | }
|
112 | 128 | listener.onResponse(new PutWatchResponse(response.getId(), response.getVersion(), created));
|
113 | 129 | }, listener::onFailure),
|
114 |
| - client::update); |
| 130 | + client::index); |
| 131 | + } |
115 | 132 | }
|
116 | 133 | } catch (Exception e) {
|
117 | 134 | listener.onFailure(e);
|
118 | 135 | }
|
119 | 136 | }
|
| 137 | + |
| 138 | + private boolean shouldBeTriggeredLocally(PutWatchRequest request, Watch watch) { |
| 139 | + return localExecute(request) == false && |
| 140 | + watch.status().state().isActive(); |
| 141 | + } |
120 | 142 | }
|
0 commit comments