@@ -453,7 +453,7 @@ public void testDerivedSourceRollingRestart() throws Exception {
453453        rollingRestartWithVerification (docCount );
454454    }
455455
456-     public  void  testDerivedSourceWithMixedVersionRollingRestart () throws  Exception  {
456+     public  void  testDerivedSourceWithMultiFieldsRollingRestart () throws  Exception  {
457457        String  mapping  = """ 
458458            { 
459459              "properties": { 
@@ -503,12 +503,12 @@ public void testDerivedSourceWithMixedVersionRollingRestart() throws Exception {
503503
504504        // Add replicas before starting new nodes 
505505        assertAcked (
506-             client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
506+             client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
507507        );
508508
509509        // Add nodes and additional documents 
510510        int  totalDocs  = docCount ;
511-         for  (int  i  = 0 ; i  < 2 ; i ++) {
511+         for  (int  i  = 0 ; i  < 1 ; i ++) {
512512            internalCluster ().startNode ();
513513
514514            // Add more documents 
@@ -564,7 +564,7 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
564564            prepareCreate (
565565                "test" ,
566566                Settings .builder ()
567-                     .put ("index.number_of_shards" , 3 )
567+                     .put ("index.number_of_shards" , 2 )
568568                    .put ("index.number_of_replicas" , 0 )
569569                    .put ("index.derived_source.enabled" , true )
570570                    .put ("index.refresh_interval" , "1s" )
@@ -601,74 +601,86 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
601601
602602        // Start concurrent updates during rolling restart 
603603        logger .info ("--> starting rolling restart with concurrent updates" );
604-         concurrentUpdatesRollingRestartWithVerification (docCount );
605-     }
606604
607-     private  void  concurrentUpdatesRollingRestartWithVerification (int  initialDocCount ) throws  Exception  {
608-         AtomicBoolean  stop  = new  AtomicBoolean (false );
609-         AtomicInteger  totalUpdates  = new  AtomicInteger (0 );
610-         CountDownLatch  updateLatch  = new  CountDownLatch (1 );
611- 
612-         // Start concurrent update thread 
613-         Thread  updateThread  = new  Thread (() -> {
605+         final  AtomicBoolean  stop  = new  AtomicBoolean (false );
606+         final  AtomicInteger  successfulUpdates  = new  AtomicInteger (0 );
607+         final  CountDownLatch  updateLatch  = new  CountDownLatch (1 );
608+         final  Thread  updateThread  = new  Thread (() -> {
614609            try  {
615-                 updateLatch .await ();  // Wait for cluster to be ready 
610+                 updateLatch .await ();
616611                while  (stop .get () == false ) {
617612                    try  {
618-                         int  docId  = randomIntBetween (0 , initialDocCount  - 1 );
619-                         client ().prepareUpdate ("test" , String .valueOf (docId ))
620-                             .setRetryOnConflict (3 )
621-                             .setDoc ("counter" , randomIntBetween (0 , 1000 ), "last_updated" , System .currentTimeMillis (), "version" , 1 )
622-                             .execute ()
623-                             .actionGet ();
624-                         totalUpdates .incrementAndGet ();
625-                         Thread .sleep (10 );
613+                         // Update documents sequentially to avoid conflicts 
614+                         for  (int  i  = 0 ; i  < docCount  && !stop .get (); i ++) {
615+                             client ().prepareUpdate ("test" , String .valueOf (i ))
616+                                 .setRetryOnConflict (3 )
617+                                 .setDoc ("counter" , successfulUpdates .get () + 1 , "last_updated" , System .currentTimeMillis (), "version" , 1 )
618+                                 .execute ()
619+                                 .actionGet (TimeValue .timeValueSeconds (5 ));
620+                             successfulUpdates .incrementAndGet ();
621+                             Thread .sleep (50 ); // Larger delay between updates 
622+                         }
626623                    } catch  (Exception  e ) {
627-                         if  (e   instanceof   InterruptedException ) {
628-                             break ;
624+                         if  (stop . get () ==  false ) {
625+                             logger . warn ( "Error in update thread" ,  e ) ;
629626                        }
630-                         logger .warn ("Error in update thread" , e );
631627                    }
632628                }
633629            } catch  (InterruptedException  e ) {
634630                Thread .currentThread ().interrupt ();
635631            }
636632        });
637-         updateThread .start ();
638633
639634        try  {
640635            // Add replicas 
641636            assertAcked (
642-                 client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
637+                 client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
643638            );
644639
645640            // Start additional nodes 
646-             for  (int  i  = 0 ; i  < 2 ; i ++) {
641+             for  (int  i  = 0 ; i  < 1 ; i ++) {
647642                internalCluster ().startNode ();
648643            }
649644            ensureGreen ("test" );
650645
651-             // Start updates 
646+             // Start updates after cluster is stable 
647+             updateThread .start ();
652648            updateLatch .countDown ();
653649
654650            // Wait for some updates to occur 
655-             Thread . sleep ( 2000 );
651+             assertBusy (() -> {  assertTrue ( "No successful updates occurred" ,  successfulUpdates . get () >  0 ); },  30 ,  TimeUnit . SECONDS );
656652
657653            // Rolling restart of all nodes 
658654            for  (String  node  : internalCluster ().getNodeNames ()) {
655+                 // Stop updates temporarily during node restart 
656+                 stop .set (true );
657+                 Thread .sleep (1000 ); // Wait for in-flight operations to complete 
658+ 
659659                internalCluster ().restartNode (node );
660-                 ensureGreen (TimeValue .timeValueMinutes (2 ));
661-                 verifyDerivedSourceWithUpdates (initialDocCount );
660+                 ensureGreen (TimeValue .timeValueSeconds (60 ));
661+ 
662+                 // Verify data consistency 
663+                 refresh ("test" );
664+                 verifyDerivedSourceWithUpdates (docCount );
665+ 
666+                 // Resume updates 
667+                 stop .set (false );
662668            }
669+ 
663670        } finally  {
671+             // Clean shutdown 
664672            stop .set (true );
665-             updateThread .join ();
673+             updateThread .join (TimeValue .timeValueSeconds (30 ).millis ());
674+             if  (updateThread .isAlive ()) {
675+                 updateThread .interrupt ();
676+                 updateThread .join (TimeValue .timeValueSeconds (5 ).millis ());
677+             }
666678        }
667679
668-         logger .info ("--> performed {} concurrent  updates during rolling restart" , totalUpdates .get ());
680+         logger .info ("--> performed {} successful  updates during rolling restart" , successfulUpdates .get ());
669681        refresh ("test" );
670682        flush ("test" );
671-         verifyDerivedSourceWithUpdates (initialDocCount );
683+         verifyDerivedSourceWithUpdates (docCount );
672684    }
673685
674686    private  void  verifyDerivedSourceWithUpdates (int  expectedDocs ) throws  Exception  {
@@ -687,7 +699,8 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
687699                assertEquals ("text value "  + id , source .get ("text_field" ));
688700                assertNotNull ("counter missing for doc "  + id , source .get ("counter" ));
689701                assertFalse (((String ) source .get ("last_updated" )).isEmpty ());
690-                 assertEquals (1 , source .get ("version" ));
702+                 Integer  counter  = (Integer ) source .get ("counter" );
703+                 assertEquals (counter  == 0  ? 0  : 1 , source .get ("version" ));
691704
692705                // Verify text_field maintains original value 
693706                assertEquals ("text value "  + id , source .get ("text_field" ));
0 commit comments