20
20
import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
21
21
import org .elasticsearch .cluster .AckedClusterStateUpdateTask ;
22
22
import org .elasticsearch .cluster .ClusterState ;
23
+ import org .elasticsearch .cluster .ClusterStateTaskExecutor ;
24
+ import org .elasticsearch .cluster .ClusterStateTaskExecutor .ClusterTasksResult ;
23
25
import org .elasticsearch .cluster .block .ClusterBlockException ;
24
26
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
25
27
import org .elasticsearch .cluster .metadata .ComposableIndexTemplate ;
29
31
import org .elasticsearch .cluster .metadata .MetadataCreateDataStreamService .CreateDataStreamClusterStateUpdateRequest ;
30
32
import org .elasticsearch .cluster .metadata .MetadataCreateIndexService ;
31
33
import org .elasticsearch .cluster .metadata .MetadataIndexTemplateService ;
34
+ import org .elasticsearch .cluster .routing .allocation .AllocationService ;
32
35
import org .elasticsearch .cluster .service .ClusterService ;
33
36
import org .elasticsearch .common .Priority ;
34
37
import org .elasticsearch .common .inject .Inject ;
@@ -69,6 +72,8 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
69
72
private final AutoCreateIndex autoCreateIndex ;
70
73
private final SystemIndices systemIndices ;
71
74
75
+ private final ClusterStateTaskExecutor <AckedClusterStateUpdateTask > executor ;
76
+
72
77
@ Inject
73
78
public TransportAction (
74
79
TransportService transportService ,
@@ -79,7 +84,8 @@ public TransportAction(
79
84
MetadataCreateIndexService createIndexService ,
80
85
MetadataCreateDataStreamService metadataCreateDataStreamService ,
81
86
AutoCreateIndex autoCreateIndex ,
82
- SystemIndices systemIndices
87
+ SystemIndices systemIndices ,
88
+ AllocationService allocationService
83
89
) {
84
90
super (
85
91
NAME ,
@@ -97,6 +103,20 @@ public TransportAction(
97
103
this .createIndexService = createIndexService ;
98
104
this .metadataCreateDataStreamService = metadataCreateDataStreamService ;
99
105
this .autoCreateIndex = autoCreateIndex ;
106
+ executor = (currentState , tasks ) -> {
107
+ ClusterTasksResult .Builder <AckedClusterStateUpdateTask > builder = ClusterTasksResult .builder ();
108
+ ClusterState state = currentState ;
109
+ for (AckedClusterStateUpdateTask task : tasks ) {
110
+ try {
111
+ state = task .execute (state );
112
+ builder .success (task );
113
+ } catch (Exception e ) {
114
+ builder .failure (task , e );
115
+ }
116
+ }
117
+ state = allocationService .reroute (state , "auto-create" );
118
+ return builder .build (state );
119
+ };
100
120
}
101
121
102
122
@ Override
@@ -122,149 +142,144 @@ protected void masterOperation(
122
142
finalListener .onResponse (new CreateIndexResponse (false , false , indexName ));
123
143
}
124
144
}, finalListener ::onFailure );
125
- clusterService .submitStateUpdateTask (
126
- "auto create [" + request .index () + "]" ,
127
- new AckedClusterStateUpdateTask (Priority .URGENT , request , listener ) {
145
+ // TODO: move this to custom class instead of AckedClusterStateUpdateTask
146
+ AckedClusterStateUpdateTask clusterTask = new AckedClusterStateUpdateTask (Priority .URGENT , request , listener ) {
147
+
148
+ @ Override
149
+ public ClusterState execute (ClusterState currentState ) throws Exception {
150
+ final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices .validateDataStreamAccess (
151
+ request .index (),
152
+ threadPool .getThreadContext ()
153
+ );
154
+ final boolean isSystemDataStream = dataStreamDescriptor != null ;
155
+ final boolean isSystemIndex = isSystemDataStream == false && systemIndices .isSystemIndex (request .index ());
156
+ final ComposableIndexTemplate template = resolveTemplate (request , currentState .metadata ());
157
+ final boolean isDataStream = isSystemIndex == false
158
+ && (isSystemDataStream || (template != null && template .getDataStreamTemplate () != null ));
128
159
129
- @ Override
130
- public ClusterState execute (ClusterState currentState ) throws Exception {
131
- final SystemDataStreamDescriptor dataStreamDescriptor = systemIndices .validateDataStreamAccess (
160
+ if (isDataStream ) {
161
+ // This expression only evaluates to true when the argument is non-null and false
162
+ if (isSystemDataStream == false && Boolean .FALSE .equals (template .getAllowAutoCreate ())) {
163
+ throw new IndexNotFoundException (
164
+ "composable template " + template .indexPatterns () + " forbids index auto creation"
165
+ );
166
+ }
167
+
168
+ CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest (
132
169
request .index (),
133
- threadPool .getThreadContext ()
170
+ dataStreamDescriptor ,
171
+ request .masterNodeTimeout (),
172
+ request .timeout ()
134
173
);
135
- final boolean isSystemDataStream = dataStreamDescriptor != null ;
136
- final boolean isSystemIndex = isSystemDataStream == false && systemIndices .isSystemIndex (request .index ());
137
- final ComposableIndexTemplate template = resolveTemplate (request , currentState .metadata ());
138
- final boolean isDataStream = isSystemIndex == false
139
- && (isSystemDataStream || (template != null && template .getDataStreamTemplate () != null ));
140
-
141
- if (isDataStream ) {
142
- // This expression only evaluates to true when the argument is non-null and false
143
- if (isSystemDataStream == false && Boolean .FALSE .equals (template .getAllowAutoCreate ())) {
144
- throw new IndexNotFoundException (
145
- "composable template " + template .indexPatterns () + " forbids index auto creation"
146
- );
174
+ ClusterState clusterState = metadataCreateDataStreamService .createDataStream (createRequest , currentState );
175
+ indexNameRef .set (clusterState .metadata ().dataStreams ().get (request .index ()).getIndices ().get (0 ).getName ());
176
+ return clusterState ;
177
+ } else {
178
+ String indexName = indexNameExpressionResolver .resolveDateMathExpression (request .index ());
179
+ indexNameRef .set (indexName );
180
+ if (isSystemIndex ) {
181
+ if (indexName .equals (request .index ()) == false ) {
182
+ throw new IllegalStateException ("system indices do not support date math expressions" );
147
183
}
148
-
149
- CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest (
150
- request .index (),
151
- dataStreamDescriptor ,
152
- request .masterNodeTimeout (),
153
- request .timeout ()
154
- );
155
- ClusterState clusterState = metadataCreateDataStreamService .createDataStream (createRequest , currentState );
156
- indexNameRef .set (clusterState .metadata ().dataStreams ().get (request .index ()).getIndices ().get (0 ).getName ());
157
- return clusterState ;
158
184
} else {
159
- String indexName = indexNameExpressionResolver .resolveDateMathExpression (request .index ());
160
- indexNameRef .set (indexName );
161
- if (isSystemIndex ) {
162
- if (indexName .equals (request .index ()) == false ) {
163
- throw new IllegalStateException ("system indices do not support date math expressions" );
164
- }
165
- } else {
166
- // This will throw an exception if the index does not exist and creating it is prohibited
167
- final boolean shouldAutoCreate = autoCreateIndex .shouldAutoCreate (indexName , currentState );
168
-
169
- if (shouldAutoCreate == false ) {
170
- // The index already exists.
171
- return currentState ;
172
- }
185
+ // This will throw an exception if the index does not exist and creating it is prohibited
186
+ final boolean shouldAutoCreate = autoCreateIndex .shouldAutoCreate (indexName , currentState );
187
+
188
+ if (shouldAutoCreate == false ) {
189
+ // The index already exists.
190
+ return currentState ;
173
191
}
192
+ }
193
+
194
+ final SystemIndexDescriptor mainDescriptor = isSystemIndex ? systemIndices .findMatchingDescriptor (indexName ) : null ;
195
+ final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor .isAutomaticallyManaged ();
174
196
175
- final SystemIndexDescriptor mainDescriptor = isSystemIndex
176
- ? systemIndices .findMatchingDescriptor (indexName )
177
- : null ;
178
- final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor .isAutomaticallyManaged ();
179
-
180
- final CreateIndexClusterStateUpdateRequest updateRequest ;
181
-
182
- if (isManagedSystemIndex ) {
183
- final SystemIndexDescriptor descriptor = mainDescriptor .getDescriptorCompatibleWith (
184
- state .nodes ().getSmallestNonClientNodeVersion ()
185
- );
186
- if (descriptor == null ) {
187
- final String message = mainDescriptor .getMinimumNodeVersionMessage ("auto-create index" );
188
- logger .warn (message );
189
- throw new IllegalStateException (message );
190
- }
191
-
192
- updateRequest = buildSystemIndexUpdateRequest (indexName , descriptor );
193
- } else if (isSystemIndex ) {
194
- updateRequest = buildUpdateRequest (indexName );
195
-
196
- if (Objects .isNull (request .settings ())) {
197
- updateRequest .settings (SystemIndexDescriptor .DEFAULT_SETTINGS );
198
- } else if (false == request .settings ().hasValue (SETTING_INDEX_HIDDEN )) {
199
- updateRequest .settings (
200
- Settings .builder ().put (request .settings ()).put (SETTING_INDEX_HIDDEN , true ).build ()
201
- );
202
- } else if ("false" .equals (request .settings ().get (SETTING_INDEX_HIDDEN ))) {
203
- final String message = "Cannot auto-create system index ["
204
- + indexName
205
- + "] with [index.hidden] set to 'false'" ;
206
- logger .warn (message );
207
- throw new IllegalStateException (message );
208
- }
209
- } else {
210
- updateRequest = buildUpdateRequest (indexName );
197
+ final CreateIndexClusterStateUpdateRequest updateRequest ;
198
+
199
+ if (isManagedSystemIndex ) {
200
+ final SystemIndexDescriptor descriptor = mainDescriptor .getDescriptorCompatibleWith (
201
+ state .nodes ().getSmallestNonClientNodeVersion ()
202
+ );
203
+ if (descriptor == null ) {
204
+ final String message = mainDescriptor .getMinimumNodeVersionMessage ("auto-create index" );
205
+ logger .warn (message );
206
+ throw new IllegalStateException (message );
211
207
}
212
208
213
- return createIndexService .applyCreateIndexRequest (currentState , updateRequest , false );
209
+ updateRequest = buildSystemIndexUpdateRequest (indexName , descriptor );
210
+ } else if (isSystemIndex ) {
211
+ updateRequest = buildUpdateRequest (indexName );
212
+
213
+ if (Objects .isNull (request .settings ())) {
214
+ updateRequest .settings (SystemIndexDescriptor .DEFAULT_SETTINGS );
215
+ } else if (false == request .settings ().hasValue (SETTING_INDEX_HIDDEN )) {
216
+ updateRequest .settings (Settings .builder ().put (request .settings ()).put (SETTING_INDEX_HIDDEN , true ).build ());
217
+ } else if ("false" .equals (request .settings ().get (SETTING_INDEX_HIDDEN ))) {
218
+ final String message = "Cannot auto-create system index ["
219
+ + indexName
220
+ + "] with [index.hidden] set to 'false'" ;
221
+ logger .warn (message );
222
+ throw new IllegalStateException (message );
223
+ }
224
+ } else {
225
+ updateRequest = buildUpdateRequest (indexName );
214
226
}
215
- }
216
227
217
- private CreateIndexClusterStateUpdateRequest buildUpdateRequest (String indexName ) {
218
- CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest (
219
- request .cause (),
220
- indexName ,
221
- request .index ()
222
- ).ackTimeout (request .timeout ()).masterNodeTimeout (request .masterNodeTimeout ());
223
- logger .debug ("Auto-creating index {}" , indexName );
224
- return updateRequest ;
228
+ return createIndexService .applyCreateIndexRequest (currentState , updateRequest , false );
225
229
}
230
+ }
226
231
227
- private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest (
228
- String indexName ,
229
- SystemIndexDescriptor descriptor
230
- ) {
231
- String mappings = descriptor .getMappings ();
232
- Settings settings = descriptor .getSettings ();
233
- String aliasName = descriptor .getAliasName ();
232
+ private CreateIndexClusterStateUpdateRequest buildUpdateRequest (String indexName ) {
233
+ CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest (
234
+ request .cause (),
235
+ indexName ,
236
+ request .index ()
237
+ ).ackTimeout (request .timeout ()).performReroute (false ).masterNodeTimeout (request .masterNodeTimeout ());
238
+ logger .debug ("Auto-creating index {}" , indexName );
239
+ return updateRequest ;
240
+ }
234
241
235
- // if we are writing to the alias name, we should create the primary index here
236
- String concreteIndexName = indexName .equals (aliasName ) ? descriptor .getPrimaryIndex () : indexName ;
242
+ private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest (
243
+ String indexName ,
244
+ SystemIndexDescriptor descriptor
245
+ ) {
246
+ String mappings = descriptor .getMappings ();
247
+ Settings settings = descriptor .getSettings ();
248
+ String aliasName = descriptor .getAliasName ();
237
249
238
- CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest (
239
- request .cause (),
240
- concreteIndexName ,
241
- request .index ()
242
- ).ackTimeout (request .timeout ()).masterNodeTimeout (request .masterNodeTimeout ());
250
+ // if we are writing to the alias name, we should create the primary index here
251
+ String concreteIndexName = indexName .equals (aliasName ) ? descriptor .getPrimaryIndex () : indexName ;
243
252
244
- updateRequest .waitForActiveShards (ActiveShardCount .ALL );
253
+ CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest (
254
+ request .cause (),
255
+ concreteIndexName ,
256
+ request .index ()
257
+ ).ackTimeout (request .timeout ()).masterNodeTimeout (request .masterNodeTimeout ()).performReroute (false );
245
258
246
- if (mappings != null ) {
247
- updateRequest .mappings (mappings );
248
- }
249
- if (settings != null ) {
250
- updateRequest .settings (settings );
251
- }
252
- if (aliasName != null ) {
253
- updateRequest .aliases (Set .of (new Alias (aliasName ).isHidden (true )));
254
- }
259
+ updateRequest .waitForActiveShards (ActiveShardCount .ALL );
255
260
256
- if (logger .isDebugEnabled ()) {
257
- if (concreteIndexName .equals (indexName ) == false ) {
258
- logger .debug ("Auto-creating backing system index {} for alias {}" , concreteIndexName , indexName );
259
- } else {
260
- logger .debug ("Auto-creating system index {}" , concreteIndexName );
261
- }
262
- }
261
+ if (mappings != null ) {
262
+ updateRequest .mappings (mappings );
263
+ }
264
+ if (settings != null ) {
265
+ updateRequest .settings (settings );
266
+ }
267
+ if (aliasName != null ) {
268
+ updateRequest .aliases (Set .of (new Alias (aliasName ).isHidden (true )));
269
+ }
263
270
264
- return updateRequest ;
271
+ if (logger .isDebugEnabled ()) {
272
+ if (concreteIndexName .equals (indexName ) == false ) {
273
+ logger .debug ("Auto-creating backing system index {} for alias {}" , concreteIndexName , indexName );
274
+ } else {
275
+ logger .debug ("Auto-creating system index {}" , concreteIndexName );
276
+ }
265
277
}
278
+
279
+ return updateRequest ;
266
280
}
267
- );
281
+ };
282
+ clusterService .submitStateUpdateTask ("auto create [" + request .index () + "]" , clusterTask , clusterTask , executor , clusterTask );
268
283
}
269
284
270
285
@ Override
0 commit comments