@@ -453,7 +453,7 @@ public void testDerivedSourceRollingRestart() throws Exception {
453453 rollingRestartWithVerification (docCount );
454454 }
455455
456- public void testDerivedSourceWithMixedVersionRollingRestart () throws Exception {
456+ public void testDerivedSourceWithMultiFieldsRollingRestart () throws Exception {
457457 String mapping = """
458458 {
459459 "properties": {
@@ -503,12 +503,12 @@ public void testDerivedSourceWithMixedVersionRollingRestart() throws Exception {
503503
504504 // Add replicas before starting new nodes
505505 assertAcked (
506- client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
506+ client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
507507 );
508508
509509 // Add nodes and additional documents
510510 int totalDocs = docCount ;
511- for (int i = 0 ; i < 2 ; i ++) {
511+ for (int i = 0 ; i < 1 ; i ++) {
512512 internalCluster ().startNode ();
513513
514514 // Add more documents
@@ -564,7 +564,7 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
564564 prepareCreate (
565565 "test" ,
566566 Settings .builder ()
567- .put ("index.number_of_shards" , 3 )
567+ .put ("index.number_of_shards" , 2 )
568568 .put ("index.number_of_replicas" , 0 )
569569 .put ("index.derived_source.enabled" , true )
570570 .put ("index.refresh_interval" , "1s" )
@@ -601,74 +601,86 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
601601
602602 // Start concurrent updates during rolling restart
603603 logger .info ("--> starting rolling restart with concurrent updates" );
604- concurrentUpdatesRollingRestartWithVerification (docCount );
605- }
606604
607- private void concurrentUpdatesRollingRestartWithVerification (int initialDocCount ) throws Exception {
608- AtomicBoolean stop = new AtomicBoolean (false );
609- AtomicInteger totalUpdates = new AtomicInteger (0 );
610- CountDownLatch updateLatch = new CountDownLatch (1 );
611-
612- // Start concurrent update thread
613- Thread updateThread = new Thread (() -> {
605+ final AtomicBoolean stop = new AtomicBoolean (false );
606+ final AtomicInteger successfulUpdates = new AtomicInteger (0 );
607+ final CountDownLatch updateLatch = new CountDownLatch (1 );
608+ final Thread updateThread = new Thread (() -> {
614609 try {
615- updateLatch .await (); // Wait for cluster to be ready
610+ updateLatch .await ();
616611 while (stop .get () == false ) {
617612 try {
618- int docId = randomIntBetween (0 , initialDocCount - 1 );
619- client ().prepareUpdate ("test" , String .valueOf (docId ))
620- .setRetryOnConflict (3 )
621- .setDoc ("counter" , randomIntBetween (0 , 1000 ), "last_updated" , System .currentTimeMillis (), "version" , 1 )
622- .execute ()
623- .actionGet ();
624- totalUpdates .incrementAndGet ();
625- Thread .sleep (10 );
613+ // Update documents sequentially to avoid conflicts
614+ for (int i = 0 ; i < docCount && !stop .get (); i ++) {
615+ client ().prepareUpdate ("test" , String .valueOf (i ))
616+ .setRetryOnConflict (3 )
617+ .setDoc ("counter" , successfulUpdates .get () + 1 , "last_updated" , System .currentTimeMillis (), "version" , 1 )
618+ .execute ()
619+ .actionGet (TimeValue .timeValueSeconds (5 ));
620+ successfulUpdates .incrementAndGet ();
621+ Thread .sleep (50 ); // Larger delay between updates
622+ }
626623 } catch (Exception e ) {
627- if (e instanceof InterruptedException ) {
628- break ;
624+ if (stop . get () == false ) {
625+ logger . warn ( "Error in update thread" , e ) ;
629626 }
630- logger .warn ("Error in update thread" , e );
631627 }
632628 }
633629 } catch (InterruptedException e ) {
634630 Thread .currentThread ().interrupt ();
635631 }
636632 });
637- updateThread .start ();
638633
639634 try {
640635 // Add replicas
641636 assertAcked (
642- client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
637+ client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
643638 );
644639
645640 // Start additional nodes
646- for (int i = 0 ; i < 2 ; i ++) {
641+ for (int i = 0 ; i < 1 ; i ++) {
647642 internalCluster ().startNode ();
648643 }
649644 ensureGreen ("test" );
650645
651- // Start updates
646+ // Start updates after cluster is stable
647+ updateThread .start ();
652648 updateLatch .countDown ();
653649
654650 // Wait for some updates to occur
655- Thread . sleep ( 2000 );
651+ assertBusy (() -> { assertTrue ( "No successful updates occurred" , successfulUpdates . get () > 0 ); }, 30 , TimeUnit . SECONDS );
656652
657653 // Rolling restart of all nodes
658654 for (String node : internalCluster ().getNodeNames ()) {
655+ // Stop updates temporarily during node restart
656+ stop .set (true );
657+ Thread .sleep (1000 ); // Wait for in-flight operations to complete
658+
659659 internalCluster ().restartNode (node );
660- ensureGreen (TimeValue .timeValueMinutes (2 ));
661- verifyDerivedSourceWithUpdates (initialDocCount );
660+ ensureGreen (TimeValue .timeValueSeconds (60 ));
661+
662+ // Verify data consistency
663+ refresh ("test" );
664+ verifyDerivedSourceWithUpdates (docCount );
665+
666+ // Resume updates
667+ stop .set (false );
662668 }
669+
663670 } finally {
671+ // Clean shutdown
664672 stop .set (true );
665- updateThread .join ();
673+ updateThread .join (TimeValue .timeValueSeconds (30 ).millis ());
674+ if (updateThread .isAlive ()) {
675+ updateThread .interrupt ();
676+ updateThread .join (TimeValue .timeValueSeconds (5 ).millis ());
677+ }
666678 }
667679
668- logger .info ("--> performed {} concurrent updates during rolling restart" , totalUpdates .get ());
680+ logger .info ("--> performed {} successful updates during rolling restart" , successfulUpdates .get ());
669681 refresh ("test" );
670682 flush ("test" );
671- verifyDerivedSourceWithUpdates (initialDocCount );
683+ verifyDerivedSourceWithUpdates (docCount );
672684 }
673685
674686 private void verifyDerivedSourceWithUpdates (int expectedDocs ) throws Exception {
@@ -687,7 +699,8 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
687699 assertEquals ("text value " + id , source .get ("text_field" ));
688700 assertNotNull ("counter missing for doc " + id , source .get ("counter" ));
689701 assertFalse (((String ) source .get ("last_updated" )).isEmpty ());
690- assertEquals (1 , source .get ("version" ));
702+ Integer counter = (Integer ) source .get ("counter" );
703+ assertEquals (counter == 0 ? 0 : 1 , source .get ("version" ));
691704
692705 // Verify text_field maintains original value
693706 assertEquals ("text value " + id , source .get ("text_field" ));
0 commit comments