@@ -453,7 +453,7 @@ public void testDerivedSourceRollingRestart() throws Exception {
453453 rollingRestartWithVerification (docCount );
454454 }
455455
456- public void testDerivedSourceWithMultiFieldsRollingRestart () throws Exception {
456+ public void testDerivedSourceWithMixedVersionRollingRestart () throws Exception {
457457 String mapping = """
458458 {
459459 "properties": {
@@ -503,12 +503,12 @@ public void testDerivedSourceWithMultiFieldsRollingRestart() throws Exception {
503503
504504 // Add replicas before starting new nodes
505505 assertAcked (
506- client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
506+ client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
507507 );
508508
509509 // Add nodes and additional documents
510510 int totalDocs = docCount ;
511- for (int i = 0 ; i < 1 ; i ++) {
511+ for (int i = 0 ; i < 2 ; i ++) {
512512 internalCluster ().startNode ();
513513
514514 // Add more documents
@@ -564,7 +564,7 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
564564 prepareCreate (
565565 "test" ,
566566 Settings .builder ()
567- .put ("index.number_of_shards" , 2 )
567+ .put ("index.number_of_shards" , 3 )
568568 .put ("index.number_of_replicas" , 0 )
569569 .put ("index.derived_source.enabled" , true )
570570 .put ("index.refresh_interval" , "1s" )
@@ -601,86 +601,74 @@ public void testDerivedSourceWithConcurrentUpdatesRollingRestart() throws Except
601601
602602 // Start concurrent updates during rolling restart
603603 logger .info ("--> starting rolling restart with concurrent updates" );
604+ concurrentUpdatesRollingRestartWithVerification (docCount );
605+ }
604606
605- final AtomicBoolean stop = new AtomicBoolean (false );
606- final AtomicInteger successfulUpdates = new AtomicInteger (0 );
607- final CountDownLatch updateLatch = new CountDownLatch (1 );
608- final Thread updateThread = new Thread (() -> {
607+ private void concurrentUpdatesRollingRestartWithVerification (int initialDocCount ) throws Exception {
608+ AtomicBoolean stop = new AtomicBoolean (false );
609+ AtomicInteger totalUpdates = new AtomicInteger (0 );
610+ CountDownLatch updateLatch = new CountDownLatch (1 );
611+
612+ // Start concurrent update thread
613+ Thread updateThread = new Thread (() -> {
609614 try {
610- updateLatch .await ();
615+ updateLatch .await (); // Wait for cluster to be ready
611616 while (stop .get () == false ) {
612617 try {
613- // Update documents sequentially to avoid conflicts
614- for (int i = 0 ; i < docCount && !stop .get (); i ++) {
615- client ().prepareUpdate ("test" , String .valueOf (i ))
616- .setRetryOnConflict (3 )
617- .setDoc ("counter" , successfulUpdates .get () + 1 , "last_updated" , System .currentTimeMillis (), "version" , 1 )
618- .execute ()
619- .actionGet (TimeValue .timeValueSeconds (5 ));
620- successfulUpdates .incrementAndGet ();
621- Thread .sleep (50 ); // Larger delay between updates
622- }
618+ int docId = randomIntBetween (0 , initialDocCount - 1 );
619+ client ().prepareUpdate ("test" , String .valueOf (docId ))
620+ .setRetryOnConflict (3 )
621+ .setDoc ("counter" , randomIntBetween (0 , 1000 ), "last_updated" , System .currentTimeMillis (), "version" , 1 )
622+ .execute ()
623+ .actionGet ();
624+ totalUpdates .incrementAndGet ();
625+ Thread .sleep (10 );
623626 } catch (Exception e ) {
624- if (stop . get () == false ) {
625- logger . warn ( "Error in update thread" , e ) ;
627+ if (e instanceof InterruptedException ) {
628+ break ;
626629 }
630+ logger .warn ("Error in update thread" , e );
627631 }
628632 }
629633 } catch (InterruptedException e ) {
630634 Thread .currentThread ().interrupt ();
631635 }
632636 });
637+ updateThread .start ();
633638
634639 try {
635640 // Add replicas
636641 assertAcked (
637- client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 1 ))
642+ client ().admin ().indices ().prepareUpdateSettings ("test" ).setSettings (Settings .builder ().put ("index.number_of_replicas" , 2 ))
638643 );
639644
640645 // Start additional nodes
641- for (int i = 0 ; i < 1 ; i ++) {
646+ for (int i = 0 ; i < 2 ; i ++) {
642647 internalCluster ().startNode ();
643648 }
644649 ensureGreen ("test" );
645650
646- // Start updates after cluster is stable
647- updateThread .start ();
651+ // Start updates
648652 updateLatch .countDown ();
649653
650654 // Wait for some updates to occur
651- assertBusy (() -> { assertTrue ( "No successful updates occurred" , successfulUpdates . get () > 0 ); }, 30 , TimeUnit . SECONDS );
655+ Thread . sleep ( 2000 );
652656
653657 // Rolling restart of all nodes
654658 for (String node : internalCluster ().getNodeNames ()) {
655- // Stop updates temporarily during node restart
656- stop .set (true );
657- Thread .sleep (1000 ); // Wait for in-flight operations to complete
658-
659659 internalCluster ().restartNode (node );
660- ensureGreen (TimeValue .timeValueSeconds (60 ));
661-
662- // Verify data consistency
663- refresh ("test" );
664- verifyDerivedSourceWithUpdates (docCount );
665-
666- // Resume updates
667- stop .set (false );
660+ ensureGreen (TimeValue .timeValueMinutes (2 ));
661+ verifyDerivedSourceWithUpdates (initialDocCount );
668662 }
669-
670663 } finally {
671- // Clean shutdown
672664 stop .set (true );
673- updateThread .join (TimeValue .timeValueSeconds (30 ).millis ());
674- if (updateThread .isAlive ()) {
675- updateThread .interrupt ();
676- updateThread .join (TimeValue .timeValueSeconds (5 ).millis ());
677- }
665+ updateThread .join ();
678666 }
679667
680- logger .info ("--> performed {} successful updates during rolling restart" , successfulUpdates .get ());
668+ logger .info ("--> performed {} concurrent updates during rolling restart" , totalUpdates .get ());
681669 refresh ("test" );
682670 flush ("test" );
683- verifyDerivedSourceWithUpdates (docCount );
671+ verifyDerivedSourceWithUpdates (initialDocCount );
684672 }
685673
686674 private void verifyDerivedSourceWithUpdates (int expectedDocs ) throws Exception {
@@ -699,8 +687,7 @@ private void verifyDerivedSourceWithUpdates(int expectedDocs) throws Exception {
699687 assertEquals ("text value " + id , source .get ("text_field" ));
700688 assertNotNull ("counter missing for doc " + id , source .get ("counter" ));
701689 assertFalse (((String ) source .get ("last_updated" )).isEmpty ());
702- Integer counter = (Integer ) source .get ("counter" );
703- assertEquals (counter == 0 ? 0 : 1 , source .get ("version" ));
690+ assertEquals (1 , source .get ("version" ));
704691
705692 // Verify text_field maintains original value
706693 assertEquals ("text value " + id , source .get ("text_field" ));
0 commit comments