[Streaming Indexing] Introduce bulk HTTP API streaming flavor
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
reta committed Aug 27, 2024
1 parent 091ab6f commit 20f72eb
Showing 13 changed files with 197 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/version.properties
Expand Up @@ -37,7 +37,7 @@ reactor_netty = 1.1.22
reactor = 3.5.20

# client dependencies
-httpclient5 = 5.2.3
+httpclient5 = 5.3.1
httpcore5 = 5.2.5
httpclient = 4.5.14
httpcore = 4.4.16
Expand Down
1 change: 0 additions & 1 deletion client/rest/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/rest/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
50 changes: 49 additions & 1 deletion client/rest/src/main/java/org/opensearch/client/RestClient.java
Expand Up @@ -114,6 +114,7 @@
import java.util.zip.GZIPOutputStream;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

Expand Down Expand Up @@ -416,7 +417,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
try {
final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
if (responseOrResponseException.responseException == null) {
return Mono.just(message);
return Mono.just(

new Message<>(
message.getHead(),
Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))

)
);
} else {
if (nodeTuple.nodes.hasNext()) {
return Mono.from(streamRequest(nodeTuple, request));
Expand All @@ -431,6 +437,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
});
}

/**
* Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence.
* @param b {@link ByteBuffer} to split
* @return individual chunks
*/
private static Collection<ByteBuffer> frame(ByteBuffer b) {
final Collection<ByteBuffer> buffers = new ArrayList<>();


int position = b.position();

while (b.hasRemaining()) {
// Skip the chunk separator when it comes right at the beginning
if (b.get() == '\r' && b.hasRemaining() && position > 1) {
if (b.get() == '\n') {
final byte[] chunk = new byte[b.position() - position];


b.position(position);
b.get(chunk);


// Do not copy the '\r\n' sequence
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
position = b.position();
}

}
}

if (buffers.isEmpty()) {
return Collections.singleton(b);

}

// Copy last chunk
if (position != b.position()) {
final byte[] chunk = new byte[b.position() - position];


b.position(position);
b.get(chunk);


buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));

}

return buffers;

}

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
Expand Down
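The `frame` helper added above splits each response buffer on the `\r\n` separator that the server now appends to every chunk. The following is a minimal stand-alone sketch of that framing idea, using only the JDK. It is not the `RestClient` code: the class name `CrlfFramingSketch`, the `decode` helper, and the choice to return strings and skip empty pieces are all assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not the RestClient implementation.
final class CrlfFramingSketch {

    // Splits the readable bytes of `source` on "\r\n" and returns the pieces as text.
    // Empty pieces (for example, a separator at the very start) are skipped.
    static List<String> frame(ByteBuffer source) {
        final List<String> chunks = new ArrayList<>();
        final ByteBuffer view = source.duplicate(); // independent position/limit, shared bytes
        int start = view.position();

        for (int i = start; i + 1 < view.limit(); i++) {
            if (view.get(i) == '\r' && view.get(i + 1) == '\n') {
                if (i > start) {
                    chunks.add(decode(view, start, i));
                }
                start = i + 2; // continue after the separator
                i++;           // skip the '\n' that was already matched
            }
        }

        // Trailing bytes that were not terminated by a separator
        if (start < view.limit()) {
            chunks.add(decode(view, start, view.limit()));
        }
        return chunks;
    }

    private static String decode(ByteBuffer buffer, int from, int to) {
        final byte[] bytes = new byte[to - from];
        for (int i = 0; i < bytes.length; i++) {
            bytes[i] = buffer.get(from + i); // absolute reads leave the position alone
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        final byte[] wire = "{\"took\":1}\r\n{\"took\":2}\r\n".getBytes(StandardCharsets.UTF_8);
        // One buffer carrying two separated response documents
        System.out.println(frame(ByteBuffer.wrap(wire)));
    }
}
```

The sketch works on a duplicate of the buffer, so the caller's position is left untouched. Whether that matters depends on how the buffer is consumed afterwards.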
1 change: 0 additions & 1 deletion client/sniffer/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/sniffer/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Expand All @@ -44,7 +46,7 @@ public void tearDown() throws Exception {
super.tearDown();
}

-public void testStreamingRequest() throws IOException {
+public void testStreamingRequestNoBatching() throws IOException {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final Stream<String> stream = IntStream.range(1, 6)
Expand Down Expand Up @@ -85,6 +87,91 @@ public void testStreamingRequest() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestOneBatch() throws IOException, InterruptedException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_size", "5");

final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatches() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_size", "3");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
)
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingBadRequest() throws IOException {
final Stream<String> stream = Stream.of(
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"
Expand Down
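The tests above build their request body from `IntStream.range(1, 6)`, which yields ids 1 through 5, and each stream element carries one bulk item: an action line followed by a source line, both terminated by `\n`. A minimal sketch of that payload construction follows, using only the JDK. The class `BulkPayloadSketch` and its method names are hypothetical; only the string layout is taken from the tests.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper that mirrors the payload strings used in the tests.
final class BulkPayloadSketch {

    // One bulk item: the action line plus the source line, each ending with '\n'.
    static String indexItem(String index, int id, String name) {
        return "{ \"index\": { \"_index\": \"" + index + "\", \"_id\": \"" + id + "\" } }\n"
            + "{ \"name\": \"" + name + "\" }\n";
    }

    // Items for ids in [fromInclusive, toExclusive), one string per request chunk.
    static List<String> items(String index, int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
            .mapToObj(id -> indexItem(index, id, "josh"))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // range(1, 6) excludes the upper bound, so five items are produced
        items("test-streaming", 1, 6).forEach(System.out::print);
    }
}
```

Because the upper bound is exclusive, the final `_count` assertion in each test expects 5 documents.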
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ public void receiveChunk(HttpChunk message) {
}
} catch (final Exception ex) {
producer.error(ex);
} finally {
message.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.function.Consumer;

import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
Expand All @@ -23,10 +24,12 @@
class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent> {
private final ReactorNetty4StreamingResponseProducer sender;
private final StreamingHttpChannel httpChannel;
private final ByteBufAllocator alloc;

ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) {
this.sender = new ReactorNetty4StreamingResponseProducer();
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
this.alloc = response.alloc();
}

@Override
Expand All @@ -44,7 +47,7 @@ public void subscribe(Subscriber<? super HttpContent> s) {
}

HttpChunk createChunk(HttpContent chunk, boolean last) {
-return new ReactorNetty4HttpChunk(chunk.copy().content(), last);
+return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
}

StreamingHttpChannel httpChannel() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
-}).collect(Collectors.joining(""))));
+}).collect(Collectors.joining("\r\n", "", "\r\n"))));
} finally {
response.release();
}
Expand Down
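The hunk above switches from the one-argument `Collectors.joining` to the three-argument form, which takes a delimiter, a prefix, and a suffix. With `"\r\n"` as both delimiter and suffix and an empty prefix, every element ends up terminated by the separator. A small sketch of that behavior follows; the class name `JoiningSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo of Collectors.joining(delimiter, prefix, suffix).
final class JoiningSketch {

    // Joins the parts so that each one is followed by "\r\n".
    static String joinWithSeparator(List<String> parts) {
        return parts.stream().collect(Collectors.joining("\r\n", "", "\r\n"));
    }

    public static void main(String[] args) {
        final String joined = joinWithSeparator(Arrays.asList("{\"a\":1}", "{\"b\":2}"));
        // Make the control characters visible when printing
        System.out.println(joined.replace("\r", "\\r").replace("\n", "\\n"));
    }
}
```

Note that the suffix is emitted even for an empty stream, so an empty list produces a lone `"\r\n"`.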
server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java
Expand Up @@ -15,10 +15,15 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/**
* Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk}
*/
public final class XContentHttpChunk implements HttpChunk {
private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' };
private final BytesReference content;

/**
Expand All @@ -42,6 +47,13 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) {
if (builder == null /* no content */) {
content = BytesArray.EMPTY;
} else {
// Always finalize the output chunk with '\r\n' sequence
try (final OutputStream out = builder.getOutputStream()) {
builder.close();
out.write(CHUNK_SEPARATOR);
} catch (IOException ex) {
throw new UncheckedIOException(ex);

}
content = BytesReference.bytes(builder);
}
}
Expand Down
server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java
Expand Up @@ -36,6 +36,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -95,6 +96,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
final String refresh = request.param("refresh");
final TimeValue batch_interval = request.paramAsTime("batch_interval", null);
final int batch_size = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */

if (batch_interval != null && batch_interval.duration() <= 0) {
throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batch_interval.millis() + "ms].");

}

if (batch_size <= 0) {
throw new IllegalArgumentException("The batch_size value should be non-negative [" + batch_size + "].");

}

final StreamingRestChannelConsumer consumer = (channel) -> {
final MediaType mediaType = request.getMediaType();
Expand All @@ -114,39 +125,38 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// Set the content type and the status code before sending the response stream over
channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters())));

// This is initial implementation at the moment which transforms each single request stream chunk into
// individual bulk request and streams each response back. Another source of inefficiency comes from converting
// bulk response from raw (json/yaml/...) to model and back to raw (json/yaml/...).

// TODOs:
// - add batching (by interval and/or count)
// - eliminate serialization inefficiencies
Flux.from(channel).zipWith(Flux.fromStream(Stream.generate(() -> {
createBufferedFlux(batch_interval, batch_size, channel).zipWith(Flux.fromStream(Stream.generate(() -> {

BulkRequest bulkRequest = Requests.bulkRequest();
bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards());
bulkRequest.timeout(prepareBulkRequest.timeout());
bulkRequest.setRefreshPolicy(prepareBulkRequest.getRefreshPolicy());
return bulkRequest;
}))).map(t -> {
final HttpChunk chunk = t.getT1();
boolean isLast = false;
final List<HttpChunk> chunks = t.getT1();

final BulkRequest bulkRequest = t.getT2();

try (chunk) {
bulkRequest.add(
chunk.content(),
defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
allowExplicitIndex,
request.getMediaType()
);
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
for (final HttpChunk chunk : chunks) {
isLast |= chunk.isLast();
try (chunk) {
bulkRequest.add(
chunk.content(),

defaultIndex,
defaultRouting,
defaultFetchSourceContext,
defaultPipeline,
defaultRequireAlias,
allowExplicitIndex,
request.getMediaType()

);
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}

}

return Tuple.tuple(chunk.isLast(), bulkRequest);
return Tuple.tuple(isLast, bulkRequest);

}).flatMap(tuple -> {
final CompletableFuture<BulkResponse> f = new CompletableFuture<>();

Expand Down Expand Up @@ -222,4 +232,12 @@ public boolean supportsStreaming() {
public boolean allowsUnsafeBuffers() {
return true;
}

private Flux<List<HttpChunk>> createBufferedFlux(final TimeValue batch_interval, final int batch_size, StreamingRestChannel channel) {
if (batch_interval != null) {
return Flux.from(channel).bufferTimeout(batch_size, Duration.ofMillis(batch_interval.millis()));

} else {
return Flux.from(channel).buffer(batch_size);

}
}
}
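
`createBufferedFlux` groups incoming chunks with `Flux.buffer(batch_size)`, or with `bufferTimeout` when `batch_interval` is set, so each emitted list becomes one bulk request. The count-based case can be sketched on a plain list without Reactor, as below. `CountBatchingSketch` and its `buffer` method are hypothetical stand-ins, the time-based variant is not modeled, and the error message wording is this sketch's own rather than the commit's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of count-based batching on a finite list (no Reactor involved).
final class CountBatchingSketch {

    // Groups items into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> buffer(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            // Mirrors the `batch_size <= 0` check; the message text is an assumption
            throw new IllegalArgumentException("The batch_size value should be positive [" + batchSize + "].");
        }
        final List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            final int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to))); // the last batch may be smaller
        }
        return batches;
    }

    public static void main(String[] args) {
        final List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(ids, 3)); // ids 1-3 in one batch, ids 4-5 in the next
        System.out.println(buffer(ids, 5)); // a single batch with all five ids
    }
}
```

This matches what the tests expect: five documents with `batch_size=3` produce two response chunks, `batch_size=5` produces one, and the default of 1 produces one response per request chunk.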
