Skip to content

Implement FieldMappingIngestionMessageMapper for pull-based ingestion#20729

Open
imRishN wants to merge 24 commits intoopensearch-project:mainfrom
imRishN:field-mapper-imp
Open

Implement FieldMappingIngestionMessageMapper for pull-based ingestion#20729
imRishN wants to merge 24 commits intoopensearch-project:mainfrom
imRishN:field-mapper-imp

Conversation

@imRishN
Copy link
Member

@imRishN imRishN commented Feb 25, 2026

Description

Implements the field_mapping mapper type that extracts _id, _version, and _op_type from configurable top-level fields in raw stream messages. Extracted fields are removed from _source. Supports boolean op_type detection by default, with an optional delete_value setting for custom value matching (e.g., "Y"/"N", "expired", integer flags).

Also updates the mapper factory to accept mapper settings and wires mapper settings from IngestionSource through DefaultStreamPoller.

Related Issues

Resolves #20728

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added enhancement Enhancement or improvement to existing feature or request missing-component labels Feb 25, 2026
@github-actions
Copy link
Contributor

github-actions bot commented Feb 25, 2026

PR Reviewer Guide 🔍

(Review updated until commit 5e1c081)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 Multiple PR themes

Sub-PR theme: Implement FieldMappingIngestionMessageMapper core logic and factory wiring

Relevant files:

  • server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java
  • server/src/test/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapperTests.java
  • server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java
  • server/src/test/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapperTests.java
  • server/src/main/java/org/opensearch/indices/pollingingest/mappers/RawPayloadIngestionMessageMapper.java

Sub-PR theme: Wire mapperSettings through IngestionEngine, DefaultStreamPoller, and validation

Relevant files:

  • server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
  • server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
  • server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java
  • server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java
  • CHANGELOG.md

⚡ Recommended focus areas for review

Silent Op Type Miss

When opTypeField is configured but the field is absent from the message, the operation type silently defaults to "index". This may be unexpected behavior — a missing op_type_field could indicate a malformed message. Consider whether this should throw or at least log a warning, especially when delete_value or create_value is configured.

if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
    Object val = rawPayload.remove(opTypeField);
    validateScalar(opTypeField, val);
    String stringVal = String.valueOf(val).trim();
    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
} else {
    payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
}
Version Parsing

The version value is stored as a raw string (e.g., String.valueOf(versionValue).trim()) without validating that it represents a valid long integer. Downstream external versioning in OpenSearch requires a positive long. A non-numeric or negative version string will fail later with a less informative error. Consider validating the version is a parseable positive long at extraction time.

    Object versionValue = rawPayload.remove(versionField);
    validateScalar(versionField, versionValue);
    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
}
Mutable Source Map

rawPayload (returned by IngestionUtils.getParsedPayloadMap) is mutated directly via remove() calls and then placed as _source. If the parsed map is shared or reused elsewhere, this could cause subtle bugs. Verify that getParsedPayloadMap always returns a fresh mutable map, or make a defensive copy before mutating.

Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
Map<String, Object> payloadMap = new HashMap<>();

// Extract _id
long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
if (idField != null) {
    if (rawPayload.containsKey(idField) == false) {
        throw new IllegalArgumentException("configured id_field [" + idField + "] is missing from the message");
    }
    Object idValue = rawPayload.remove(idField);
    validateScalar(idField, idValue);
    String idStr = String.valueOf(idValue).trim();
    if (idStr.isEmpty()) {
        throw new IllegalArgumentException("field [" + idField + "] has an empty value");
    }
    payloadMap.put(ID, idStr);
} else {
    String id = RequestUtils.generateID();
    payloadMap.put(ID, id);
    autoGeneratedIdTimestamp = System.currentTimeMillis();
}

// Extract _version
if (versionField != null) {
    if (rawPayload.containsKey(versionField) == false) {
        throw new IllegalArgumentException("configured version_field [" + versionField + "] is missing from the message");
    }
    Object versionValue = rawPayload.remove(versionField);
    validateScalar(versionField, versionValue);
    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
}

// Extract _op_type
// The op_type_field value is matched against delete_value and create_value to determine the operation type.
// If neither delete_value nor create_value is configured, the field is still removed from _source but
// the operation type defaults to index.
if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
    Object val = rawPayload.remove(opTypeField);
    validateScalar(opTypeField, val);
    String stringVal = String.valueOf(val).trim();
    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
} else {
    payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
}

// Remaining fields become _source
payloadMap.put(SOURCE, rawPayload);
Null mapperSettings

In validateSettings, the check mapperSettings != null && mapperSettings.isEmpty() == false for the default case correctly handles null. However, in create(String, int, Map), mapperSettings is passed directly to FieldMappingIngestionMessageMapper without null-guarding at the factory level (the constructor handles null, but it's inconsistent). Consider normalizing null to empty map at the factory entry point for clarity.

static IngestionMessageMapper create(String mapperTypeString, int shardId, Map<String, Object> mapperSettings) {
    MapperType mapperType = MapperType.fromString(mapperTypeString);
    switch (mapperType) {
        case DEFAULT:
            return new DefaultIngestionMessageMapper();
        case RAW_PAYLOAD:
            return new RawPayloadIngestionMessageMapper(shardId);
        case FIELD_MAPPING:
            return new FieldMappingIngestionMessageMapper(mapperSettings);

@github-actions
Copy link
Contributor

github-actions bot commented Feb 25, 2026

PR Code Suggestions ✨

Latest suggestions up to 5e1c081

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Parse version field as numeric long value

The version value is stored as a String in the payload map, but external versioning
in OpenSearch requires a long. Downstream processing that expects a numeric version
will fail or behave incorrectly if it receives a string. The version should be
parsed to a long and validated to be a positive number before being stored.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [148-155]

 if (versionField != null) {
     if (rawPayload.containsKey(versionField) == false) {
         throw new IllegalArgumentException("configured version_field [" + versionField + "] is missing from the message");
     }
     Object versionValue = rawPayload.remove(versionField);
     validateScalar(versionField, versionValue);
-    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
+    String versionStr = String.valueOf(versionValue).trim();
+    try {
+        long version = Long.parseLong(versionStr);
+        if (version < 0) {
+            throw new IllegalArgumentException("field [" + versionField + "] must be a non-negative long, but got: " + version);
+        }
+        payloadMap.put(VersionFieldMapper.NAME, version);
+    } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("field [" + versionField + "] must be a valid long, but got: " + versionStr);
+    }
 }
Suggestion importance[1-10]: 7

__

Why: OpenSearch external versioning requires a long value, and storing the version as a String in payloadMap could cause downstream failures when the version is used for conflict resolution. Parsing and validating the version as a long early would prevent confusing runtime errors and ensure correctness.

Medium
Add type safety check before payload cast

The cast (byte[]) message.getPayload() will throw a ClassCastException if the
payload is not a byte[]. This is a runtime error that would be hard to diagnose.
Consider adding a null check and type check before casting, or catching the
ClassCastException and rethrowing it as a more descriptive IllegalArgumentException.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [125]

-Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
+Object payload = message.getPayload();
+if (!(payload instanceof byte[])) {
+    throw new IllegalArgumentException(
+        "FieldMappingIngestionMessageMapper expects a byte[] payload, but got: "
+            + (payload == null ? "null" : payload.getClass().getSimpleName())
+    );
+}
+Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) payload);
Suggestion importance[1-10]: 4

__

Why: The cast (byte[]) message.getPayload() could throw a ClassCastException at runtime. However, this pattern appears consistent with how other mappers in the codebase handle payloads (e.g., RawPayloadIngestionMessageMapper), suggesting it's an accepted contract. The improvement is valid but has limited impact given the established pattern.

Low
General
Clarify missing op_type_field handling behavior

When opTypeField is configured but missing from the message, the operation silently
defaults to index. This is inconsistent with the behavior of versionField, which
throws an exception when missing. If the user explicitly configured an opTypeField,
a missing field in the message may indicate a data issue and should at least be
documented. Consider whether a missing opTypeField should throw an exception
(similar to versionField) or remain a silent default, and make the behavior
consistent and explicit.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [161-168]

-if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
-    Object val = rawPayload.remove(opTypeField);
-    validateScalar(opTypeField, val);
-    String stringVal = String.valueOf(val).trim();
-    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+if (opTypeField != null) {
+    if (rawPayload.containsKey(opTypeField) == false) {
+        // op_type_field is optional in each message; default to index if absent
+        payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
+    } else {
+        Object val = rawPayload.remove(opTypeField);
+        validateScalar(opTypeField, val);
+        String stringVal = String.valueOf(val).trim();
+        payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+    }
 } else {
     payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion asks to clarify behavior that is already tested and documented in the test testOpTypeFieldMissing_DefaultsToIndex. The existing code is functionally correct and the behavior (defaulting to index when opTypeField is absent) is intentional. The 'improved_code' is functionally equivalent to the existing code, just restructured with a comment.

Low

Previous suggestions

Suggestions up to commit c30fb6b
CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate version field is a valid long

The version value is stored as a string, but OpenSearch external versioning requires
a long. If a non-numeric string is stored, it will cause a downstream parsing
failure with an unclear error. The version should be parsed and validated as a long
at this point to provide a clear error message.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [148-155]

 if (versionField != null) {
     if (rawPayload.containsKey(versionField) == false) {
         throw new IllegalArgumentException("configured version_field [" + versionField + "] is missing from the message");
     }
     Object versionValue = rawPayload.remove(versionField);
     validateScalar(versionField, versionValue);
-    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
+    String versionStr = String.valueOf(versionValue).trim();
+    try {
+        Long.parseLong(versionStr);
+    } catch (NumberFormatException e) {
+        throw new IllegalArgumentException(
+            "field [" + versionField + "] must be a valid long value for versioning, but found: [" + versionStr + "]"
+        );
+    }
+    payloadMap.put(VersionFieldMapper.NAME, versionStr);
 }
Suggestion importance[1-10]: 6

__

Why: This is a valid concern - OpenSearch external versioning requires a long value, and storing a non-numeric string as _version would cause a downstream parsing failure with an unclear error. Early validation with a clear error message improves debuggability.

Low
Avoid mutating potentially shared raw payload map

The code mutates rawPayload directly by calling rawPayload.remove(...) on the map
returned by getParsedPayloadMap. If that method returns an unmodifiable map or a
shared reference, this will throw an exception or cause unintended side effects. The
raw payload should be copied into a new mutable map before modification.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [125-126]

-Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
+Map<String, Object> rawPayload = new HashMap<>(IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload()));
 Map<String, Object> payloadMap = new HashMap<>();
Suggestion importance[1-10]: 5

__

Why: This is a valid defensive programming concern - if getParsedPayloadMap returns an unmodifiable or shared map, calling remove() on it would throw an exception. Wrapping it in a new HashMap<>() is a safe practice, though the actual risk depends on the implementation of getParsedPayloadMap.

Low
General
Clarify op_type field absence handling

When opTypeField is configured but missing from the message, the operation silently
defaults to index. This could mask data issues where the op_type field is expected
but absent. Consider adding a warning log or, if the field is required when
configured, throwing an exception similar to how versionField is handled.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [161-168]

-if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
-    Object val = rawPayload.remove(opTypeField);
-    validateScalar(opTypeField, val);
-    String stringVal = String.valueOf(val).trim();
-    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+if (opTypeField != null) {
+    if (rawPayload.containsKey(opTypeField) == false) {
+        // op_type_field is optional in the message; default to index
+        payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
+    } else {
+        Object val = rawPayload.remove(opTypeField);
+        validateScalar(opTypeField, val);
+        String stringVal = String.valueOf(val).trim();
+        payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+    }
 } else {
     payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion restructures the code for clarity but doesn't change behavior - the existing code already handles the case where opTypeField is null or missing by defaulting to OP_TYPE_INDEX. The test testOpTypeFieldMissing_DefaultsToIndex explicitly validates this behavior, so the current implementation is intentional.

Low
Suggestions up to commit 163581c
CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate version field is a valid long value

The version value is stored as a String in the payload map, but external versioning
in OpenSearch requires a long. Downstream processing that expects a numeric version
will fail or behave incorrectly if it receives a string. The version should be
parsed to a long and validated to be a positive number before being stored.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [148-155]

 if (versionField != null) {
     if (rawPayload.containsKey(versionField) == false) {
         throw new IllegalArgumentException("configured version_field [" + versionField + "] is missing from the message");
     }
     Object versionValue = rawPayload.remove(versionField);
     validateScalar(versionField, versionValue);
-    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
+    String versionStr = String.valueOf(versionValue).trim();
+    try {
+        long version = Long.parseLong(versionStr);
+        if (version < 0) {
+            throw new IllegalArgumentException("field [" + versionField + "] must be a non-negative long, but got: " + version);
+        }
+        payloadMap.put(VersionFieldMapper.NAME, versionStr);
+    } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("field [" + versionField + "] must be a valid long, but got: " + versionStr);
+    }
 }
Suggestion importance[1-10]: 6

__

Why: External versioning in OpenSearch requires a long value, and storing the version as a String without validation could cause downstream failures. Parsing and validating the version as a long at ingestion time provides better error messages and prevents silent failures.

Low
Add type safety check before payload cast

The cast (byte[]) message.getPayload() will throw a ClassCastException if the
message payload is not a byte[]. This is a runtime error that would be difficult to
diagnose. Consider adding a null check and type check before casting, or catching
the ClassCastException and wrapping it in a more descriptive
IllegalArgumentException.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [125]

-Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
+Object payload = message.getPayload();
+if (!(payload instanceof byte[])) {
+    throw new IllegalArgumentException(
+        "FieldMappingIngestionMessageMapper expects a byte[] payload, but got: "
+            + (payload == null ? "null" : payload.getClass().getSimpleName())
+    );
+}
+Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) payload);
Suggestion importance[1-10]: 4

__

Why: The cast (byte[]) message.getPayload() could throw a ClassCastException at runtime. However, this pattern appears consistent with how other mappers in the codebase handle payloads (e.g., RawPayloadIngestionMessageMapper), so this is a minor defensive improvement rather than a critical fix.

Low
General
Clarify silent fallback for missing op_type_field

When opTypeField is configured but missing from the message, the operation silently
defaults to index. This is inconsistent with the behavior of versionField, which
throws an exception when missing. If the opTypeField is configured, its absence from
the message may indicate a data quality issue and should be handled explicitly, or
at least documented clearly as intentional. Consider whether a missing opTypeField
should throw an exception similar to versionField, or add a comment explaining the
intentional silent fallback.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [161-168]

-if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
-    Object val = rawPayload.remove(opTypeField);
-    validateScalar(opTypeField, val);
-    String stringVal = String.valueOf(val).trim();
-    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+if (opTypeField != null) {
+    if (rawPayload.containsKey(opTypeField) == false) {
+        // op_type_field is optional in each message; missing field defaults to index
+        payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
+    } else {
+        Object val = rawPayload.remove(opTypeField);
+        validateScalar(opTypeField, val);
+        String stringVal = String.valueOf(val).trim();
+        payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+    }
 } else {
     payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
 }
Suggestion importance[1-10]: 3

__

Why: The behavior is intentional and documented in the test testOpTypeFieldMissing_DefaultsToIndex, which explicitly tests that a missing opTypeField defaults to index. The suggestion adds a comment but doesn't change behavior, making it a minor documentation improvement.

Low
Suggestions up to commit 9d08270
CategorySuggestion                                                                                                                                    Impact
General
Validate version field is a valid positive long

The version value is stored as a plain string, but OpenSearch external versioning
requires a positive long value. If a non-numeric or negative value is provided, it
will fail later during indexing with a less informative error. Consider validating
that the version value is a valid positive long at mapping time to provide a clearer
error message.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [148-155]

 if (versionField != null) {
     if (rawPayload.containsKey(versionField) == false) {
         throw new IllegalArgumentException("configured version_field [" + versionField + "] is missing from the message");
     }
     Object versionValue = rawPayload.remove(versionField);
     validateScalar(versionField, versionValue);
-    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
+    String versionStr = String.valueOf(versionValue).trim();
+    try {
+        long versionLong = Long.parseLong(versionStr);
+        if (versionLong < 0) {
+            throw new IllegalArgumentException("field [" + versionField + "] must be a non-negative long for versioning, but got: " + versionStr);
+        }
+    } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("field [" + versionField + "] must be a valid long for versioning, but got: " + versionStr);
+    }
+    payloadMap.put(VersionFieldMapper.NAME, versionStr);
 }
Suggestion importance[1-10]: 6

__

Why: OpenSearch external versioning requires a positive long, so validating the version value at mapping time provides a much clearer error message than letting it fail deeper in the indexing pipeline. This is a meaningful improvement to error handling and user experience.

Low
Clarify silent default for missing op_type field

When opTypeField is configured but missing from the message, the operation silently
defaults to index. This is inconsistent with the behavior of versionField and
idField, which throw exceptions when their configured fields are missing. Consider
whether a missing opTypeField should also throw an exception, or at minimum document
this intentional asymmetry clearly. If the silent default is intentional, it should
be validated that this is the expected behavior per the spec.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [161-168]

-if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
-    Object val = rawPayload.remove(opTypeField);
-    validateScalar(opTypeField, val);
-    String stringVal = String.valueOf(val).trim();
-    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+if (opTypeField != null) {
+    if (rawPayload.containsKey(opTypeField) == false) {
+        // op_type_field is optional in the message; default to index if absent
+        payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
+    } else {
+        Object val = rawPayload.remove(opTypeField);
+        validateScalar(opTypeField, val);
+        String stringVal = String.valueOf(val).trim();
+        payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+    }
 } else {
     payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
 }
Suggestion importance[1-10]: 3

__

Why: The existing code already handles the missing opTypeField case by defaulting to index, and there's even a test testOpTypeFieldMissing_DefaultsToIndex that validates this behavior. The suggestion's improved_code is functionally equivalent to the existing code, just restructured. The asymmetry with idField/versionField is intentional per the test coverage.

Low
Possible issue
Defensive copy prevents mutation of original payload map

The rawPayload map returned by getParsedPayloadMap is directly mutated via
rawPayload.remove(...) calls. If the underlying map implementation is unmodifiable
or if the same message object is processed multiple times (e.g., on retry), this
will cause a runtime exception. Consider creating a defensive copy of the raw
payload before mutating it.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [125-126]

-Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
+Map<String, Object> rawPayload = new HashMap<>(IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload()));
 Map<String, Object> payloadMap = new HashMap<>();
Suggestion importance[1-10]: 5

__

Why: The suggestion is valid — mutating the map returned by getParsedPayloadMap could cause issues if the map is unmodifiable or reused. Creating a defensive copy via new HashMap<>(...) is a safe practice, though the actual risk depends on the implementation of getParsedPayloadMap.

Low
Suggestions up to commit d65edd6
CategorySuggestion                                                                                                                                    Impact
Possible issue
Ensure raw payload map is mutable before modification

The rawPayload map returned by IngestionUtils.getParsedPayloadMap may be
unmodifiable (e.g., Collections.unmodifiableMap) depending on the implementation.
Since the code calls rawPayload.remove(...) on it, this would throw an
UnsupportedOperationException at runtime. It is safer to wrap it in a new HashMap
before mutating it.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [125-126]

-Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
+Map<String, Object> rawPayload = new HashMap<>(IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload()));
 Map<String, Object> payloadMap = new HashMap<>();
Suggestion importance[1-10]: 6

__

Why: If IngestionUtils.getParsedPayloadMap returns an unmodifiable map, calling rawPayload.remove(...) would throw an UnsupportedOperationException. Wrapping in a new HashMap<>() is a defensive and correct fix, though it depends on the actual implementation of getParsedPayloadMap.

Low
General
Validate settings before constructing mapper instance

The create method does not validate mapperSettings before constructing the mapper.
If invalid settings are passed, the error will be thrown inside the constructor
rather than at the factory level. Calling validateSettings(mapperType,
mapperSettings) before construction would provide a consistent validation point and
clearer error attribution.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java [120-132]

 static IngestionMessageMapper create(String mapperTypeString, int shardId, Map<String, Object> mapperSettings) {
     MapperType mapperType = MapperType.fromString(mapperTypeString);
+    validateSettings(mapperType, mapperSettings);
     switch (mapperType) {
         case DEFAULT:
             return new DefaultIngestionMessageMapper();
         case RAW_PAYLOAD:
             return new RawPayloadIngestionMessageMapper(shardId);
         case FIELD_MAPPING:
             return new FieldMappingIngestionMessageMapper(mapperSettings);
         default:
             throw new IllegalArgumentException("Unknown mapper type: " + mapperType);
     }
 }
Suggestion importance[1-10]: 4

__

Why: Adding validateSettings before construction provides a consistent validation point, but since FieldMappingIngestionMessageMapper's constructor already calls validateSettings internally, this would result in double validation. The improvement is minor and introduces redundancy.

Low
Warn when configured op_type_field is absent from message

When opTypeField is configured but missing from the message, the code silently
defaults to OP_TYPE_INDEX. However, if deleteValue or createValue is configured
alongside opTypeField, a missing opTypeField in the message may indicate a data
issue that should be surfaced. Consider at minimum logging a warning, or optionally
throwing an exception when the field is configured but absent, to avoid silently
misclassifying operations.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [161-168]

 if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
     Object val = rawPayload.remove(opTypeField);
     validateScalar(opTypeField, val);
     String stringVal = String.valueOf(val).trim();
     payloadMap.put(OP_TYPE, resolveOpType(stringVal));
 } else {
+    if (opTypeField != null && (deleteValue != null || createValue != null)) {
+        // op_type_field is configured with values but missing from message; log a warning
+        logger.warn("configured op_type_field [{}] is missing from the message; defaulting to index", opTypeField);
+    }
     payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion adds a warning log when opTypeField is configured but missing from the message. While this could help with debugging, the existing test testOpTypeFieldMissing_DefaultsToIndex explicitly validates the silent default-to-index behavior, suggesting this is intentional design. The improvement is minor and the improved_code references a logger that may not be defined in the class.

Low
Suggestions up to commit 86540d2
CategorySuggestion                                                                                                                                    Impact
Possible issue
Validate and parse version field as numeric long

The version value is stored as a string, but OpenSearch external versioning requires
a numeric long value. Storing it as a string may cause downstream failures when the
version is used for conflict resolution. Parse the version string to a Long and
validate it is a positive number before storing.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [148-155]

 if (versionField != null) {
     if (rawPayload.containsKey(versionField) == false) {
         throw new IllegalArgumentException("configured version_field [" + versionField + "] is missing from the message");
     }
     Object versionValue = rawPayload.remove(versionField);
     validateScalar(versionField, versionValue);
-    payloadMap.put(VersionFieldMapper.NAME, String.valueOf(versionValue).trim());
+    String versionStr = String.valueOf(versionValue).trim();
+    long version;
+    try {
+        version = Long.parseLong(versionStr);
+    } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("field [" + versionField + "] must be a valid long value for versioning, but found: " + versionStr);
+    }
+    if (version < 0) {
+        throw new IllegalArgumentException("field [" + versionField + "] must be a non-negative version value, but found: " + version);
+    }
+    payloadMap.put(VersionFieldMapper.NAME, version);
 }
Suggestion importance[1-10]: 6

__

Why: OpenSearch external versioning requires a numeric long value, so storing the version as a string could cause downstream failures. However, it's possible the downstream processing already handles string-to-long conversion, and the test at line 115 asserts "1234567890" (a string), suggesting the current design may be intentional.

Low
Ensure raw payload map is mutable before modification

The rawPayload map returned by getParsedPayloadMap may be unmodifiable (e.g.,
Collections.unmodifiableMap), which would cause a runtime exception when calling
rawPayload.remove(idField). Wrap it in a new HashMap to ensure mutability before
performing removals.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [125-126]

-Map<String, Object> rawPayload = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload());
+Map<String, Object> rawPayload = new HashMap<>(IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload()));
 Map<String, Object> payloadMap = new HashMap<>();
Suggestion importance[1-10]: 5

__

Why: If IngestionUtils.getParsedPayloadMap returns an unmodifiable map, calling rawPayload.remove() would throw an UnsupportedOperationException. Wrapping in a HashMap is a defensive measure, though it depends on the actual implementation of getParsedPayloadMap.

Low
General
Clarify silent default when op_type_field is absent

When opTypeField is configured but the field is absent from the message, the
operation silently defaults to index. This could mask data issues where a required
op-type field is unexpectedly missing. Consider adding a warning log or, if the
field is considered required when configured, throwing an exception similar to how
versionField absence is handled.

server/src/main/java/org/opensearch/indices/pollingingest/mappers/FieldMappingIngestionMessageMapper.java [161-168]

-if (opTypeField != null && rawPayload.containsKey(opTypeField)) {
-    Object val = rawPayload.remove(opTypeField);
-    validateScalar(opTypeField, val);
-    String stringVal = String.valueOf(val).trim();
-    payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+if (opTypeField != null) {
+    if (rawPayload.containsKey(opTypeField) == false) {
+        // op_type_field is configured but missing from message; default to index
+        payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
+    } else {
+        Object val = rawPayload.remove(opTypeField);
+        validateScalar(opTypeField, val);
+        String stringVal = String.valueOf(val).trim();
+        payloadMap.put(OP_TYPE, resolveOpType(stringVal));
+    }
 } else {
     payloadMap.put(OP_TYPE, OP_TYPE_INDEX);
 }
Suggestion importance[1-10]: 2

__

Why: The improved_code is functionally equivalent to the existing_code — it just restructures the conditional without changing behavior. The suggestion only adds a comment and doesn't meaningfully improve correctness or safety.

Low

@github-actions
Copy link
Contributor

Persistent review updated to latest commit 7f3c5b6

@github-actions
Copy link
Contributor

❌ Gradle check result for 7f3c5b6: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Contributor

Persistent review updated to latest commit 835b1d4

@github-actions
Copy link
Contributor

❌ Gradle check result for 835b1d4: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

imRishN and others added 20 commits March 1, 2026 13:20
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…t#20725)

Emeritus maintainers are not active in the project, therefore I don't
see a lot of value in tracking their affiliation.

Signed-off-by: Andrew Ross <andrross@amazon.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
---------

Signed-off-by: Divya <DIVYA2@ibm.com>
Signed-off-by: Divya <divyaruhil999@gmail.com>
Co-authored-by: Divya <DIVYA2@ibm.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…m SecureTransportSettingsProvider (opensearch-project#20734)

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…nsearch-project#20737)

Signed-off-by: Andriy Redko <drreta@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…on (opensearch-project#20704)

* remove experimental tag for pull-based ingestion

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>

* update BroadcastRequest to be marked as public API

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>

---------

Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Ankit Jain <jainankitk@apache.org>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
…ch-project#20585)

Signed-off-by: Shashank Gowri <shnkgo@amazon.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@imRishN imRishN force-pushed the field-mapper-imp branch from 9d08270 to e7b82bd Compare March 1, 2026 07:57
@github-actions
Copy link
Contributor

github-actions bot commented Mar 1, 2026

Persistent review updated to latest commit 163581c

@github-actions
Copy link
Contributor

github-actions bot commented Mar 1, 2026

❌ Gradle check result for 163581c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Copy link
Contributor

github-actions bot commented Mar 1, 2026

Persistent review updated to latest commit c30fb6b

@github-actions
Copy link
Contributor

github-actions bot commented Mar 1, 2026

❌ Gradle check result for c30fb6b: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: Rishab Nahata <rishabnahata07@gmail.com>
@github-actions
Copy link
Contributor

github-actions bot commented Mar 1, 2026

Persistent review updated to latest commit 5e1c081

@github-actions
Copy link
Contributor

github-actions bot commented Mar 1, 2026

✅ Gradle check result for 5e1c081: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement Enhancement or improvement to existing feature or request missing-component

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request] Implement field_mapping mapper type for pull-based ingestion

8 participants