Skip to content

Commit

Permalink
Reject bulk requests with invalid actions
Browse files Browse the repository at this point in the history
The existing bulk api silently ignores bulk item requests that have an invalid action. This change rejects those requests.

Signed-off-by: Rabi Panda <adnapibar@gmail.com>
  • Loading branch information
adnapibar committed Nov 17, 2022
1 parent 7aa615f commit 502131b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Fixed
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Add jvm option to allow security manager ([#5194](https://github.com/opensearch-project/OpenSearch/pull/5194))
- Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299))

### Security

## [Unreleased 2.x]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ public final class BulkRequestParser {
private static final ParseField IF_PRIMARY_TERM = new ParseField("if_primary_term");
private static final ParseField REQUIRE_ALIAS = new ParseField(DocWriteRequest.REQUIRE_ALIAS);

private enum Action {
CREATE,
DELETE,
INDEX,
UPDATE;

private static final Map<String, Action> VALID_ACTIONS = Map.of(
"create",
CREATE,
"delete",
DELETE,
"index",
INDEX,
"update",
UPDATE
);

static Action of(String name, int line) {
if (name != null && VALID_ACTIONS.containsKey(name)) {
return VALID_ACTIONS.get(name);
}
throw new IllegalArgumentException(
"Unknown action line [" + line + "], expected one of [create, delete, index, update]" + " but found [" + name + "]"
);
}
}

private static int findNextMarker(byte marker, int from, BytesReference data) {
final int res = data.indexOf(marker, from);
if (res != -1) {
Expand Down Expand Up @@ -176,7 +203,7 @@ public void parse(
+ "]"
);
}
String action = parser.currentName();
Action action = Action.of(parser.currentName(), line);

String index = defaultIndex;
String id = null;
Expand Down Expand Up @@ -272,7 +299,7 @@ public void parse(
);
}

if ("delete".equals(action)) {
if (action == Action.DELETE) {
deleteRequestConsumer.accept(
new DeleteRequest(index).id(id)
.routing(routing)
Expand All @@ -290,7 +317,7 @@ public void parse(

// we use internalAdd so we don't fork here, this allows us not to copy over the big byte array to small chunks
// of index request.
if ("index".equals(action)) {
if (action == Action.INDEX) {
if (opType == null) {
indexRequestConsumer.accept(
new IndexRequest(index).id(id)
Expand All @@ -317,7 +344,7 @@ public void parse(
.setRequireAlias(requireAlias)
);
}
} else if ("create".equals(action)) {
} else if (action == Action.CREATE) {
indexRequestConsumer.accept(
new IndexRequest(index).id(id)
.routing(routing)
Expand All @@ -330,7 +357,7 @@ public void parse(
.source(sliceTrimmingCarriageReturn(data, from, nextMarker, xContentType), xContentType)
.setRequireAlias(requireAlias)
);
} else if ("update".equals(action)) {
} else if (action == Action.UPDATE) {
if (version != Versions.MATCH_ANY || versionType != VersionType.INTERNAL) {
throw new IllegalArgumentException(
"Update requests do not support versioning. " + "Please use `if_seq_no` and `if_primary_term` instead"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,27 @@ public void testParseDeduplicatesParameterStrings() throws IOException {
assertSame(first.getPipeline(), second.getPipeline());
assertSame(first.routing(), second.routing());
}

public void testFailOnUnsupportedAction() {
BytesArray request = new BytesArray("{ \"baz\":{ \"_id\": \"bar\" } }\n{}\n");
BulkRequestParser parser = new BulkRequestParser();

IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> parser.parse(
request,
"foo",
null,
null,
null,
true,
false,
XContentType.JSON,
req -> fail(),
req -> fail(),
req -> fail()
)
);
assertEquals("Unknown action line [1], expected one of [create, delete, index, update] but found [baz]", ex.getMessage());
}
}

0 comments on commit 502131b

Please sign in to comment.