Skip to content

Commit 1015576

Browse files
varunbharadwajjainankitk
authored andcommitted
fix ingestion pause state initialization on replica promotion (opensearch-project#19212)
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com> Signed-off-by: Ankit Jain <jainankitk@apache.org>
1 parent 0e99457 commit 1015576

File tree

3 files changed

+11
-5
lines changed

3 files changed

+11
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3333
- Grant access to testclusters dir for tests ([#19085](https://github.com/opensearch-project/OpenSearch/issues/19085))
3434
- Fix assertion error when collapsing search results with concurrent segment search enabled ([#19053](https://github.com/opensearch-project/OpenSearch/pull/19053))
3535
- Fix skip_unavailable setting changing to default during node drop issue ([#18766](https://github.com/opensearch-project/OpenSearch/pull/18766))
36+
- Fix pull-based ingestion pause state initialization during replica promotion ([#19212](https://github.com/opensearch-project/OpenSearch/pull/19212))
3637

3738
### Dependencies
3839
- Bump `com.netflix.nebula.ospackage-base` from 12.0.0 to 12.1.0 ([#19019](https://github.com/opensearch-project/OpenSearch/pull/19019))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,11 +191,13 @@ public void testPauseAndResumeIngestion() throws Exception {
191191
produceData("2", "name2", "20");
192192
internalCluster().startClusterManagerOnlyNode();
193193
final String nodeA = internalCluster().startDataOnlyNode();
194-
final String nodeB = internalCluster().startDataOnlyNode();
195194

196195
createIndexWithDefaultSettings(1, 1);
196+
ensureYellowAndNoInitializingShards(indexName);
197+
waitForSearchableDocs(2, Arrays.asList(nodeA));
198+
final String nodeB = internalCluster().startDataOnlyNode();
197199
ensureGreen(indexName);
198-
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
200+
assertTrue(nodeA.equals(primaryNodeName(indexName)));
199201

200202
// pause ingestion
201203
PauseIngestionResponse pauseResponse = pauseIngestion(indexName);
@@ -219,12 +221,13 @@ public void testPauseAndResumeIngestion() throws Exception {
219221
client().admin().cluster().prepareReroute().add(new AllocateReplicaAllocationCommand(indexName, 0, nodeC)).get();
220222
ensureGreen(indexName);
221223
assertTrue(nodeC.equals(replicaNodeName(indexName)));
222-
assertEquals(2, getSearchableDocCount(nodeB));
223224
waitForState(() -> {
224225
GetIngestionStateResponse ingestionState = getIngestionState(indexName);
225-
return Arrays.stream(ingestionState.getShardStates())
226-
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
226+
return ingestionState.getFailedShards() == 0
227+
&& Arrays.stream(ingestionState.getShardStates())
228+
.allMatch(state -> state.isPollerPaused() && state.pollerState().equalsIgnoreCase("paused"));
227229
});
230+
assertEquals(2, getSearchableDocCount(nodeB));
228231

229232
// resume ingestion
230233
ResumeIngestionResponse resumeResponse = resumeIngestion(indexName);

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ private DefaultStreamPoller(
157157
this.errorStrategy = errorStrategy;
158158
this.indexName = indexSettings.getIndex().getName();
159159

160+
// handle initial poller states
161+
this.paused = initialState == State.PAUSED;
160162
}
161163

162164
@Override

0 commit comments

Comments
 (0)