Skip to content

[pull-based-ingestion] Pass IndexMetadata instead of only IngestionSource when initiating the shardConsumer#21841

Open
zhaih wants to merge 3 commits into
opensearch-project:mainfrom
zhaih:main
Open

[pull-based-ingestion] Pass IndexMetadata instead of only IngestionSource when initiating the shardConsumer#21841
zhaih wants to merge 3 commits into
opensearch-project:mainfrom
zhaih:main

Conversation

@zhaih
Copy link
Copy Markdown
Contributor

@zhaih zhaih commented May 26, 2026

Description

IngestionSource only exposes a very limited set of index settings and sometimes the plugin might want to see more, for example:

  1. We only pass in the current shardId but don't know the total number of shards, this requires plugin must have some pre-existing knowledge about sharding (like, kafka is sharded the same way as OS index)
  2. The plugin might want to have access to the index mapping to apply correct transformation or filtering, when the input schema and index schema is not exactly aligned.

I directly modified the method signature as there seems already been an breaking change to the API recently (and not released yet)

Check List

  • Functionality includes testing.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 26, 2026

PR Reviewer Guide 🔍

(Review updated until commit 0dc1992)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Incomplete Test

The test testCreateShardConsumerWithNullSource now only verifies that a NullPointerException is thrown when passing null IndexMetadata, but it no longer actually invokes the method to trigger the exception. The expectThrows call is present but the lambda is empty, so the test passes trivially without exercising the code path.

expectThrows(NullPointerException.class, () -> factory.createShardConsumer("test-client", 0, (IndexMetadata) null));
Incomplete Test

The test testCreateShardConsumerWithNullSource now only verifies that a NullPointerException is thrown when passing null IndexMetadata, but it no longer actually invokes the method to trigger the exception. The expectThrows call is present but the lambda is empty, so the test passes trivially without exercising the code path.

expectThrows(NullPointerException.class, () -> factory.createShardConsumer("test-client", 0, (IndexMetadata) null));

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 26, 2026

PR Code Suggestions ✨

Latest suggestions up to 0dc1992

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Replace assertions with null checks

Replace assertions with proper null checks and error handling. Assertions can be
disabled at runtime with -da flag, leaving the code vulnerable to
NullPointerException in production environments when ingestion source is
unexpectedly null.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [574-576]

 IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata();
-assert indexMetadata != null;
-assert indexMetadata.getIngestionSource() != null;
+if (indexMetadata == null || indexMetadata.getIngestionSource() == null) {
+    logger.error("Cannot reinitialize consumer: index metadata or ingestion source is null");
+    return;
+}
Suggestion importance[1-10]: 8

__

Why: Valid concern that assertions can be disabled in production with -da flag, making the code vulnerable to NullPointerException. Replacing with proper null checks and error handling is a better practice for production code, especially in critical paths like consumer reinitialization.

Medium
Add null check for ingestion source

Add null check for indexMetadata.getIngestionSource() before calling initialize().
If the ingestion source is null, this will cause a NullPointerException when passed
to the deprecated initialize() method, breaking backward compatibility with existing
implementations.

server/src/main/java/org/opensearch/index/IngestionConsumerFactory.java [91-94]

 default T createShardConsumer(String clientId, int shardId, IndexMetadata indexMetadata) {
-    initialize(indexMetadata.getIngestionSource());
+    IngestionSource ingestionSource = indexMetadata.getIngestionSource();
+    if (ingestionSource != null) {
+        initialize(ingestionSource);
+    }
     return createShardConsumer(clientId, shardId);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential NullPointerException when indexMetadata.getIngestionSource() returns null. Adding a null check improves backward compatibility and robustness, though the impact depends on whether null ingestion sources are expected in practice.

Medium

Previous suggestions

Suggestions up to commit 681f1ee
CategorySuggestion                                                                                                                                    Impact
Possible issue
Replace assertions with runtime validation

Replace assertions with proper null checks and exception handling. Assertions can be
disabled at runtime with -da flag, leaving the code vulnerable to
NullPointerException in production environments.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [574-576]

 IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata();
-assert indexMetadata != null;
-assert indexMetadata.getIngestionSource() != null;
+if (indexMetadata == null || indexMetadata.getIngestionSource() == null) {
+    throw new IllegalStateException("IndexMetadata and IngestionSource must not be null for ingestion source params update");
+}
Suggestion importance[1-10]: 8

__

Why: This is a valid concern as assertions can be disabled in production with the -da flag, potentially allowing NullPointerException to occur. Replacing assertions with explicit runtime checks ensures the validation always occurs, improving robustness in production environments. The suggestion correctly identifies a reliability issue in critical ingestion code.

Medium
Add null check for ingestion source

Add null check for indexMetadata.getIngestionSource() before calling initialize().
If the ingestion source is null, this will cause a NullPointerException during
consumer creation, breaking the ingestion pipeline.

server/src/main/java/org/opensearch/index/IngestionConsumerFactory.java [91-94]

 default T createShardConsumer(String clientId, int shardId, IndexMetadata indexMetadata) {
-    initialize(indexMetadata.getIngestionSource());
+    IngestionSource ingestionSource = indexMetadata.getIngestionSource();
+    if (ingestionSource == null) {
+        throw new IllegalArgumentException("IndexMetadata must contain a non-null IngestionSource");
+    }
+    initialize(ingestionSource);
     return createShardConsumer(clientId, shardId);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential NullPointerException risk when indexMetadata.getIngestionSource() returns null. However, the PR code already includes an assertion for this in IngestionEngine.java (line 576), suggesting this scenario is expected to be prevented upstream. The suggestion provides defensive programming but may be redundant given the existing safeguards.

Medium
Suggestions up to commit ecf1ed1
CategorySuggestion                                                                                                                                    Impact
Possible issue
Replace assertions with runtime checks

Replace assertions with proper null checks and exception handling. Assertions can be
disabled at runtime with -da flag, leaving the code vulnerable to
NullPointerException in production environments.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [574-576]

 IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata();
-assert indexMetadata != null;
-assert indexMetadata.getIngestionSource() != null;
+if (indexMetadata == null || indexMetadata.getIngestionSource() == null) {
+    throw new IllegalStateException("IndexMetadata and IngestionSource must not be null for ingestion source params update");
+}
Suggestion importance[1-10]: 8

__

Why: Valid concern about assertions being disabled in production with -da flag. Replacing assertions with explicit runtime checks ensures the validation always occurs, preventing potential NullPointerException in production environments. This is a good defensive programming practice for critical validation logic.

Medium
Add null check for ingestion source

Add null check for indexMetadata.getIngestionSource() before calling initialize().
If the ingestion source is null, this will cause a NullPointerException during
consumer creation, which could crash the ingestion pipeline.

server/src/main/java/org/opensearch/index/IngestionConsumerFactory.java [72-75]

 default T createShardConsumer(String clientId, int shardId, IndexMetadata indexMetadata) {
-    initialize(indexMetadata.getIngestionSource());
+    IngestionSource ingestionSource = indexMetadata.getIngestionSource();
+    if (ingestionSource == null) {
+        throw new IllegalArgumentException("IndexMetadata must contain a non-null IngestionSource");
+    }
+    initialize(ingestionSource);
     return createShardConsumer(clientId, shardId);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential NullPointerException risk when indexMetadata.getIngestionSource() returns null. However, the PR already includes an assertion for this in IngestionEngine.java (line 576), suggesting the codebase expects this to be non-null. The suggestion adds defensive programming but may be redundant given existing safeguards.

Medium
Suggestions up to commit d0f0781
CategorySuggestion                                                                                                                                    Impact
Possible issue
Add null check for ingestion source

Add null check for indexMetadata.getIngestionSource() before calling initialize().
If the ingestion source is null, this will cause a NullPointerException during
consumer creation, breaking the backward compatibility path.

server/src/main/java/org/opensearch/index/IngestionConsumerFactory.java [72-75]

 default T createShardConsumer(String clientId, int shardId, IndexMetadata indexMetadata) {
-    initialize(indexMetadata.getIngestionSource());
+    IngestionSource ingestionSource = indexMetadata.getIngestionSource();
+    if (ingestionSource == null) {
+        throw new IllegalArgumentException("IndexMetadata must contain a non-null IngestionSource");
+    }
+    initialize(ingestionSource);
     return createShardConsumer(clientId, shardId);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential NullPointerException in the backward compatibility path when indexMetadata.getIngestionSource() returns null. However, the PR already includes an assertion for this in IngestionEngine.java (line 576), suggesting the codebase expects non-null ingestion sources. The suggestion adds defensive programming but may be overly cautious given the existing design assumptions.

Medium
Replace assertions with runtime checks

Replace assertions with proper null checks and exception handling. Assertions can be
disabled at runtime with -da flag, leaving the code vulnerable to
NullPointerException in production environments.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java [574-576]

 IndexMetadata indexMetadata = engineConfig.getIndexSettings().getIndexMetadata();
-assert indexMetadata != null;
-assert indexMetadata.getIngestionSource() != null;
+if (indexMetadata == null || indexMetadata.getIngestionSource() == null) {
+    throw new IllegalStateException("IndexMetadata and IngestionSource must not be null for ingestion source params update");
+}
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly points out that assertions can be disabled at runtime. However, assertions are commonly used in Java for internal invariant checks that should never fail in correct code. The score is moderate because while replacing with runtime checks adds safety, it may not align with the codebase's assertion usage patterns, and the context suggests these conditions are expected to always hold.

Low

…urce when initiating the shardConsumer

Signed-off-by: Patrick Zhai <pzhai@uber.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit ecf1ed1

Signed-off-by: Patrick Zhai <pzhai@uber.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 681f1ee

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 681f1ee: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Patrick Zhai <pzhai@uber.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 0dc1992

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 0dc1992: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 27, 2026

Codecov Report

❌ Patch coverage is 66.66667% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.39%. Comparing base (8175de0) to head (0dc1992).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
...org/opensearch/index/IngestionConsumerFactory.java 0.00% 2 Missing ⚠️
...nsearch/plugin/kinesis/KinesisConsumerFactory.java 0.00% 1 Missing ⚠️
...a/org/opensearch/index/engine/IngestionEngine.java 66.66% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21841      +/-   ##
============================================
- Coverage     73.42%   73.39%   -0.03%     
+ Complexity    75450    75422      -28     
============================================
  Files          6034     6032       -2     
  Lines        342533   342518      -15     
  Branches      49263    49261       -2     
============================================
- Hits         251508   251395     -113     
- Misses        71048    71092      +44     
- Partials      19977    20031      +54     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant