4
4
*/
5
5
package org .opensearch .geospatial .ip2geo .processor ;
6
6
7
- import static org .opensearch .cluster .service .ClusterApplierService .CLUSTER_UPDATE_THREAD_NAME ;
8
- import static org .opensearch .ingest .ConfigurationUtils .newConfigurationException ;
9
7
import static org .opensearch .ingest .ConfigurationUtils .readBooleanProperty ;
10
8
import static org .opensearch .ingest .ConfigurationUtils .readOptionalList ;
11
9
import static org .opensearch .ingest .ConfigurationUtils .readStringProperty ;
29
27
import org .opensearch .geospatial .ip2geo .common .DatasourceState ;
30
28
import org .opensearch .geospatial .ip2geo .common .GeoIpDataFacade ;
31
29
import org .opensearch .geospatial .ip2geo .jobscheduler .Datasource ;
30
+ import org .opensearch .index .IndexNotFoundException ;
32
31
import org .opensearch .ingest .AbstractProcessor ;
33
32
import org .opensearch .ingest .IngestDocument ;
34
33
import org .opensearch .ingest .IngestService ;
@@ -153,8 +152,8 @@ protected void executeInternal(
153
152
datasourceFacade .getDatasource (datasourceName , new ActionListener <>() {
154
153
@ Override
155
154
public void onResponse (final Datasource datasource ) {
156
- if (datasource == null ) {
157
- handler .accept (null , new IllegalStateException ("datasource does not exist " ));
155
+ if (datasource == null || DatasourceState . AVAILABLE . equals ( datasource . getState ()) == false ) {
156
+ handler .accept (null , new IllegalStateException ("datasource is not available " ));
158
157
return ;
159
158
}
160
159
@@ -174,6 +173,10 @@ public void onResponse(final Datasource datasource) {
174
173
175
174
@ Override
176
175
public void onFailure (final Exception e ) {
176
+ if (e instanceof IndexNotFoundException ) {
177
+ handler .accept (null , new IllegalStateException ("datasource is not available" ));
178
+ return ;
179
+ }
177
180
handler .accept (null , e );
178
181
}
179
182
});
@@ -241,8 +244,8 @@ protected void executeInternal(
241
244
datasourceFacade .getDatasource (datasourceName , new ActionListener <>() {
242
245
@ Override
243
246
public void onResponse (final Datasource datasource ) {
244
- if (datasource == null ) {
245
- handler .accept (null , new IllegalStateException ("datasource does not exist " ));
247
+ if (datasource == null || DatasourceState . AVAILABLE . equals ( datasource . getState ()) == false ) {
248
+ handler .accept (null , new IllegalStateException ("datasource is not available " ));
246
249
return ;
247
250
}
248
251
@@ -262,6 +265,10 @@ public void onResponse(final Datasource datasource) {
262
265
263
266
@ Override
264
267
public void onFailure (final Exception e ) {
268
+ if (e instanceof IndexNotFoundException ) {
269
+ handler .accept (null , new IllegalStateException ("datasource is not available" ));
270
+ return ;
271
+ }
265
272
handler .accept (null , e );
266
273
}
267
274
});
@@ -319,15 +326,8 @@ public Factory(final IngestService ingestService, final DatasourceFacade datasou
319
326
}
320
327
321
328
/**
322
- * When a user create a processor, this method is called twice. Once to validate the new processor and another
323
- * to apply cluster state change after the processor is added.
324
- *
325
- * The second call is made by ClusterApplierService. Therefore, we cannot access cluster state in the call.
326
- * That means, we cannot even query an index inside the call.
327
- *
328
- * Because the processor is validated in the first call, we skip the validation in the second call.
329
- *
330
- * @see org.opensearch.cluster.service.ClusterApplierService#state()
329
+ * Within this method, blocking request cannot be called because this method is executed in a transport thread.
330
+ * This means, validation using data in an index won't work.
331
331
*/
332
332
@ Override
333
333
public Ip2GeoProcessor create (
@@ -342,11 +342,6 @@ public Ip2GeoProcessor create(
342
342
List <String > propertyNames = readOptionalList (TYPE , processorTag , config , CONFIG_PROPERTIES );
343
343
boolean ignoreMissing = readBooleanProperty (TYPE , processorTag , config , CONFIG_IGNORE_MISSING , false );
344
344
345
- // Skip validation for the call by cluster applier service
346
- if (Thread .currentThread ().getName ().contains (CLUSTER_UPDATE_THREAD_NAME ) == false ) {
347
- validate (processorTag , datasourceName , propertyNames );
348
- }
349
-
350
345
return new Ip2GeoProcessor (
351
346
processorTag ,
352
347
description ,
@@ -360,39 +355,5 @@ public Ip2GeoProcessor create(
360
355
geoIpDataFacade
361
356
);
362
357
}
363
-
364
- private void validate (final String processorTag , final String datasourceName , final List <String > propertyNames ) throws IOException {
365
- Datasource datasource = datasourceFacade .getDatasource (datasourceName );
366
-
367
- if (datasource == null ) {
368
- throw newConfigurationException (TYPE , processorTag , "datasource" , "datasource [" + datasourceName + "] doesn't exist" );
369
- }
370
-
371
- if (DatasourceState .AVAILABLE .equals (datasource .getState ()) == false ) {
372
- throw newConfigurationException (
373
- TYPE ,
374
- processorTag ,
375
- "datasource" ,
376
- "datasource [" + datasourceName + "] is not in an available state"
377
- );
378
- }
379
-
380
- if (propertyNames == null ) {
381
- return ;
382
- }
383
-
384
- // Validate properties are valid. If not add all available properties.
385
- final Set <String > availableProperties = new HashSet <>(datasource .getDatabase ().getFields ());
386
- for (String fieldName : propertyNames ) {
387
- if (availableProperties .contains (fieldName ) == false ) {
388
- throw newConfigurationException (
389
- TYPE ,
390
- processorTag ,
391
- "properties" ,
392
- "property [" + fieldName + "] is not available in the datasource [" + datasourceName + "]"
393
- );
394
- }
395
- }
396
- }
397
358
}
398
359
}
0 commit comments