@@ -453,7 +453,7 @@ public void testDerivedSourceRollingRestart() throws Exception {
453453        rollingRestartWithVerification (docCount );
454454    }
455455
456-     public  void  testDerivedSourceWithMultiFieldsRollingRestart () throws  Exception  {
456+     public  void  testDerivedSourceWithMixedVersionRollingRestart () throws  Exception  {
457457        String  mapping  = """ 
458458            { 
459459              "properties": { 
@@ -503,12 +503,12 @@ public void testDerivedSourceWithMultiFieldsRollingRestart() throws Exception {
503503
504504        // Add replicas before starting new nodes 
505505        assertAcked (
506-             client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
506+             client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
507507        );
508508
509509        // Add nodes and additional documents 
510510        int  totalDocs  = docCount ;
511-         for  (int  i  = 0 ; i  < 1 ; i ++) {
511+         for  (int  i  = 0 ; i  < 2 ; i ++) {
512512            internalCluster ().startNode ();
513513
514514            // Add more documents 
@@ -564,7 +564,7 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
564564            prepareCreate (
565565                "test" ,
566566                Settings .builder ()
567-                     .put ("index.number_of_shards" , 2 )
567+                     .put ("index.number_of_shards" , 3 )
568568                    .put ("index.number_of_replicas" , 0 )
569569                    .put ("index.derived_source.enabled" , true )
570570                    .put ("index.refresh_interval" , "1s" )
@@ -601,86 +601,74 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
601601
602602        // Start concurrent updates during rolling restart 
603603        logger .info ("--> starting rolling restart with concurrent updates" );
604+         concurrentUpdatesRollingRestartWithVerification (docCount );
605+     }
604606
605-         final  AtomicBoolean  stop  = new  AtomicBoolean (false );
606-         final  AtomicInteger  successfulUpdates  = new  AtomicInteger (0 );
607-         final  CountDownLatch  updateLatch  = new  CountDownLatch (1 );
608-         final  Thread  updateThread  = new  Thread (() -> {
607+     private  void  concurrentUpdatesRollingRestartWithVerification (int  initialDocCount ) throws  Exception  {
608+         AtomicBoolean  stop  = new  AtomicBoolean (false );
609+         AtomicInteger  totalUpdates  = new  AtomicInteger (0 );
610+         CountDownLatch  updateLatch  = new  CountDownLatch (1 );
611+ 
612+         // Start concurrent update thread 
613+         Thread  updateThread  = new  Thread (() -> {
609614            try  {
610-                 updateLatch .await ();
615+                 updateLatch .await ();  // Wait for cluster to be ready 
611616                while  (stop .get () == false ) {
612617                    try  {
613-                         // Update documents sequentially to avoid conflicts 
614-                         for  (int  i  = 0 ; i  < docCount  && !stop .get (); i ++) {
615-                             client ().prepareUpdate ("test" , String .valueOf (i ))
616-                                 .setRetryOnConflict (3 )
617-                                 .setDoc ("counter" , successfulUpdates .get () + 1 , "last_updated" , System .currentTimeMillis (), "version" , 1 )
618-                                 .execute ()
619-                                 .actionGet (TimeValue .timeValueSeconds (5 ));
620-                             successfulUpdates .incrementAndGet ();
621-                             Thread .sleep (50 ); // Larger delay between updates 
622-                         }
618+                         int  docId  = randomIntBetween (0 , initialDocCount  - 1 );
619+                         client ().prepareUpdate ("test" , String .valueOf (docId ))
620+                             .setRetryOnConflict (3 )
621+                             .setDoc ("counter" , randomIntBetween (0 , 1000 ), "last_updated" , System .currentTimeMillis (), "version" , 1 )
622+                             .execute ()
623+                             .actionGet ();
624+                         totalUpdates .incrementAndGet ();
625+                         Thread .sleep (10 );
623626                    } catch  (Exception  e ) {
624-                         if  (stop . get () ==  false ) {
625-                             logger . warn ( "Error in update thread" ,  e ) ;
627+                         if  (e   instanceof   InterruptedException ) {
628+                             break ;
626629                        }
630+                         logger .warn ("Error in update thread" , e );
627631                    }
628632                }
629633            } catch  (InterruptedException  e ) {
630634                Thread .currentThread ().interrupt ();
631635            }
632636        });
637+         updateThread .start ();
633638
634639        try  {
635640            // Add replicas 
636641            assertAcked (
637-                 client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
642+                 client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
638643            );
639644
640645            // Start additional nodes 
641-             for  (int  i  = 0 ; i  < 1 ; i ++) {
646+             for  (int  i  = 0 ; i  < 2 ; i ++) {
642647                internalCluster ().startNode ();
643648            }
644649            ensureGreen ("test" );
645650
646-             // Start updates after cluster is stable 
647-             updateThread .start ();
651+             // Start updates 
648652            updateLatch .countDown ();
649653
650654            // Wait for some updates to occur 
651-             assertBusy (() -> {  assertTrue ( "No successful updates occurred" ,  successfulUpdates . get () >  0 ); },  30 ,  TimeUnit . SECONDS );
655+             Thread . sleep ( 2000 );
652656
653657            // Rolling restart of all nodes 
654658            for  (String  node  : internalCluster ().getNodeNames ()) {
655-                 // Stop updates temporarily during node restart 
656-                 stop .set (true );
657-                 Thread .sleep (1000 ); // Wait for in-flight operations to complete 
658- 
659659                internalCluster ().restartNode (node );
660-                 ensureGreen (TimeValue .timeValueSeconds (60 ));
661- 
662-                 // Verify data consistency 
663-                 refresh ("test" );
664-                 verifyDerivedSourceWithUpdates (docCount );
665- 
666-                 // Resume updates 
667-                 stop .set (false );
660+                 ensureGreen (TimeValue .timeValueMinutes (2 ));
661+                 verifyDerivedSourceWithUpdates (initialDocCount );
668662            }
669- 
670663        } finally  {
671-             // Clean shutdown 
672664            stop .set (true );
673-             updateThread .join (TimeValue .timeValueSeconds (30 ).millis ());
674-             if  (updateThread .isAlive ()) {
675-                 updateThread .interrupt ();
676-                 updateThread .join (TimeValue .timeValueSeconds (5 ).millis ());
677-             }
665+             updateThread .join ();
678666        }
679667
680-         logger .info ("--> performed {} successful  updates during rolling restart" , successfulUpdates .get ());
668+         logger .info ("--> performed {} concurrent  updates during rolling restart" , totalUpdates .get ());
681669        refresh ("test" );
682670        flush ("test" );
683-         verifyDerivedSourceWithUpdates (docCount );
671+         verifyDerivedSourceWithUpdates (initialDocCount );
684672    }
685673
686674    private  void  verifyDerivedSourceWithUpdates (int  expectedDocs ) throws  Exception  {
@@ -699,8 +687,7 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
699687                assertEquals ("text value "  + id , source .get ("text_field" ));
700688                assertNotNull ("counter missing for doc "  + id , source .get ("counter" ));
701689                assertFalse (((String ) source .get ("last_updated" )).isEmpty ());
702-                 Integer  counter  = (Integer ) source .get ("counter" );
703-                 assertEquals (counter  == 0  ? 0  : 1 , source .get ("version" ));
690+                 assertEquals (1 , source .get ("version" ));
704691
705692                // Verify text_field maintains original value 
706693                assertEquals ("text value "  + id , source .get ("text_field" ));
0 commit comments