Update pull-based ingestion multi-threaded writer flow batchStartPointer computation #21697

Merged
varunbharadwaj merged 3 commits into opensearch-project:main from varunbharadwaj:vb/fixmultipartition on May 21, 2026
Conversation

@varunbharadwaj varunbharadwaj commented May 17, 2026

Description

  1. Update the pull-based ingestion multi-threaded write flow to handle a rare race in which a flush is called before one processor thread has read its very first message, while another processor thread has already read a message. In this case, the first queued message pointer is used to determine batchStartPointer. This can only happen when docID partitioning produces an uneven hash distribution, which is rare.
  2. Synchronize the Kafka consumer to avoid concurrent access when the reset API is called.
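
A minimal sketch of the fallback in item 1, not the code from this PR. It assumes the batch start pointer is the smallest per-partition pointer, which the description does not spell out, and it uses plain `Long` offsets as a hypothetical stand-in for `IngestionShardPointer`. The class and method names are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in: offsets are plain Long values, whereas the PR uses IngestionShardPointer.
final class BatchStartPointerSketch {

    /**
     * For each partition, takes the processor's current pointer, or the first queued
     * pointer if that processor has not read anything yet, then returns the smallest
     * candidate. Partitions with neither value are skipped.
     */
    static Optional<Long> batchStartPointer(Map<Integer, Long> currentPointers, Map<Integer, Long> firstQueuedPointers) {
        return currentPointers.entrySet().stream()
            .map(e -> e.getValue() != null ? e.getValue() : firstQueuedPointers.get(e.getKey()))
            .filter(Objects::nonNull)
            .min(Long::compare);
    }

    public static void main(String[] args) {
        Map<Integer, Long> current = new LinkedHashMap<>();
        current.put(0, 42L);  // partition 0 has already processed offset 42
        current.put(1, null); // partition 1 has not read its first message yet
        Map<Integer, Long> firstQueued = Map.of(1, 40L); // offset 40 is still waiting in partition 1's queue

        // Without the fallback, only partition 0 would be considered and offset 40 would be skipped on restart.
        System.out.println(batchStartPointer(current, firstQueued)); // prints "Optional[40]"
    }
}
```

Under these assumptions, ignoring the queued-but-unread message would move the restart point past offset 40, which is the situation the fallback is meant to avoid.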

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

github-actions Bot commented May 17, 2026

PR Reviewer Guide 🔍

(Review updated until commit 3efe675)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Race Condition

The add method has a race condition between putIfAbsent and remove. If thread A calls putIfAbsent (returns null), thread B then calls putIfAbsent for the same partition (returns non-null), and thread A's put then fails and calls remove, thread A will incorrectly remove thread B's entry. This occurs when multiple threads add messages to the same partition concurrently and one of them encounters an exception.

IngestionShardPointer previousFirstQueuedPointer = partitionToFirstQueuedPointerMap.putIfAbsent(
    partition,
    shardUpdateMessage.pointer()
);
try {
    partitionToQueueMap.get(partition).put(shardUpdateMessage);
} catch (InterruptedException | RuntimeException e) {
    if (previousFirstQueuedPointer == null) {
        partitionToFirstQueuedPointerMap.remove(partition, shardUpdateMessage.pointer());
    }
    throw e;
}
Stale Pointer

restartPointer is captured before clearing queues but after closeConsumer(). If getBatchStartPointer() is called between closeConsumer() and capturing restartPointer, it may return a stale pointer from the old consumer state. This could cause the reinitialized consumer to start from an incorrect offset.

IngestionShardPointer restartPointer = getBatchStartPointer();
closeConsumer();
blockingQueueContainer.clearAllQueues();
initializeConsumer();

if (this.consumer == null) {
    return;
}

// Handle consumer offset reset the first time an index is created. The reset offset takes precedence if available.
IngestionShardPointer resetShardPointer = getResetShardPointer();
if (resetShardPointer != null) {
    initialBatchStartPointer = resetShardPointer;
    restartPointer = resetShardPointer;
}

// Force the consumer to start from the batchStartPointer. This will be the initialBatchStartPointer for first
// time initialization, or the latest batchStartPointer based on processed messages.
forcedShardPointer = restartPointer;

github-actions Bot commented May 17, 2026

PR Code Suggestions ✨

Latest suggestions up to 3efe675
Explore these optional code suggestions:

Category | Suggestion | Impact
General
Prevent null pointers in list

The method can return null values in the list when both getCurrentShardPointer() and
partitionToFirstQueuedPointerMap.get() return null. This could cause
NullPointerException in downstream code that expects non-null pointers. Consider
filtering out null values or documenting this behavior explicitly.

server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java [195-201]

 return partitionToMessageProcessorMap.entrySet().stream().map(entry -> {
     IngestionShardPointer currentShardPointer = entry.getValue().getCurrentShardPointer();
     if (currentShardPointer != null) {
         return currentShardPointer;
     }
     return partitionToFirstQueuedPointerMap.get(entry.getKey());
-}).toList();
+}).filter(pointer -> pointer != null).toList();
Suggestion importance[1-10]: 5

Why: The suggestion correctly identifies that null values can be returned in the list when both getCurrentShardPointer() and partitionToFirstQueuedPointerMap.get() return null. However, this may be intentional behavior to represent partitions without any queued messages. The suggestion to filter nulls could be valid but depends on how downstream code handles these cases.

Low
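
To make the trade-off in this suggestion concrete, here is a small sketch with hypothetical names and plain `Long` offsets instead of `IngestionShardPointer`. It uses `Collectors.toList()` so that it also runs on older JDKs; `Stream.toList()`, as used in the PR snippet, likewise keeps null elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration of the two behaviours discussed in the suggestion above.
final class NullPointerFilterSketch {

    // Keeps one slot per partition, including null for partitions with no pointer at all.
    static List<Long> keepNulls(List<Long> perPartitionPointers) {
        return perPartitionPointers.stream().collect(Collectors.toList());
    }

    // Drops the empty slots, so callers never see null but lose the per-partition alignment.
    static List<Long> dropNulls(List<Long> perPartitionPointers) {
        return perPartitionPointers.stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Partition 1 has neither a processed nor a queued message in this example.
        List<Long> pointers = Arrays.asList(7L, null, 3L);
        System.out.println(keepNulls(pointers).size()); // prints 3
        System.out.println(dropNulls(pointers));        // prints [7, 3]
    }
}
```

Which variant is appropriate depends on whether downstream code treats a null entry as "nothing queued for this partition" or assumes every entry is non-null.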
Possible issue
Fix concurrent modification cleanup issue

The cleanup logic in the catch block only removes the pointer if
previousFirstQueuedPointer == null, but this doesn't guarantee the correct pointer
is removed. If another thread adds a different pointer between putIfAbsent and the
exception, remove(partition, pointer) may fail to remove the entry. Use
remove(partition) instead to ensure cleanup regardless of concurrent modifications.

server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java [132-143]

 IngestionShardPointer previousFirstQueuedPointer = partitionToFirstQueuedPointerMap.putIfAbsent(
     partition,
     shardUpdateMessage.pointer()
 );
 try {
     partitionToQueueMap.get(partition).put(shardUpdateMessage);
 } catch (InterruptedException | RuntimeException e) {
     if (previousFirstQueuedPointer == null) {
-        partitionToFirstQueuedPointerMap.remove(partition, shardUpdateMessage.pointer());
+        partitionToFirstQueuedPointerMap.remove(partition);
     }
     throw e;
 }
Suggestion importance[1-10]: 3

Why: The suggestion identifies a potential race condition, but the current implementation using remove(partition, shardUpdateMessage.pointer()) is actually safer as it only removes the entry if the value matches, preventing accidental removal of a pointer added by another thread. Using remove(partition) could incorrectly remove a valid pointer set by concurrent operations.

Low
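
The difference between the two `remove` overloads can be checked in isolation. The sketch below uses `ConcurrentHashMap` with `Long` values as a stand-in; the map type used in the PR is an assumption here, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of conditional vs. unconditional removal during rollback.
final class ConditionalRemoveSketch {

    // Removes the entry only if it still maps to the pointer this writer tried to record.
    static boolean conditionalRollback(ConcurrentHashMap<Integer, Long> firstQueued, int partition, long pointer) {
        return firstQueued.remove(partition, pointer);
    }

    // Removes whatever is recorded for the partition, regardless of who recorded it.
    static Long unconditionalRollback(ConcurrentHashMap<Integer, Long> firstQueued, int partition) {
        return firstQueued.remove(partition);
    }

    public static void main(String[] args) {
        ConcurrentHashMap<Integer, Long> firstQueued = new ConcurrentHashMap<>();
        firstQueued.putIfAbsent(0, 10L); // another writer already recorded pointer 10 for partition 0

        // A writer holding pointer 11 fails and rolls back: the entry for pointer 10 is left alone.
        System.out.println(conditionalRollback(firstQueued, 0, 11L)); // prints false
        System.out.println(firstQueued.get(0));                       // prints 10

        // The unconditional variant would discard the other writer's pointer.
        System.out.println(unconditionalRollback(firstQueued, 0));    // prints 10
        System.out.println(firstQueued.containsKey(0));               // prints false
    }
}
```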

Previous suggestions

Suggestions up to commit 4b32e73
Category | Suggestion | Impact
Possible issue
Prevent consumer access during closure

Calling getBatchStartPointer() before closeConsumer() may access a consumer that is
being closed or in an inconsistent state. Move the restartPointer retrieval after
closeConsumer() but before clearAllQueues() to ensure thread-safe access to consumer
state while preserving queue data.

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java [699-703]

 private void handleConsumerInitialization() {
+    closeConsumer();
     // retrieve batchStartPointer before clearing the partition blocking queues
     IngestionShardPointer restartPointer = getBatchStartPointer();
-    closeConsumer();
     blockingQueueContainer.clearAllQueues();
Suggestion importance[1-10]: 2

Why: The suggestion misunderstands the intent of the PR change. The PR explicitly retrieves restartPointer before closing the consumer to capture the current state before it's cleared. Moving it after closeConsumer() would defeat the purpose of the fix, as the consumer would already be closed and unable to provide accurate pointer information.

Low

@github-actions

✅ Gradle check result for 4b32e73: SUCCESS

codecov Bot commented May 17, 2026

Codecov Report

❌ Patch coverage is 95.45455% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 73.49%. Comparing base (be29fd0) to head (3efe675).
⚠️ Report is 2 commits behind head on main.

Files with missing lines | Patch % | Lines
...llingingest/PartitionedBlockingQueueContainer.java | 94.73% | 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21697      +/-   ##
============================================
+ Coverage     73.48%   73.49%   +0.01%     
- Complexity    75078    75107      +29     
============================================
  Files          6012     6016       +4     
  Lines        340940   341091     +151     
  Branches      49076    49093      +17     
============================================
+ Hits         250543   250699     +156     
+ Misses        70409    70401       -8     
- Partials      19988    19991       +3     

@varunbharadwaj varunbharadwaj marked this pull request as ready for review May 17, 2026 03:27
@varunbharadwaj varunbharadwaj requested a review from a team as a code owner May 17, 2026 03:27
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
@varunbharadwaj varunbharadwaj force-pushed the vb/fixmultipartition branch from 4b32e73 to 3efe675 Compare May 20, 2026 18:59
@github-actions

Persistent review updated to latest commit 3efe675

@github-actions

✅ Gradle check result for 3efe675: SUCCESS

@varunbharadwaj varunbharadwaj merged commit ebea88a into opensearch-project:main May 21, 2026
15 of 18 checks passed
msfroh pushed a commit that referenced this pull request May 22, 2026
…ter computation (#21697) (#21791)

* handle multi partition first message race condition
* synchronize kafka consumer to prevent concurrent access
* add comments explaining fallback to first queued pointer logic

---------

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
3 participants