[Streaming Indexing] Introduce bulk HTTP API streaming flavor
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
reta committed Aug 27, 2024
1 parent 091ab6f commit b8d2274
Showing 11 changed files with 196 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
2 changes: 1 addition & 1 deletion buildSrc/version.properties
@@ -37,7 +37,7 @@ reactor_netty = 1.1.22
reactor = 3.5.20

# client dependencies
- httpclient5 = 5.2.3
+ httpclient5 = 5.3.1
httpcore5 = 5.2.5
httpclient = 4.5.14
httpcore = 4.4.16
1 change: 0 additions & 1 deletion client/rest/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/rest/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
51 changes: 50 additions & 1 deletion client/rest/src/main/java/org/opensearch/client/RestClient.java
@@ -114,6 +114,7 @@
import java.util.zip.GZIPOutputStream;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

@@ -140,6 +141,7 @@
public class RestClient implements Closeable {

private static final Log logger = LogFactory.getLog(RestClient.class);
private static final byte CHUNK_SEPARATOR = '\n';

private final CloseableHttpAsyncClient client;
// We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced.
@@ -416,7 +418,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
try {
final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
if (responseOrResponseException.responseException == null) {
- return Mono.just(message);
+ return Mono.just(
+     new Message<>(
+         message.getHead(),
+         Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))
+     )
+ );
} else {
if (nodeTuple.nodes.hasNext()) {
return Mono.from(streamRequest(nodeTuple, request));
@@ -431,6 +438,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
});
}

/**
* Frame the {@link ByteBuffer} into individual chunks that are separated by the '\r\n' sequence.
* @param b {@link ByteBuffer} to split
* @return individual chunks
*/
private static Collection<ByteBuffer> frame(ByteBuffer b) {
final Collection<ByteBuffer> buffers = new ArrayList<>();

int position = b.position();
while (b.hasRemaining()) {
// Skip the chunk separator when it comes right at the beginning
if (b.get() == '\r' && b.hasRemaining() && position > 1) {
if (b.get() == '\n') {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

// Do not copy the '\r\n' sequence
buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
position = b.position();
}
}
}

if (buffers.isEmpty()) {
return Collections.singleton(b);
}

// Copy last chunk
if (position != b.position()) {
final byte[] chunk = new byte[b.position() - position];

b.position(position);
b.get(chunk);

buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));
}

return buffers;
}

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
throws IOException {
RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
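For context: the server-side XContentHttpChunk change further down appends '\r\n' to every serialized response chunk, and the frame(...) method above is the client-side half that recovers chunk boundaries even when the transport coalesces several chunks into a single ByteBuffer. Below is a minimal standalone sketch of the same splitting idea (FrameDemo is illustrative only and not part of this commit):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FrameDemo {
    /** Split a buffer into chunks delimited by "\r\n", dropping the separators. */
    static List<ByteBuffer> frame(ByteBuffer b) {
        final List<ByteBuffer> buffers = new ArrayList<>();
        int start = b.position();
        while (b.hasRemaining()) {
            if (b.get() == '\r' && b.hasRemaining() && b.get() == '\n') {
                final int end = b.position();
                final byte[] chunk = new byte[end - start - 2]; // exclude the "\r\n"
                b.position(start);
                b.get(chunk);
                b.position(end);
                buffers.add(ByteBuffer.wrap(chunk));
                start = end;
            }
        }
        if (start != b.position()) { // trailing bytes that have no separator yet
            final byte[] rest = new byte[b.position() - start];
            b.position(start);
            b.get(rest);
            buffers.add(ByteBuffer.wrap(rest));
        }
        return buffers;
    }

    public static void main(String[] args) {
        final ByteBuffer in = ByteBuffer.wrap("{\"a\":1}\r\n{\"b\":2}\r\npartial".getBytes(StandardCharsets.UTF_8));
        // Prints {"a":1}, {"b":2} and partial on separate lines
        frame(in).forEach(buf -> System.out.println(new String(buf.array(), StandardCharsets.UTF_8)));
    }
}

Like the committed frame(...), the sketch copies each chunk out instead of slicing, so the returned buffers stay valid independently of the network buffer they came from.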
1 change: 0 additions & 1 deletion client/sniffer/licenses/httpclient5-5.2.3.jar.sha1

This file was deleted.

1 change: 1 addition & 0 deletions client/sniffer/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
56b53c8f4bcdaada801d311cf2ff8a24d6d96883
@@ -21,6 +21,8 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.IntStream;
import java.util.stream.Stream;

@@ -44,7 +46,7 @@ public void tearDown() throws Exception {
super.tearDown();
}

- public void testStreamingRequest() throws IOException {
+ public void testStreamingRequestNoBatching() throws IOException {
final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);

final Stream<String> stream = IntStream.range(1, 6)
@@ -85,6 +87,91 @@ public void testStreamingRequest() throws IOException {
assertThat(count, equalTo(5));
}

public void testStreamingRequestOneBatch() throws IOException, InterruptedException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_size", "5");

final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingRequestManyBatches() throws IOException {
final Stream<String> stream = IntStream.range(1, 6)
.mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

final Duration delay = Duration.ofSeconds(1);
final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
"POST",
"/_bulk/stream",
Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
);
streamingRequest.addParameter("refresh", "true");
streamingRequest.addParameter("batch_size", "3");

final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);

StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"1\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"2\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"3\"")
)
.expectNextMatches(
s -> s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"4\"")
&& s.contains("\"result\":\"created\"")
&& s.contains("\"_id\":\"5\"")
)
.expectComplete()
.verify();

assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
assertThat(streamingResponse.getWarnings(), empty());

final Request request = new Request("GET", "/test-streaming/_count");
final Response response = client().performRequest(request);
final ObjectPath objectPath = ObjectPath.createFromResponse(response);
final Integer count = objectPath.evaluate("count");
assertThat(count, equalTo(5));
}

public void testStreamingBadRequest() throws IOException {
final Stream<String> stream = Stream.of(
"{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"
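The tests above exercise batch_size only; the new batch_interval parameter (handled in the bulk streaming action below) is set the same way from the client side. A hypothetical test-method sketch under the same harness and imports as the tests above (the method name and parameter values are illustrative, not part of this commit; since batch_interval is parsed with request.paramAsTime, a value like "500ms" is assumed to follow the usual time-value syntax):

public void testStreamingRequestBatchInterval() throws IOException {
    final Stream<String> stream = IntStream.range(1, 6)
        .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

    final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
        "POST",
        "/_bulk/stream",
        Flux.fromStream(stream).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
    );
    streamingRequest.addParameter("refresh", "true");
    streamingRequest.addParameter("batch_interval", "500ms"); // flush on a timer ...
    streamingRequest.addParameter("batch_size", "10");        // ... or once 10 chunks accumulate

    final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
    // Drain the response stream and assert on it as in the tests above.
}

With both parameters set, the server emits a batch when either the count is reached or the interval elapses, whichever comes first.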
@@ -13,6 +13,7 @@

import java.util.function.Consumer;

import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
@@ -23,10 +24,12 @@
class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent> {
private final ReactorNetty4StreamingResponseProducer sender;
private final StreamingHttpChannel httpChannel;
private final ByteBufAllocator alloc;

ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) {
this.sender = new ReactorNetty4StreamingResponseProducer();
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
this.alloc = response.alloc();
}

@Override
@@ -44,7 +47,7 @@ public void subscribe(Subscriber<? super HttpContent> s) {
}

HttpChunk createChunk(HttpContent chunk, boolean last) {
- return new ReactorNetty4HttpChunk(chunk.copy().content(), last);
+ return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
}

StreamingHttpChannel httpChannel() {
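The one-line createChunk change is a reference-counting fix: chunk.copy() produces a ByteBuf whose reference count is 1, and the added retain() bumps it to 2 so the content survives one extra release() by a downstream consumer. A standalone sketch of the underlying Netty mechanics (illustrative only, not part of this commit):

import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RetainDemo {
    public static void main(String[] args) {
        final ByteBuf buf = Unpooled.copiedBuffer("chunk".getBytes(StandardCharsets.UTF_8));
        System.out.println(buf.refCnt());     // 1, the state chunk.copy().content() starts in
        buf.retain();                         // 2, what the fix adds
        buf.release();                        // back to 1: one consumer release no longer frees it
        System.out.println(buf.isReadable()); // true, the content is still accessible
        buf.release();                        // 0, the buffer is now deallocated
    }
}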
@@ -15,10 +15,15 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/**
* Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk}
*/
public final class XContentHttpChunk implements HttpChunk {
private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' };
private final BytesReference content;

/**
@@ -42,6 +47,12 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) {
if (builder == null /* no content */) {
content = BytesArray.EMPTY;
} else {
// Always finalize the output chunk with the '\r\n' sequence
try (final OutputStream out = builder.getOutputStream()) {
out.write(CHUNK_SEPARATOR);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
content = BytesReference.bytes(builder);
}
}
@@ -36,6 +36,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -95,6 +96,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
final String refresh = request.param("refresh");
final TimeValue batch_interval = request.paramAsTime("batch_interval", null);
final int batch_size = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */

if (batch_interval != null && batch_interval.duration() <= 0) {
throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batch_interval.millis() + "ms].");
}

if (batch_size <= 0) {
throw new IllegalArgumentException("The batch_size value should be non-negative [" + batch_size + "].");
}

final StreamingRestChannelConsumer consumer = (channel) -> {
final MediaType mediaType = request.getMediaType();
@@ -114,39 +125,38 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// Set the content type and the status code before sending the response stream over
channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters())));

- // This is initial implementation at the moment which transforms each single request stream chunk into
- // individual bulk request and streams each response back. Another source of inefficiency comes from converting
- // bulk response from raw (json/yaml/...) to model and back to raw (json/yaml/...).

- // TODOs:
- //  - add batching (by interval and/or count)
- //  - eliminate serialization inefficiencies
- Flux.from(channel).zipWith(Flux.fromStream(Stream.generate(() -> {
+ createBufferedFlux(batch_interval, batch_size, channel).zipWith(Flux.fromStream(Stream.generate(() -> {
BulkRequest bulkRequest = Requests.bulkRequest();
bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards());
bulkRequest.timeout(prepareBulkRequest.timeout());
bulkRequest.setRefreshPolicy(prepareBulkRequest.getRefreshPolicy());
return bulkRequest;
}))).map(t -> {
- final HttpChunk chunk = t.getT1();
+ boolean isLast = false;
+ final List<HttpChunk> chunks = t.getT1();
final BulkRequest bulkRequest = t.getT2();

- try (chunk) {
-     bulkRequest.add(
-         chunk.content(),
-         defaultIndex,
-         defaultRouting,
-         defaultFetchSourceContext,
-         defaultPipeline,
-         defaultRequireAlias,
-         allowExplicitIndex,
-         request.getMediaType()
-     );
- } catch (final IOException ex) {
-     throw new UncheckedIOException(ex);
+ for (final HttpChunk chunk : chunks) {
+     isLast |= chunk.isLast();
+     try (chunk) {
+         bulkRequest.add(
+             chunk.content(),
+             defaultIndex,
+             defaultRouting,
+             defaultFetchSourceContext,
+             defaultPipeline,
+             defaultRequireAlias,
+             allowExplicitIndex,
+             request.getMediaType()
+         );
+     } catch (final IOException ex) {
+         throw new UncheckedIOException(ex);
+     }
+ }

- return Tuple.tuple(chunk.isLast(), bulkRequest);
+ return Tuple.tuple(isLast, bulkRequest);
}).flatMap(tuple -> {
final CompletableFuture<BulkResponse> f = new CompletableFuture<>();

@@ -222,4 +232,12 @@ public boolean supportsStreaming() {
public boolean allowsUnsafeBuffers() {
return true;
}

private Flux<List<HttpChunk>> createBufferedFlux(final TimeValue batch_interval, final int batch_size, StreamingRestChannel channel) {
if (batch_interval != null) {
return Flux.from(channel).bufferTimeout(batch_size, Duration.ofMillis(batch_interval.millis()));
} else {
return Flux.from(channel).buffer(batch_size);
}
}
}
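createBufferedFlux above selects between Reactor's two batching operators: buffer(n) groups purely by count, while bufferTimeout(n, t) emits on whichever of the count or the elapsed time is hit first. A small self-contained sketch of the difference (illustrative only, plain numbers in place of HttpChunk):

import java.time.Duration;

import reactor.core.publisher.Flux;

public class BufferDemo {
    public static void main(String[] args) {
        // buffer(3): emit a List every 3 elements; the final list may be smaller.
        Flux.range(1, 5).buffer(3).subscribe(batch -> System.out.println("size-based: " + batch));
        // Prints: size-based: [1, 2, 3] then size-based: [4, 5]

        // bufferTimeout(3, 100ms): emit when 3 elements accumulate OR 100 ms elapse,
        // whichever comes first; this is the batch_interval code path.
        Flux.interval(Duration.ofMillis(40))
            .take(5)
            .bufferTimeout(3, Duration.ofMillis(100))
            .doOnNext(batch -> System.out.println("time-or-size: " + batch))
            .blockLast();
        // The exact batches are timing-dependent, e.g. [0, 1] then [2, 3, 4]
    }
}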
