25
25
import org .elasticsearch .action .support .ActiveShardCount ;
26
26
import org .elasticsearch .client .Client ;
27
27
import org .elasticsearch .cluster .ClusterState ;
28
+ import org .elasticsearch .cluster .ClusterStateUpdateTask ;
28
29
import org .elasticsearch .cluster .metadata .IndexMetadata ;
29
30
import org .elasticsearch .cluster .metadata .Metadata ;
30
31
import org .elasticsearch .cluster .service .ClusterService ;
37
38
import org .elasticsearch .reindex .ReindexPlugin ;
38
39
import org .elasticsearch .test .ESIntegTestCase ;
39
40
import org .elasticsearch .upgrades .FeatureMigrationResults ;
41
+ import org .elasticsearch .upgrades .SingleFeatureMigrationResult ;
40
42
import org .elasticsearch .xcontent .XContentBuilder ;
41
43
import org .elasticsearch .xcontent .json .JsonXContent ;
42
44
50
52
import java .util .Map ;
51
53
import java .util .Optional ;
52
54
import java .util .Set ;
55
+ import java .util .concurrent .CountDownLatch ;
56
+ import java .util .concurrent .TimeUnit ;
53
57
import java .util .concurrent .atomic .AtomicReference ;
54
58
import java .util .function .BiConsumer ;
55
59
import java .util .function .Function ;
@@ -268,6 +272,67 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
268
272
});
269
273
}
270
274
275
+ public void testMigrationWillRunAfterError () throws Exception {
276
+ createSystemIndexForDescriptor (INTERNAL_MANAGED );
277
+
278
+ TestPlugin .preMigrationHook .set ((state ) -> Collections .emptyMap ());
279
+ TestPlugin .postMigrationHook .set ((state , metadata ) -> {});
280
+
281
+ ensureGreen ();
282
+
283
+ SetOnce <Exception > failure = new SetOnce <>();
284
+ CountDownLatch clusterStateUpdated = new CountDownLatch (1 );
285
+ internalCluster ().getCurrentMasterNodeInstance (ClusterService .class )
286
+ .submitStateUpdateTask (this .getTestName (), new ClusterStateUpdateTask () {
287
+ @ Override
288
+ public ClusterState execute (ClusterState currentState ) throws Exception {
289
+ FeatureMigrationResults newResults = new FeatureMigrationResults (
290
+ Collections .singletonMap (
291
+ FEATURE_NAME ,
292
+ SingleFeatureMigrationResult .failure (INTERNAL_MANAGED_INDEX_NAME , new RuntimeException ("it failed :(" ))
293
+ )
294
+ );
295
+ Metadata newMetadata = Metadata .builder (currentState .metadata ())
296
+ .putCustom (FeatureMigrationResults .TYPE , newResults )
297
+ .build ();
298
+ return ClusterState .builder (currentState ).metadata (newMetadata ).build ();
299
+ }
300
+
301
+ @ Override
302
+ public void clusterStateProcessed (String source , ClusterState oldState , ClusterState newState ) {
303
+ clusterStateUpdated .countDown ();
304
+ }
305
+
306
+ @ Override
307
+ public void onFailure (String source , Exception e ) {
308
+ failure .set (e );
309
+ clusterStateUpdated .countDown ();
310
+ }
311
+ });
312
+
313
+ clusterStateUpdated .await (10 , TimeUnit .SECONDS ); // Should be basically instantaneous
314
+ if (failure .get () != null ) {
315
+ logger .error ("cluster state update to inject migration failure state did not succeed" , failure .get ());
316
+ fail ("cluster state update failed, see log for details" );
317
+ }
318
+
319
+ PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest ();
320
+ PostFeatureUpgradeResponse migrationResponse = client ().execute (PostFeatureUpgradeAction .INSTANCE , migrationRequest ).get ();
321
+ // Make sure we actually started the migration
322
+ assertTrue (
323
+ "could not find [" + FEATURE_NAME + "] in response: " + Strings .toString (migrationResponse ),
324
+ migrationResponse .getFeatures ().stream ().anyMatch (feature -> feature .getFeatureName ().equals (FEATURE_NAME ))
325
+ );
326
+
327
+ // Now wait for the migration to finish (otherwise the test infra explodes)
328
+ assertBusy (() -> {
329
+ GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest ();
330
+ GetFeatureUpgradeStatusResponse statusResp = client ().execute (GetFeatureUpgradeStatusAction .INSTANCE , getStatusRequest ).get ();
331
+ logger .info (Strings .toString (statusResp ));
332
+ assertThat (statusResp .getUpgradeStatus (), equalTo (GetFeatureUpgradeStatusResponse .UpgradeStatus .NO_MIGRATION_NEEDED ));
333
+ });
334
+ }
335
+
271
336
public void assertIndexHasCorrectProperties (
272
337
Metadata metadata ,
273
338
String indexName ,
@@ -345,6 +410,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
345
410
static final String FEATURE_NAME = "A-test-feature" ; // Sorts alphabetically before the feature from MultiFeatureMigrationIT
346
411
static final String ORIGIN = FeatureMigrationIT .class .getSimpleName ();
347
412
static final String FlAG_SETTING_KEY = IndexMetadata .INDEX_PRIORITY_SETTING .getKey ();
413
+ static final String INTERNAL_MANAGED_INDEX_NAME = ".int-man-old" ;
348
414
static final int INDEX_DOC_COUNT = 100 ; // arbitrarily chosen
349
415
public static final Version NEEDS_UPGRADE_VERSION = Version .V_7_0_0 ;
350
416
@@ -355,7 +421,7 @@ public void createSystemIndexForDescriptor(SystemIndexDescriptor descriptor) thr
355
421
static final SystemIndexDescriptor INTERNAL_MANAGED = SystemIndexDescriptor .builder ()
356
422
.setIndexPattern (".int-man-*" )
357
423
.setAliasName (".internal-managed-alias" )
358
- .setPrimaryIndex (".int-man-old" )
424
+ .setPrimaryIndex (INTERNAL_MANAGED_INDEX_NAME )
359
425
.setType (SystemIndexDescriptor .Type .INTERNAL_MANAGED )
360
426
.setSettings (createSimpleSettings (NEEDS_UPGRADE_VERSION , INTERNAL_MANAGED_FLAG_VALUE ))
361
427
.setMappings (createSimpleMapping (true , true ))
0 commit comments