Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635))
- Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523))
- Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629))
- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744))

### Changed
- Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) {
return value;
}

/**
* Detects the media type from the byte content, with fallback to JSON if detection fails.
* This enables support for JSON, SMILE, and CBOR formats in gRPC bulk requests.
*
* @param document The document content as bytes
* @return The detected MediaType, or JSON if detection fails or document is empty
*/
static MediaType detectMediaType(byte[] document) {
if (document == null || document.length == 0) {
return MediaTypeRegistry.JSON;
}
MediaType detectedType = MediaTypeRegistry.mediaTypeFromBytes(document, 0, document.length);
return detectedType != null ? detectedType : MediaTypeRegistry.JSON;
}

/**
* Similar to {@link BulkRequestParser#parse(BytesReference, String, String, FetchSourceContext, String, Boolean, boolean, MediaType, Consumer, Consumer, Consumer)}, except that it takes into account global values.
*
Expand Down Expand Up @@ -231,6 +246,7 @@ public static IndexRequest buildCreateRequest(
pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline;
requireAlias = createOperation.hasRequireAlias() ? createOperation.getRequireAlias() : requireAlias;

MediaType mediaType = detectMediaType(document);
IndexRequest indexRequest = new IndexRequest(index).id(id)
.routing(routing)
.version(version)
Expand All @@ -239,7 +255,7 @@ public static IndexRequest buildCreateRequest(
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(document, MediaTypeRegistry.JSON)
.source(document, mediaType)
.setRequireAlias(requireAlias);
return indexRequest;
}
Expand Down Expand Up @@ -288,6 +304,7 @@ public static IndexRequest buildIndexRequest(
ifPrimaryTerm = indexOperation.hasIfPrimaryTerm() ? indexOperation.getIfPrimaryTerm() : ifPrimaryTerm;
requireAlias = indexOperation.hasRequireAlias() ? indexOperation.getRequireAlias() : requireAlias;

MediaType mediaType = detectMediaType(document);
IndexRequest indexRequest;
if (opType == null) {
indexRequest = new IndexRequest(index).id(id)
Expand All @@ -297,7 +314,7 @@ public static IndexRequest buildIndexRequest(
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(document, MediaTypeRegistry.JSON)
.source(document, mediaType)
.setRequireAlias(requireAlias);
} else {
indexRequest = new IndexRequest(index).id(id)
Expand All @@ -308,7 +325,7 @@ public static IndexRequest buildIndexRequest(
.setPipeline(pipeline)
.setIfSeqNo(ifSeqNo)
.setIfPrimaryTerm(ifPrimaryTerm)
.source(document, MediaTypeRegistry.JSON)
.source(document, mediaType)
.setRequireAlias(requireAlias);
}
return indexRequest;
Expand Down Expand Up @@ -408,7 +425,9 @@ public static UpdateRequest fromProto(
}

if (updateAction.hasUpsert()) {
updateRequest.upsert(updateAction.getUpsert(), MediaTypeRegistry.JSON);
byte[] upsertBytes = updateAction.getUpsert().toByteArray();
MediaType upsertMediaType = detectMediaType(upsertBytes);
updateRequest.upsert(upsertBytes, upsertMediaType);
}

if (updateAction.hasDocAsUpsert()) {
Expand All @@ -424,7 +443,8 @@ public static UpdateRequest fromProto(
}
}

updateRequest.doc(document, MediaTypeRegistry.JSON);
MediaType mediaType = detectMediaType(document);
updateRequest.doc(document, mediaType);

if (updateOperation.hasIfSeqNo()) {
updateRequest.setIfSeqNo(updateOperation.getIfSeqNo());
Expand Down
Loading
Loading