@@ -191,11 +191,13 @@ public void testPauseAndResumeIngestion() throws Exception {
191191        produceData ("2" , "name2" , "20" );
192192        internalCluster ().startClusterManagerOnlyNode ();
193193        final  String  nodeA  = internalCluster ().startDataOnlyNode ();
194-         final  String  nodeB  = internalCluster ().startDataOnlyNode ();
195194
196195        createIndexWithDefaultSettings (1 , 1 );
196+         ensureYellowAndNoInitializingShards (indexName );
197+         waitForSearchableDocs (2 , Arrays .asList (nodeA ));
198+         final  String  nodeB  = internalCluster ().startDataOnlyNode ();
197199        ensureGreen (indexName );
198-         waitForSearchableDocs ( 2 ,  Arrays . asList ( nodeA ,  nodeB ));
200+         assertTrue ( nodeA . equals ( primaryNodeName ( indexName ) ));
199201
200202        // pause ingestion 
201203        PauseIngestionResponse  pauseResponse  = pauseIngestion (indexName );
@@ -219,12 +221,13 @@ public void testPauseAndResumeIngestion() throws Exception {
219221        client ().admin ().cluster ().prepareReroute ().add (new  AllocateReplicaAllocationCommand (indexName , 0 , nodeC )).get ();
220222        ensureGreen (indexName );
221223        assertTrue (nodeC .equals (replicaNodeName (indexName )));
222-         assertEquals (2 , getSearchableDocCount (nodeB ));
223224        waitForState (() -> {
224225            GetIngestionStateResponse  ingestionState  = getIngestionState (indexName );
225-             return  Arrays .stream (ingestionState .getShardStates ())
226-                 .allMatch (state  -> state .isPollerPaused () && state .pollerState ().equalsIgnoreCase ("paused" ));
226+             return  ingestionState .getFailedShards () == 0 
227+                 && Arrays .stream (ingestionState .getShardStates ())
228+                     .allMatch (state  -> state .isPollerPaused () && state .pollerState ().equalsIgnoreCase ("paused" ));
227229        });
230+         assertEquals (2 , getSearchableDocCount (nodeB ));
228231
229232        // resume ingestion 
230233        ResumeIngestionResponse  resumeResponse  = resumeIngestion (indexName );
0 commit comments