Skip to content

Commit

Permalink
[Streaming Indexing] Introduce bulk HTTP API streaming flavor (#15381) (
Browse files Browse the repository at this point in the history
#15472)

* [Streaming Indexing] Introduce bulk HTTP API streaming flavor

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Address code review comments

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Add more test cases

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Add more test cases

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

---------

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
(cherry picked from commit 8d17c8d)
  • Loading branch information
reta authored Aug 28, 2024
1 parent 635d2e9 commit 50d8fc5
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Star tree mapping changes ([#14605](https://github.com/opensearch-project/OpenSearch/pull/14605))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))

### Dependencies
Expand Down
50 changes: 49 additions & 1 deletion client/rest/src/main/java/org/opensearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import java.util.zip.GZIPOutputStream;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

Expand Down Expand Up @@ -394,7 +395,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
try {
final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
if (responseOrResponseException.responseException == null) {
return Mono.just(message);
return Mono.just(
new Message<>(
message.getHead(),
Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))
)
);
} else {
if (nodeTuple.nodes.hasNext()) {
return Mono.from(streamRequest(nodeTuple, request));
Expand All @@ -409,6 +415,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
});
}

/**
* Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence.
* @param b {@link ByteBuffer} to split
* @return individual chunks
*/
private static Collection<ByteBuffer> frame(ByteBuffer b) {
final Collection<ByteBuffer> buffers = new ArrayList<>();

int position = b.position();
while (b.hasRemaining()) {
// Skip the chunk separator when it comes right at the beginning
if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) {
if (b.get() == '\n') {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

// Do not copy the '\r\n' sequence
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
position = b.position();
}
}
}

if (buffers.isEmpty()) {
return Collections.singleton(b);
}

// Copy last chunk
if (position != b.position()) {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));
}

return buffers;
}

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
int statusCode = httpResponse.getStatusLine().getStatusCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testStreamingRequest() throws IOException {
public void testStreamingRequestNoBatching() throws IOException {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final Stream<String> stream = IntStream.range(1, 6)
Expand Down Expand Up @@ -85,6 +85,167 @@ public void testStreamingRequest() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestOneBatchBySize() throws IOException, InterruptedException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_size", "5");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatchesBySize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofMillis(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_size", "3");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
)
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatchesByInterval() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofMillis(500);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_interval", "5s");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

// We don't check for a other documents here since those may appear in any of the chunks (it is very
// difficult to get the timing right). But at the end, the total number of the documents is being checked.
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_interval", "3s");
streamingRequest.addParameter("batch_size", "5");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

// We don't check for a other documents here since those may appear in any of the chunks (it is very
// difficult to get the timing right). But at the end, the total number of the documents is being checked.
StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\""))
.expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"5\""))
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingBadRequest() throws IOException {
final Stream<String> stream = Stream.of(
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public void receiveChunk(HttpChunk message) {
}
} catch (final Exception ex) {
producer.error(ex);
} finally {
message.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void subscribe(Subscriber<? super HttpContent> s) {
}

HttpChunk createChunk(HttpContent chunk, boolean last) {
return new ReactorNetty4HttpChunk(chunk.copy().content(), last);
return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
}

StreamingHttpChannel httpChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
}).collect(Collectors.joining(""))));
}).collect(Collectors.joining("\r\n", "", "\r\n"))));
} finally {
response.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;

/**
* Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk}
*/
public final class XContentHttpChunk implements HttpChunk {
private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' };
private final BytesReference content;

/**
Expand All @@ -42,7 +44,8 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) {
if (builder == null /* no content */) {
content = BytesArray.EMPTY;
} else {
content = BytesReference.bytes(builder);
// Always finalize the output chunk with '\r\n' sequence
content = CompositeBytesReference.of(BytesReference.bytes(builder), new BytesArray(CHUNK_SEPARATOR));
}
}

Expand Down
Loading

0 comments on commit 50d8fc5

Please sign in to comment.