Skip to content

Analytics engine - add support for index patterns, aliases, multi-index search#21822

Open
mch2 wants to merge 5 commits into
opensearch-project:mainfrom
mch2:incremental-shard-tasks
Open

Analytics engine - add support for index patterns, aliases, multi-index search#21822
mch2 wants to merge 5 commits into
opensearch-project:mainfrom
mch2:incremental-shard-tasks

Conversation

@mch2
Copy link
Copy Markdown
Member

@mch2 mch2 commented May 25, 2026

Description

Summary

Adds support for alias, index pattern, and multi-index queries to the analytics engine. A query like source=my_alias now fans out across all backing indices, with schema widening to null-fill columns that individual shards don't have.

Planning: When the planner encounters a table name that resolves to multiple indices, it validates that any field shared across indices has a compatible type (e.g. text and keyword both map to VARCHAR —
fine; long vs keyword — rejected with a clear error). It then builds a single scan node whose row type is the union of all fields across all backing indices, and routes the query to shards on every backing index.

Execution: Each shard registers its local parquet table with the schema inferred from its files. But the Substrait plan references the full union schema. To bridge the gap, during session creation extract the plan's declared schema (base_schema) and appends any columns this shard doesn't have as nullable. DataFusion's built-in adapter then produces null values for those columns at read time.

For single-index queries this widening is a no-op: the plan's schema matches the shard's schema, so the field-name comparison exits immediately with no work done.

Table name binding: The plan references the logical name (e.g. "my_alias"), but each shard knows itself by its concrete index name (e.g. "idx_a"). Session creation extracts the logical name from the plan's NamedTable and registers the table under it, so the Substrait consumer can bind. For single-index queries, the logical name equals the concrete name — same code path, no branching.

Wildcard support (test frontend): The PPL frontend's schema lookup was flattening tables into a static map, losing the lazy resolution that handles wildcards. Fixed by delegating get() to the underlying
schema which resolves expressions on demand.

Filter delegation (indexed path): The indexed execution path (filter delegation to Lucene) now also receives plan bytes, enabling multi-index support for queries with MATCH predicates across aliases.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Enables queries against aliases, wildcard patterns, and comma-separated index
expressions. The planner resolves these to concrete indices, validates schema
compatibility, and builds a union row type. At the data node, Rust widens the
registered ListingTable schema from the plan's base_schema so DataFusion
null-fills columns this shard doesn't have.

Key components:
- IndexResolution: expands aliases/patterns to concrete indices, validates
  field type compatibility, rejects filter aliases and data streams
- FieldStorageResolver.merged(): unions per-field storage across backing indices
- ShardTargetResolver: fans out shard routing across all concrete indices
- widen_schema_from_plan (Rust): appends missing nullable columns to the
  ListingTable using from_substrait_named_struct for type conversion
- UnifiedQueryService: preserves lazy table resolution for wildcard support
- Indexed execution (filter delegation): now passes plan bytes to Rust,
  enabling multi-index support on the delegation path

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@mch2 mch2 requested a review from a team as a code owner May 25, 2026 06:43
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 25, 2026

PR Reviewer Guide 🔍

(Review updated until commit bd5a25f)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The resolve method with IndexNameExpressionResolver parameter can return a fallback result from metadata().index(name) when the resolver is null or returns no indices. This fallback path is intended for test contexts but could mask configuration issues in production if the resolver is unexpectedly null or if getIndicesLookup() is unpopulated. Production code paths should fail fast rather than silently falling back to a potentially incomplete resolution.

IndexMetadata direct = clusterState.metadata().index(name);
if (direct != null) {
    return new IndexResolution(name, List.of(direct));
}
throw new IllegalArgumentException("Index or alias [" + name + "] not found in cluster state");
Possible Issue

In orderedColumns, when targetColumnOrder is provided but a column name is not found in the batch, an IllegalStateException is thrown. However, if targetColumnOrder is null or empty, the method returns batch.getFieldVectors() directly without validation. This inconsistency means that a missing column is only caught when explicit ordering is requested. If the batch schema genuinely differs from expectations in the no-ordering path, the error surfaces later as an index-out-of-bounds or value mismatch rather than a clear diagnostic.

private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
    if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
        return batch.getFieldVectors();
    }
    List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
    for (String name : targetColumnOrder) {
        FieldVector vector = batch.getVector(name);
        if (vector == null) {
            throw new IllegalStateException(
                "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
            );
        }
        ordered.add(vector);
    }
    return ordered;
}
Possible Issue

The resolveTable method catches IndexNotFoundException and returns null, which causes Calcite to report "table not found". However, if the resolver throws a different exception (e.g., security denial, cluster state inconsistency), that exception is not caught and will propagate up, potentially causing an unexpected failure mode. The method should either catch broader exceptions or document that only IndexNotFoundException is handled.

private static Table resolveTable(ClusterState clusterState, IndexNameExpressionResolver resolver, String expression) {
    String[] concrete;
    try {
        // Comma-split first: concreteIndexNames treats each vararg as one expression, and
        // splitting lets the resolver honor exclusions across tokens (e.g. "test*,-test1").
        concrete = resolver.concreteIndexNames(
            clusterState,
            IndicesOptions.lenientExpandOpen(),
            Strings.splitStringByCommaToArray(expression)
        );
    } catch (IndexNotFoundException e) {
        return null;
    }
Possible Issue

In widen_schema_from_plan, if from_substrait_named_struct fails, the function returns the original inferred schema without logging the error. This silent fallback means that a malformed plan or unsupported type in the base_schema will cause the widening to be skipped, and the query may fail later with a confusing "column not found" error instead of a clear "plan decode failed" message at session creation time.

let consumer = DefaultSubstraitConsumer::new(&extensions, &state);
let df_schema = match from_substrait_named_struct(&consumer, &base_schema) {
    Ok(s) => s,
    Err(_) => return Arc::clone(inferred),
};
let expected = df_schema.as_arrow().clone();

@mch2 mch2 changed the title Incremental shard tasks Analytics engine - add support for index patterns, aliases, multi-index search May 25, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 25, 2026

PR Code Suggestions ✨

Latest suggestions up to bd5a25f

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Guard against null pointer dereference

The hasPlan condition checks planBytes != null but then unconditionally accesses
planBytes.length in the ternary expressions. If planBytes is null, this will throw a
NullPointerException. Either ensure planBytes is never null at the call site or add
explicit null guards before accessing .length.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/nativelib/NativeBridge.java [1123-1125]

 boolean hasPlan = planBytes != null && planBytes.length > 0;
-MemorySegment planSegment = hasPlan ? call.bytes(planBytes) : MemorySegment.NULL;
-long planLen = hasPlan ? planBytes.length : 0L;
+MemorySegment planSegment = (planBytes != null && planBytes.length > 0) ? call.bytes(planBytes) : MemorySegment.NULL;
+long planLen = (planBytes != null) ? planBytes.length : 0L;
Suggestion importance[1-10]: 7

__

Why: The hasPlan boolean is computed correctly, but the subsequent ternary expressions redundantly check planBytes != null and planBytes.length > 0 again. While not a bug (the first check ensures planBytes is non-null before .length is accessed), the redundancy is unnecessary. The suggestion to add explicit null guards in the ternary expressions is valid for defensive coding, though the existing code is already safe due to the hasPlan gate.

Medium
General
Catch broader exceptions during resolution

The method catches IndexNotFoundException and returns null, but the calling code in
OpenSearchSchemaBuilder.resolveTable expects null to mean "no table found" and
silently skips registration. If the resolver throws other exceptions (e.g., security
violations, cluster state issues), they will propagate uncaught and fail the schema
build. Consider catching broader exceptions or documenting the narrow exception
contract.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [115-121]

 String[] concrete;
 try {
     concrete = resolver.concreteIndexNames(
         clusterState,
         IndicesOptions.lenientExpandOpen(),
         Strings.splitStringByCommaToArray(expression)
     );
-} catch (IndexNotFoundException e) {
+} catch (Exception e) {
     return null;
 }
Suggestion importance[1-10]: 5

__

Why: The method catches IndexNotFoundException and returns null, which is the intended behavior for "no match" cases. Catching broader exceptions (e.g., Exception) would mask real errors (security violations, cluster state corruption) that should propagate rather than being silently swallowed. The narrow exception contract is correct; the suggestion to catch Exception would reduce debuggability.

Low
Capture schema before potential closure

The method throws IllegalStateException when a column is missing, but the batch has
already been closed in the finally block of the caller. If the exception is thrown
after the batch is closed, the error message references a closed batch's schema,
which may be invalid or misleading. Consider validating the schema before closing
the batch or capturing the schema separately for error reporting.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [323-338]

 private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
     if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
         return batch.getFieldVectors();
     }
     List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
     for (String name : targetColumnOrder) {
         FieldVector vector = batch.getVector(name);
         if (vector == null) {
+            String schemaSnapshot = batch.getSchema().getFields().toString();
             throw new IllegalStateException(
-                "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
+                "Column [" + name + "] expected by plan row type not found in batch schema: " + schemaSnapshot
             );
         }
         ordered.add(vector);
     }
     return ordered;
 }
Suggestion importance[1-10]: 3

__

Why: The concern about referencing a closed batch's schema in the error message is theoretical but unlikely in practice: the orderedColumns helper is called BEFORE the batch is closed (inside the try block), so the schema is still valid when the exception is thrown. Capturing the schema separately adds no real safety and introduces unnecessary overhead. The suggestion is overly defensive for a non-issue.

Low

Previous suggestions

Suggestions up to commit 0e1972a
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fail fast on table name extraction

The unwrap_or_else fallback logs an error but silently continues with the concrete
name, which may cause the Substrait consumer to fail binding. Consider returning an
error instead of falling back, or validate that the fallback is intentional for
production scenarios where plan parsing can legitimately fail.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [239-246]

 let register_name = if !plan_bytes.is_empty() {
-    crate::api::first_named_table_name(plan_bytes).unwrap_or_else(|| {
-        error!("create_session_context: failed to extract table name from plan, falling back to concrete name: {}", table_name);
-        table_name.to_string()
-    })
+    crate::api::first_named_table_name(plan_bytes).ok_or_else(|| {
+        DataFusionError::Execution(format!(
+            "Failed to extract table name from plan for table '{}'; cannot register table",
+            table_name
+        ))
+    })?
 } else {
     table_name.to_string()
 };
Suggestion importance[1-10]: 7

__

Why: The fallback to table_name when plan parsing fails could mask issues in multi-index queries where the logical name differs from the concrete name. Failing fast would surface the problem earlier, though the current fallback may be intentional for robustness.

Medium
Fail when base schema is missing

Logging a warning and silently skipping widening may hide a critical schema mismatch
in multi-index queries. If the plan is expected to contain a base_schema for the
table, consider returning an error instead of falling back to the inferred schema.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [89-93]

-let Some(base_schema) = crate::api::base_schema_for_table(&plan, table_name) else {
-    log::warn!("widen_schema_from_plan: no base_schema found for table '{}' in plan — skipping widening", table_name);
-    return Arc::clone(inferred);
-};
+let base_schema = crate::api::base_schema_for_table(&plan, table_name).ok_or_else(|| {
+    DataFusionError::Execution(format!(
+        "No base_schema found for table '{}' in plan; cannot widen schema for multi-index query",
+        table_name
+    ))
+})?;
Suggestion importance[1-10]: 6

__

Why: Silently skipping widening when base_schema is absent could hide issues in multi-index queries, but the current behavior may be intentional for single-index queries where no plan is supplied. The warning log provides visibility.

Low
General
Preserve exception context on resolution failure

Returning null on IndexNotFoundException causes the caller to report "table not
found" generically. Preserve the original exception or wrap it with context so the
user sees the actual missing index name from the resolver's error message.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [111-122]

 String[] concrete;
 try {
     concrete = resolver.concreteIndexNames(
         clusterState,
         IndicesOptions.lenientExpandOpen(),
         Strings.splitStringByCommaToArray(expression)
     );
 } catch (IndexNotFoundException e) {
-    return null;
+    throw new IllegalArgumentException("Index or alias [" + expression + "] not found: " + e.getMessage(), e);
 }
Suggestion importance[1-10]: 6

__

Why: Returning null on IndexNotFoundException loses the resolver's error details. Wrapping the exception would provide better diagnostics, though the current approach does eventually surface a "not found" error at the caller.

Low
Clarify schema mismatch error message

The method throws IllegalStateException when a column is missing, but this is a data
integrity issue that should fail the query. Consider using a more specific exception
type or adding context about which stage/shard produced the mismatched batch to aid
debugging.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [323-338]

 private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
     if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
         return batch.getFieldVectors();
     }
     List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
     for (String name : targetColumnOrder) {
         FieldVector vector = batch.getVector(name);
         if (vector == null) {
             throw new IllegalStateException(
-                "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
+                "Column [" + name + "] expected by plan row type not found in batch schema: " 
+                + batch.getSchema().getFields() + ". This indicates a schema mismatch between the plan and the data node output."
             );
         }
         ordered.add(vector);
     }
     return ordered;
 }
Suggestion importance[1-10]: 5

__

Why: Adding context about the schema mismatch cause would aid debugging, but the existing error message already includes the batch schema fields, so the improvement is marginal.

Low
Suggestions up to commit abf0dcb
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fail fast on missing table name

The unwrap_or_else fallback logs an error but silently proceeds with the concrete
name, which may cause the Substrait consumer to fail binding if the plan references
a different logical name. Consider returning an error instead of falling back, or
validate that the fallback is semantically correct for the query.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [239-247]

 let register_name = if !plan_bytes.is_empty() {
-    crate::api::first_named_table_name(plan_bytes).unwrap_or_else(|| {
-        error!("create_session_context: failed to extract table name from plan, falling back to concrete name: {}", table_name);
-        table_name.to_string()
-    })
+    crate::api::first_named_table_name(plan_bytes).ok_or_else(|| {
+        DataFusionError::Execution(format!(
+            "Failed to extract table name from plan for concrete index: {}", 
+            table_name
+        ))
+    })?
 } else {
     table_name.to_string()
 };
Suggestion importance[1-10]: 7

__

Why: The fallback to table_name when plan extraction fails could cause silent binding mismatches in multi-index queries. Failing fast with a clear error is safer than proceeding with potentially incorrect table registration.

Medium
General
Preserve exception context on resolution failure

Returning null on IndexNotFoundException causes the caller to report "table not
found" but swallows the original exception details. Preserve the exception context
by wrapping it in an IllegalArgumentException so the user sees why resolution failed
(e.g., security filtering, missing permissions).

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [115-122]

 String[] concrete;
 try {
     concrete = resolver.concreteIndexNames(
         clusterState,
         IndicesOptions.lenientExpandOpen(),
         Strings.splitStringByCommaToArray(expression)
     );
 } catch (IndexNotFoundException e) {
-    return null;
+    throw new IllegalArgumentException("Index or alias [" + expression + "] not found: " + e.getMessage(), e);
 }
Suggestion importance[1-10]: 6

__

Why: Returning null on IndexNotFoundException loses diagnostic context (security filtering, permissions). Wrapping in IllegalArgumentException preserves the cause for better error reporting, though the current behavior (clean "table not found") is acceptable.

Low
Use appropriate exception for schema mismatch

The method throws IllegalStateException when a column is missing, but this is a data
integrity issue that should fail the query. Use a more specific exception type
(e.g., IllegalArgumentException or a custom exception) to distinguish schema
mismatches from internal state errors.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [323-339]

 private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
     if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
         return batch.getFieldVectors();
     }
     List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
     for (String name : targetColumnOrder) {
         FieldVector vector = batch.getVector(name);
         if (vector == null) {
-            throw new IllegalStateException(
+            throw new IllegalArgumentException(
                 "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
             );
         }
         ordered.add(vector);
     }
     return ordered;
 }
Suggestion importance[1-10]: 5

__

Why: IllegalStateException is semantically incorrect for a schema mismatch (data integrity issue). IllegalArgumentException better conveys that the input (batch schema) doesn't match expectations, improving error clarity.

Low
Suggestions up to commit b0c13f5
CategorySuggestion                                                                                                                                    Impact
General
Use more appropriate exception type

The method throws IllegalStateException when a column is missing, but this is called
during result materialization after query execution. A missing column indicates a
serious planning/execution mismatch that should fail fast with a more specific
error. Consider using IllegalArgumentException or a custom exception type to better
signal this is a contract violation rather than a transient state issue.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [323-338]

 private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
     if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
         return batch.getFieldVectors();
     }
     List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
     for (String name : targetColumnOrder) {
         FieldVector vector = batch.getVector(name);
         if (vector == null) {
-            throw new IllegalStateException(
+            throw new IllegalArgumentException(
                 "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
             );
         }
         ordered.add(vector);
     }
     return ordered;
 }
Suggestion importance[1-10]: 3

__

Why: While IllegalArgumentException might be slightly more semantically appropriate than IllegalStateException for a contract violation, both are acceptable runtime exceptions for this scenario. The change is minor and doesn't significantly impact correctness or maintainability.

Low
Suggestions up to commit 576c8e8
CategorySuggestion                                                                                                                                    Impact
General
Validate nested field type compatibility

The validation only checks top-level properties and skips nested object fields. A
conflict in a nested field (e.g., user.age being long in one index and keyword in
another) won't be caught, potentially causing runtime failures when the query
references that nested path.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [182-238]

 private static void validateSchemaCompatibility(String aliasName, List<IndexMetadata> backing) {
-    ...
+    record Decl(String type, String sourceIndex) {}
+    java.util.Map<String, Decl> declared = new java.util.LinkedHashMap<>();
     for (IndexMetadata index : backing) {
         MappingMetadata mm = index.mapping();
-        if (mm == null) {
-            continue;
-        }
+        if (mm == null) continue;
         Map<String, Object> source = mm.sourceAsMap();
         Object props = source.get("properties");
-        if (!(props instanceof Map<?, ?> propsMap)) {
-            continue;
+        if (!(props instanceof Map<?, ?> propsMap)) continue;
+        validateFieldsRecursive(propsMap, "", declared, index.getIndex().getName(), aliasName);
+    }
+}
+
+@SuppressWarnings("unchecked")
+private static void validateFieldsRecursive(Map<?, ?> properties, String prefix, Map<String, Decl> declared, String indexName, String aliasName) {
+    for (Map.Entry<?, ?> entry : properties.entrySet()) {
+        String fieldName = prefix.isEmpty() ? String.valueOf(entry.getKey()) : prefix + "." + entry.getKey();
+        Object fieldDef = entry.getValue();
+        if (!(fieldDef instanceof Map<?, ?> fieldMap)) continue;
+        String type = Objects.toString(fieldMap.get("type"), null);
+        if (type != null) {
+            Decl previous = declared.get(fieldName);
+            if (previous == null) {
+                declared.put(fieldName, new Decl(type, indexName));
+            } else if (!Objects.equals(OpenSearchSchemaBuilder.mapFieldType(previous.type), OpenSearchSchemaBuilder.mapFieldType(type))) {
+                throw new IllegalStateException("Alias [" + aliasName + "] resolves to indices with incompatible field types: [" + fieldName + "] is [" + previous.type + "] in [" + previous.sourceIndex + "] but [" + type + "] in [" + indexName + "]");
+            }
         }
-        for (Map.Entry<?, ?> entry : propsMap.entrySet()) {
-            String fieldName = String.valueOf(entry.getKey());
-            Object fieldDef = entry.getValue();
-            if (!(fieldDef instanceof Map<?, ?> fieldMap)) {
-                continue;
-            }
-            String type = Objects.toString(fieldMap.get("type"), null);
-            ...
+        Object nestedProps = fieldMap.get("properties");
+        if (nestedProps instanceof Map<?, ?> nestedMap) {
+            validateFieldsRecursive(nestedMap, fieldName, declared, indexName, aliasName);
+        }
+    }
+}
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that the current validation only checks top-level fields and misses nested field conflicts. This is a real limitation that could cause runtime failures. The proposed recursive validation would catch these cases, improving correctness. The impact is moderate since nested field conflicts are less common but not negligible.

Medium
Fail fast on plan extraction failure

The fallback to table_name when plan extraction fails may silently mask plan
corruption or schema mismatches. Consider returning an error instead of falling
back, so the query fails fast with a clear diagnostic rather than proceeding with
potentially incorrect table binding.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [235-243]

 let register_name = if !plan_bytes.is_empty() {
-    crate::api::first_named_table_name(plan_bytes).unwrap_or_else(|| {
-        error!("create_session_context: failed to extract table name from plan, falling back to concrete name: {}", table_name);
-        table_name.to_string()
-    })
+    crate::api::first_named_table_name(plan_bytes).ok_or_else(|| {
+        DataFusionError::Execution(format!(
+            "create_session_context: failed to extract table name from plan for concrete index: {}", 
+            table_name
+        ))
+    })?
 } else {
     table_name.to_string()
 };
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that silently falling back to table_name when plan extraction fails could mask issues. Failing fast would provide clearer diagnostics. However, the fallback is logged as an error, so the issue is not completely silent. The improvement is moderate.

Low
Validate cluster state lookup upfront

The null check for lookup is defensive but if it's null, the subsequent
lookup.get(name) would NPE. The code should either fail fast when lookup is null or
handle it consistently. Consider validating cluster state completeness upfront.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [97-106]

 public static IndexResolution resolve(String name, ClusterState clusterState, IndexNameExpressionResolver resolver) {
     SortedMap<String, IndexAbstraction> lookup = clusterState.metadata().getIndicesLookup();
-    IndexAbstraction abstraction = lookup == null ? null : lookup.get(name);
+    if (lookup == null) {
+        throw new IllegalStateException("Cluster state metadata indices lookup is null");
+    }
+    IndexAbstraction abstraction = lookup.get(name);
     if (abstraction != null) {
         return switch (abstraction.getType()) {
             case CONCRETE_INDEX -> new IndexResolution(name, abstraction.getIndices());
             case ALIAS -> resolveAlias(name, abstraction.getIndices());
             case DATA_STREAM -> throw new IllegalStateException("Data stream [" + name + "] is not supported by analytics queries");
         };
     }
     ...
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies a potential NPE if lookup is null. However, in production OpenSearch, getIndicesLookup() should never return null for a valid ClusterState. The defensive check is reasonable but the impact is low since the condition is unlikely.

Low
Limit error message field enumeration

The error message constructs a potentially large string by calling
batch.getSchema().getFields() which may contain many fields. For large schemas, this
could cause memory pressure or log bloat. Consider limiting the fields shown or
using a more concise representation.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [323-338]

 private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
     if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
         return batch.getFieldVectors();
     }
     List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
     for (String name : targetColumnOrder) {
         FieldVector vector = batch.getVector(name);
         if (vector == null) {
+            List<String> availableFields = batch.getSchema().getFields().stream()
+                .map(f -> f.getName())
+                .limit(10)
+                .collect(java.util.stream.Collectors.toList());
             throw new IllegalStateException(
-                "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
+                "Column [" + name + "] expected by plan row type not found in batch schema. Available fields (first 10): " + availableFields
             );
         }
         ordered.add(vector);
     }
     return ordered;
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that large schemas could produce verbose error messages. However, the impact is minor since this is an error path that should rarely execute, and the full field list aids debugging. The improvement is marginal.

Low
Suggestions up to commit c2ecd23
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fail fast on table name extraction

The fallback to table_name when extraction fails may silently cause binding
mismatches in multi-index scenarios. Consider returning an error instead of falling
back, since a failed extraction indicates a malformed plan that should not proceed.

sandbox/plugins/analytics-backend-datafusion/rust/src/session_context.rs [235-246]

 let register_name = if !plan_bytes.is_empty() {
-    crate::api::first_named_table_name(plan_bytes).unwrap_or_else(|| {
-        error!("create_session_context: failed to extract table name from plan, falling back to concrete name: {}", table_name);
-        table_name.to_string()
-    })
+    crate::api::first_named_table_name(plan_bytes).ok_or_else(|| {
+        DataFusionError::Execution(format!(
+            "create_session_context: failed to extract table name from plan for concrete index: {}", 
+            table_name
+        ))
+    })?
 } else {
     table_name.to_string()
 };
Suggestion importance[1-10]: 7

__

Why: The suggestion to fail fast when table name extraction fails instead of falling back to table_name is valid. The fallback could mask a malformed plan and cause binding mismatches in multi-index scenarios. Returning an error provides clearer failure semantics and prevents silent incorrect behavior.

Medium
Add null check for indices lookup

The null check for lookup should be handled more defensively. If getIndicesLookup()
returns null, the subsequent lookup.get(name) will throw NPE. Consider throwing a
clear exception when lookup is null to avoid silent failures.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [97-119]

 public static IndexResolution resolve(String name, ClusterState clusterState, IndexNameExpressionResolver resolver) {
     SortedMap<String, IndexAbstraction> lookup = clusterState.metadata().getIndicesLookup();
-    IndexAbstraction abstraction = lookup == null ? null : lookup.get(name);
+    if (lookup == null) {
+        throw new IllegalStateException("Indices lookup is unavailable in cluster state");
+    }
+    IndexAbstraction abstraction = lookup.get(name);
     if (abstraction != null) {
         return switch (abstraction.getType()) {
             case CONCRETE_INDEX -> new IndexResolution(name, abstraction.getIndices());
             case ALIAS -> resolveAlias(name, abstraction.getIndices());
             case DATA_STREAM -> throw new IllegalStateException("Data stream [" + name + "] is not supported by analytics queries");
         };
     }
     if (resolver != null) {
         String[] expressions = Strings.splitStringByCommaToArray(name);
         String[] concrete;
         try {
             concrete = resolver.concreteIndexNames(
                 clusterState,
                 IndicesOptions.lenientExpandOpen(),
                 expressions
             );
         } catch (IndexNotFoundException e) {
             throw new IllegalArgumentException("Index or alias [" + name + "] not found in cluster state", e);
         }
         ...
     }
Suggestion importance[1-10]: 3

__

Why: While adding an explicit null check for lookup is defensive, the existing code already handles the null case by checking abstraction != null after lookup.get(name). The suggestion adds clarity but doesn't fix a real bug since lookup being null would result in abstraction being null, which is already handled.

Low
General
Validate nested field type compatibility

The validation only checks top-level properties and skips nested object fields. A
conflict in nested fields (e.g., user.age being long vs keyword) won't be caught,
potentially causing runtime failures during query execution.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/IndexResolution.java [182-238]

 private static void validateSchemaCompatibility(String aliasName, List<IndexMetadata> backing) {
-    ...
+    record Decl(String type, String sourceIndex) {}
+    java.util.Map<String, Decl> declared = new java.util.LinkedHashMap<>();
     for (IndexMetadata index : backing) {
         MappingMetadata mm = index.mapping();
-        if (mm == null) {
-            continue;
-        }
+        if (mm == null) continue;
         Map<String, Object> source = mm.sourceAsMap();
         Object props = source.get("properties");
-        if (!(props instanceof Map<?, ?> propsMap)) {
-            continue;
+        if (!(props instanceof Map<?, ?> propsMap)) continue;
+        validateFieldsRecursive(propsMap, "", declared, index.getIndex().getName(), aliasName);
+    }
+}
+
+private static void validateFieldsRecursive(Map<?, ?> props, String prefix, Map<String, Decl> declared, String indexName, String aliasName) {
+    for (Map.Entry<?, ?> entry : props.entrySet()) {
+        String fieldName = prefix.isEmpty() ? String.valueOf(entry.getKey()) : prefix + "." + entry.getKey();
+        Object fieldDef = entry.getValue();
+        if (!(fieldDef instanceof Map<?, ?> fieldMap)) continue;
+        String type = Objects.toString(fieldMap.get("type"), null);
+        if (type != null) {
+            Decl previous = declared.get(fieldName);
+            if (previous == null) {
+                declared.put(fieldName, new Decl(type, indexName));
+            } else if (!Objects.equals(OpenSearchSchemaBuilder.mapFieldType(previous.type), OpenSearchSchemaBuilder.mapFieldType(type))) {
+                throw new IllegalStateException("Alias [" + aliasName + "] resolves to indices with incompatible field types: [" + fieldName + "] is [" + previous.type + "] in [" + previous.sourceIndex + "] but [" + type + "] in [" + indexName + "]");
+            }
         }
-        for (Map.Entry<?, ?> entry : propsMap.entrySet()) {
-            String fieldName = String.valueOf(entry.getKey());
-            Object fieldDef = entry.getValue();
-            if (!(fieldDef instanceof Map<?, ?> fieldMap)) {
-                continue;
-            }
-            String type = Objects.toString(fieldMap.get("type"), null);
-            if (type == null) {
-                continue;
-            }
-            ...
+        Object nestedProps = fieldMap.get("properties");
+        if (nestedProps instanceof Map<?, ?> nestedMap) {
+            validateFieldsRecursive(nestedMap, fieldName, declared, indexName, aliasName);
         }
     }
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that the current validation only checks top-level fields and would miss conflicts in nested object fields (e.g., user.age). The proposed recursive validation would catch these conflicts at planning time rather than at execution. However, the PR comment at line 186 explicitly states "Walks only top-level properties — a conflict on a nested object's leaf... is left to the data node's by-name binding check," indicating this is a known limitation with a documented fallback strategy.

Low
Optimize error message construction

The error message constructs a string representation of the batch schema fields
which may be expensive for large schemas. Consider logging the field names only or
limiting the output size to avoid performance degradation when exceptions occur
frequently.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [320-335]

 private static List<FieldVector> orderedColumns(VectorSchemaRoot batch, List<String> targetColumnOrder) {
     if (targetColumnOrder == null || targetColumnOrder.isEmpty()) {
         return batch.getFieldVectors();
     }
     List<FieldVector> ordered = new ArrayList<>(targetColumnOrder.size());
     for (String name : targetColumnOrder) {
         FieldVector vector = batch.getVector(name);
         if (vector == null) {
+            List<String> fieldNames = batch.getSchema().getFields().stream()
+                .map(f -> f.getName())
+                .collect(java.util.stream.Collectors.toList());
             throw new IllegalStateException(
-                "Column [" + name + "] expected by plan row type not found in batch schema: " + batch.getSchema().getFields()
+                "Column [" + name + "] expected by plan row type not found in batch schema. Available fields: " + fieldNames
             );
         }
         ordered.add(vector);
     }
     return ordered;
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion to extract field names instead of calling toString() on the entire Fields collection is a valid optimization that reduces the cost of exception construction. However, this is a minor performance improvement that only matters when exceptions are thrown, so the impact is limited.

Low

@mch2 mch2 force-pushed the incremental-shard-tasks branch from 1ace488 to c2ecd23 Compare May 25, 2026 06:57
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c2ecd23

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c2ecd23: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 25, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.42%. Comparing base (0c60e50) to head (0e1972a).

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21822      +/-   ##
============================================
+ Coverage     73.34%   73.42%   +0.08%     
+ Complexity    75417    75404      -13     
============================================
  Files          6032     6032              
  Lines        342404   342404              
  Branches      49235    49235              
============================================
+ Hits         251142   251425     +283     
+ Misses        71272    70915     -357     
- Partials      19990    20064      +74     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 576c8e8

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 576c8e8: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 576c8e8: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 576c8e8: SUCCESS

Copy link
Copy Markdown
Member

@sandeshkr419 sandeshkr419 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes @mch2

Let's add up test cases for these as well if:

  • case-insensitive table resolution
  • test for findTableName on a multi-input join/union shape
  • rust tests for widen_schema_from_plan
  • first_named_table_name / base_schema_for_table with multi-scan plans
  • nested field schema mismatch
  • IndexResolutionTests tests for the closed-index case on the concrete-index path.
  • IndexResolutionTests.testMissingNameThrows doesn't pass a resolver, so it can't actually exercise the wildcard fallback, check once.
  • OpenSearchSchemaBuilderTests.testCommaSeparatedSourcesResolveToUnionedTable doesn't test field-conflict behavior.

(I may have missed in the code if some of the above tests actually exist and I overlooked through them - just validate that they do please)

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit b0c13f5

- Case-insensitive table resolution test (OpenSearchSchemaBuilderTests)
- findTableName on join/union shapes (RelNodeUtilsTests) + generalize
  findTableName to match any TableScan, not just OpenSearchTableScan
- Field-conflict test for comma-separated sources (first-wins semantics)
- IndexResolution: concrete name with resolver, exclusion pattern test
- Rust: empty/garbage input tests for first_named_table_name
- Rust: widen_schema_from_plan noop tests (empty plan, all cols present)

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@mch2 mch2 force-pushed the incremental-shard-tasks branch from b0c13f5 to abf0dcb Compare May 27, 2026 01:29
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit abf0dcb

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 0e1972a

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 0e1972a: SUCCESS

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
@mch2 mch2 force-pushed the incremental-shard-tasks branch from 0e1972a to bd5a25f Compare May 27, 2026 06:37
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit bd5a25f

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for bd5a25f: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment on lines +70 to +74
for (String name : clusterState.metadata().getIndicesLookup().keySet()) {
Table table = resolveTable(clusterState, resolver, name);
if (table != null) {
super.put(name, table);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be pretty expensive right?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants