From b8d22742c8de8d668a737dcc345e9c1b75de3cac Mon Sep 17 00:00:00 2001
From: Andriy Redko
Date: Fri, 23 Aug 2024 11:10:45 -0400
Subject: [PATCH] [Streaming Indexing] Introduce bulk HTTP API streaming flavor

Signed-off-by: Andriy Redko
---
 CHANGELOG.md                                  |  1 +
 buildSrc/version.properties                   |  2 +-
 .../rest/licenses/httpclient5-5.2.3.jar.sha1  |  1 -
 .../rest/licenses/httpclient5-5.3.1.jar.sha1  |  1 +
 .../org/opensearch/client/RestClient.java     | 51 ++++++++++-
 .../licenses/httpclient5-5.2.3.jar.sha1       |  1 -
 .../licenses/httpclient5-5.3.1.jar.sha1       |  1 +
 .../rest/ReactorNetty4StreamingIT.java        | 89 ++++++++++++++++++-
 ...ReactorNetty4StreamingRequestConsumer.java |  5 +-
 .../xcontent/support/XContentHttpChunk.java   | 11 +++
 .../document/RestBulkStreamingAction.java     | 60 ++++++++-----
 11 files changed, 196 insertions(+), 27 deletions(-)
 delete mode 100644 client/rest/licenses/httpclient5-5.2.3.jar.sha1
 create mode 100644 client/rest/licenses/httpclient5-5.3.1.jar.sha1
 delete mode 100644 client/sniffer/licenses/httpclient5-5.2.3.jar.sha1
 create mode 100644 client/sniffer/licenses/httpclient5-5.3.1.jar.sha1

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d4ec6a635fde..f870d671bf00b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
 - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
 - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
+- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381))
 
 ### Dependencies
 - Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
diff --git a/buildSrc/version.properties b/buildSrc/version.properties
index ccec8e2891a65..98f474a7f0b90 100644
--- a/buildSrc/version.properties
+++ b/buildSrc/version.properties
@@ -37,7 +37,7 @@ reactor_netty = 1.1.22
 reactor = 3.5.20
 
 # client dependencies
-httpclient5 = 5.2.3
+httpclient5 = 5.3.1
 httpcore5 = 5.2.5
 httpclient = 4.5.14
 httpcore = 4.4.16
diff --git a/client/rest/licenses/httpclient5-5.2.3.jar.sha1 b/client/rest/licenses/httpclient5-5.2.3.jar.sha1
deleted file mode 100644
index 43e233e72001a..0000000000000
--- a/client/rest/licenses/httpclient5-5.2.3.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-5d753a99d299756998a08c488f2efdf9cf26198e
\ No newline at end of file
diff --git a/client/rest/licenses/httpclient5-5.3.1.jar.sha1 b/client/rest/licenses/httpclient5-5.3.1.jar.sha1
new file mode 100644
index 0000000000000..c8f32c1ec23a1
--- /dev/null
+++ b/client/rest/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
+56b53c8f4bcdaada801d311cf2ff8a24d6d96883
\ No newline at end of file
diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java
index 5c87e3fda5701..654f0a5b8c7b4 100644
--- a/client/rest/src/main/java/org/opensearch/client/RestClient.java
+++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java
@@ -114,6 +114,7 @@
 import java.util.zip.GZIPOutputStream;
 
 import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.MonoSink;
 
@@ -140,6 +141,7 @@ public class RestClient implements Closeable {
 
     private static final Log logger = LogFactory.getLog(RestClient.class);
 
+    private static final byte CHUNK_SEPARATOR = '\n';
     private final CloseableHttpAsyncClient client;
     // We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced.
@@ -416,7 +418,12 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
             try {
                 final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message);
                 if (responseOrResponseException.responseException == null) {
-                    return Mono.just(message);
+                    return Mono.just(
+                        new Message<>(
+                            message.getHead(),
+                            Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b)))
+                        )
+                    );
                 } else {
                     if (nodeTuple.nodes.hasNext()) {
                         return Mono.from(streamRequest(nodeTuple, request));
@@ -431,6 +438,48 @@ private Publisher<Message<HttpResponse, Publisher<ByteBuffer>>> streamRequest(
         });
     }
 
+    /**
+     * Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence.
+     * @param b {@link ByteBuffer} to split
+     * @return individual chunks
+     */
+    private static Collection<ByteBuffer> frame(ByteBuffer b) {
+        final Collection<ByteBuffer> buffers = new ArrayList<>();
+
+        int position = b.position();
+        while (b.hasRemaining()) {
+            // Skip the chunk separator when it comes right at the beginning
+            if (b.get() == '\r' && b.hasRemaining() && position > 1) {
+                if (b.get() == '\n') {
+                    final byte[] chunk = new byte[b.position() - position];
+
+                    b.position(position);
+                    b.get(chunk);
+
+                    // Do not copy the '\r\n' sequence
+                    buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2));
+                    position = b.position();
+                }
+            }
+        }
+
+        if (buffers.isEmpty()) {
+            return Collections.singleton(b);
+        }
+
+        // Copy last chunk
+        if (position != b.position()) {
+            final byte[] chunk = new byte[b.position() - position];
+
+            b.position(position);
+            b.get(chunk);
+
+            buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length));
+        }
+
+        return buffers;
+    }
+
     private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
         throws IOException {
         RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse);
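
The `frame(ByteBuffer)` helper above re-frames the raw response stream on '\r\n'
boundaries, so each ByteBuffer handed to the caller corresponds to one response
chunk produced by the server. Since frame() is private to RestClient, the
following standalone sketch (illustrative only, not part of the patch; all names
are hypothetical) mirrors the observable framing contract:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class FrameContractSketch {
        // Split a wire buffer into chunks on '\r\n', dropping the separator itself
        static List<String> split(final String wire) {
            final List<String> chunks = new ArrayList<>();
            final ByteBuffer b = ByteBuffer.wrap(wire.getBytes(StandardCharsets.UTF_8));
            int start = b.position();
            while (b.hasRemaining()) {
                if (b.get() == '\r' && b.hasRemaining() && b.get() == '\n') {
                    final int end = b.position() - 2; // exclude the '\r\n' separator
                    chunks.add(new String(b.array(), start, end - start, StandardCharsets.UTF_8));
                    start = b.position();
                }
            }
            if (start != b.position()) {
                // Trailing bytes without a separator form the last chunk
                chunks.add(new String(b.array(), start, b.position() - start, StandardCharsets.UTF_8));
            }
            return chunks;
        }

        public static void main(String[] args) {
            // Two bulk item responses coalesced into a single network buffer
            System.out.println(split("{\"_id\":\"1\"}\r\n{\"_id\":\"2\"}\r\n"));
            // prints: [{"_id":"1"}, {"_id":"2"}]
        }
    }
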
diff --git a/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 b/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1
deleted file mode 100644
index 43e233e72001a..0000000000000
--- a/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-5d753a99d299756998a08c488f2efdf9cf26198e
\ No newline at end of file
diff --git a/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 b/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1
new file mode 100644
index 0000000000000..c8f32c1ec23a1
--- /dev/null
+++ b/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1
@@ -0,0 +1 @@
+56b53c8f4bcdaada801d311cf2ff8a24d6d96883
\ No newline at end of file
diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
index c564e289e3f88..1810b27c1200c 100644
--- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
+++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java
@@ -21,6 +21,8 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
@@ -44,7 +46,7 @@ public void tearDown() throws Exception {
         super.tearDown();
     }
 
-    public void testStreamingRequest() throws IOException {
+    public void testStreamingRequestNoBatching() throws IOException {
         final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true);
 
         final Stream<String> stream = IntStream.range(1, 6)
@@ -85,6 +87,91 @@ public void testStreamingRequest() throws IOException {
         assertThat(count, equalTo(5));
     }
 
+    public void testStreamingRequestOneBatch() throws IOException, InterruptedException {
+        final Stream<String> stream = IntStream.range(1, 6)
+            .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
+
+        final Duration delay = Duration.ofSeconds(1);
+        final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
+            "POST",
+            "/_bulk/stream",
+            Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
+        );
+        streamingRequest.addParameter("refresh", "true");
+        streamingRequest.addParameter("batch_size", "5");
+
+        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
+
+        StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
+            .expectNextMatches(
+                s -> s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"1\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"2\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"3\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"4\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"5\"")
+            )
+            .expectComplete()
+            .verify();
+
+        assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
+        assertThat(streamingResponse.getWarnings(), empty());
+
+        final Request request = new Request("GET", "/test-streaming/_count");
+        final Response response = client().performRequest(request);
+        final ObjectPath objectPath = ObjectPath.createFromResponse(response);
+        final Integer count = objectPath.evaluate("count");
+        assertThat(count, equalTo(5));
+    }
+
+    public void testStreamingRequestManyBatches() throws IOException {
+        final Stream<String> stream = IntStream.range(1, 6)
+            .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");
+
+        final Duration delay = Duration.ofSeconds(1);
+        final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
+            "POST",
+            "/_bulk/stream",
+            Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
+        );
+        streamingRequest.addParameter("refresh", "true");
+        streamingRequest.addParameter("batch_size", "3");
+
+        final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
+
+        StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8)))
+            .expectNextMatches(
+                s -> s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"1\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"2\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"3\"")
+            )
+            .expectNextMatches(
+                s -> s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"4\"")
+                    && s.contains("\"result\":\"created\"")
+                    && s.contains("\"_id\":\"5\"")
+            )
+            .expectComplete()
+            .verify();
+
+        assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200));
+        assertThat(streamingResponse.getWarnings(), empty());
+
+        final Request request = new Request("GET", "/test-streaming/_count");
+        final Response response = client().performRequest(request);
+        final ObjectPath objectPath = ObjectPath.createFromResponse(response);
+        final Integer count = objectPath.evaluate("count");
+        assertThat(count, equalTo(5));
+    }
+
     public void testStreamingBadRequest() throws IOException {
         final Stream<String> stream = Stream.of(
             "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n"
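
The two new tests drive count-based batching through the `batch_size` parameter.
The server also accepts a time-based `batch_interval` (see RestBulkStreamingAction
below); a hypothetical test body exercising it, reusing the same fixtures and the
"test-streaming" index from the tests above, could look like this sketch:

    final Stream<String> stream = IntStream.range(1, 6)
        .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n");

    final StreamingRequest<ByteBuffer> streamingRequest = new StreamingRequest<>(
        "POST",
        "/_bulk/stream",
        Flux.fromStream(stream)
            .delayElements(Duration.ofMillis(500))
            .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
    );
    streamingRequest.addParameter("refresh", "true");
    // Flush a (possibly partial) batch every 5 seconds, or as soon as 3 chunks arrive
    streamingRequest.addParameter("batch_interval", "5s");
    streamingRequest.addParameter("batch_size", "3");

    final StreamingResponse<ByteBuffer> streamingResponse = client().streamRequest(streamingRequest);
    // ... assert the batched item responses with StepVerifier, as in the tests above
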
diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java
index 8ed6710c8a1e3..15ed82afa8009 100644
--- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java
+++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java
@@ -13,6 +13,7 @@
 
 import java.util.function.Consumer;
 
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.handler.codec.http.HttpContent;
 import io.netty.handler.codec.http.LastHttpContent;
 import org.reactivestreams.Publisher;
@@ -23,10 +24,12 @@ class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpChunk> {
     private final ReactorNetty4StreamingResponseProducer sender;
     private final StreamingHttpChannel httpChannel;
+    private final ByteBufAllocator alloc;
 
     ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) {
         this.sender = new ReactorNetty4StreamingResponseProducer();
         this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
+        this.alloc = response.alloc();
     }
 
     @Override
@@ -44,7 +47,7 @@ public void subscribe(Subscriber<? super HttpChunk> s) {
     }
 
     HttpChunk createChunk(HttpContent chunk, boolean last) {
-        return new ReactorNetty4HttpChunk(chunk.copy().content(), last);
+        return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last);
     }
 
     StreamingHttpChannel httpChannel() {
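
The `retain()` added to createChunk() bumps the Netty reference count of the
copied buffer, presumably so the chunk survives being held in a server-side batch
until the bulk handler finally closes it. A minimal illustration of the Netty
reference-counting rule this relies on (illustrative only; the class name is
hypothetical):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class RefCountSketch {
        public static void main(String[] args) {
            final ByteBuf buf = Unpooled.copiedBuffer(new byte[] { 1, 2, 3 });
            System.out.println(buf.refCnt()); // 1: owned by the producer
            buf.retain();                     // hand-off: a later consumer must release
            System.out.println(buf.refCnt()); // 2
            buf.release();                    // consumer done (cf. closing the HttpChunk)
            buf.release();                    // producer done; buffer is deallocated
            System.out.println(buf.refCnt()); // 0
        }
    }
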
diff --git a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java
index 15b63a0ac2030..fee0291bbcd00 100644
--- a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java
+++ b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java
@@ -15,10 +15,15 @@
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.http.HttpChunk;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+
 /**
  * Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk}
  */
 public final class XContentHttpChunk implements HttpChunk {
+    private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' };
     private final BytesReference content;
 
     /**
@@ -42,6 +47,12 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) {
         if (builder == null /* no content */) {
             content = BytesArray.EMPTY;
         } else {
+            // Always finalize the output chunk with '\r\n' sequence
+            try (final OutputStream out = builder.getOutputStream()) {
+                out.write(CHUNK_SEPARATOR);
+            } catch (IOException ex) {
+                throw new UncheckedIOException(ex);
+            }
             content = BytesReference.bytes(builder);
         }
     }
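
With this change every serialized response chunk is terminated by the '\r\n'
sequence, which is exactly the separator the client-side frame() keys on, so
chunk boundaries survive any re-buffering done by the HTTP transport. A small
sketch of the invariant (illustrative only, not part of the patch):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class ChunkSeparatorSketch {
        private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' };

        public static void main(String[] args) throws IOException {
            final ByteArrayOutputStream wire = new ByteArrayOutputStream();
            // Two response chunks, each finalized the way XContentHttpChunk now does
            for (final String chunk : new String[] { "{\"_id\":\"1\"}", "{\"_id\":\"2\"}" }) {
                wire.write(chunk.getBytes(StandardCharsets.UTF_8));
                wire.write(CHUNK_SEPARATOR);
            }
            // The concatenated bytes split back into the original two chunks
            System.out.println(wire.toString(StandardCharsets.UTF_8.name()).split("\r\n").length); // 2
        }
    }
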
diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java
index a38244fe9ff20..216b5d5f73909 100644
--- a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java
+++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java
@@ -36,6 +36,7 @@
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -95,6 +96,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
         final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
         final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
         final String refresh = request.param("refresh");
+        final TimeValue batch_interval = request.paramAsTime("batch_interval", null);
+        final int batch_size = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */
+
+        if (batch_interval != null && batch_interval.duration() <= 0) {
+            throw new IllegalArgumentException("The batch_interval value should be positive [" + batch_interval.millis() + "ms].");
+        }
+
+        if (batch_size <= 0) {
+            throw new IllegalArgumentException("The batch_size value should be positive [" + batch_size + "].");
+        }
 
         final StreamingRestChannelConsumer consumer = (channel) -> {
             final MediaType mediaType = request.getMediaType();
@@ -114,39 +125,38 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
             // Set the content type and the status code before sending the response stream over
             channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters())));
 
-            // This is initial implementation at the moment which transforms each single request stream chunk into
-            // individual bulk request and streams each response back. Another source of inefficiency comes from converting
-            // bulk response from raw (json/yaml/...) to model and back to raw (json/yaml/...).
-            // TODOs:
-            //  - add batching (by interval and/or count)
             //  - eliminate serialization inefficiencies
-            Flux.from(channel).zipWith(Flux.fromStream(Stream.generate(() -> {
+            createBufferedFlux(batch_interval, batch_size, channel).zipWith(Flux.fromStream(Stream.generate(() -> {
                 BulkRequest bulkRequest = Requests.bulkRequest();
                 bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards());
                 bulkRequest.timeout(prepareBulkRequest.timeout());
                 bulkRequest.setRefreshPolicy(prepareBulkRequest.getRefreshPolicy());
                 return bulkRequest;
             }))).map(t -> {
-                final HttpChunk chunk = t.getT1();
+                boolean isLast = false;
+                final List<HttpChunk> chunks = t.getT1();
                 final BulkRequest bulkRequest = t.getT2();
-                try (chunk) {
-                    bulkRequest.add(
-                        chunk.content(),
-                        defaultIndex,
-                        defaultRouting,
-                        defaultFetchSourceContext,
-                        defaultPipeline,
-                        defaultRequireAlias,
-                        allowExplicitIndex,
-                        request.getMediaType()
-                    );
-                } catch (final IOException ex) {
-                    throw new UncheckedIOException(ex);
+                for (final HttpChunk chunk : chunks) {
+                    isLast |= chunk.isLast();
+                    try (chunk) {
+                        bulkRequest.add(
+                            chunk.content(),
+                            defaultIndex,
+                            defaultRouting,
+                            defaultFetchSourceContext,
+                            defaultPipeline,
+                            defaultRequireAlias,
+                            allowExplicitIndex,
+                            request.getMediaType()
+                        );
+                    } catch (final IOException ex) {
+                        throw new UncheckedIOException(ex);
+                    }
                 }
-                return Tuple.tuple(chunk.isLast(), bulkRequest);
+                return Tuple.tuple(isLast, bulkRequest);
             }).flatMap(tuple -> {
                 final CompletableFuture<BulkResponse> f = new CompletableFuture<>();
@@ -222,4 +232,12 @@ public boolean supportsStreaming() {
     public boolean allowsUnsafeBuffers() {
         return true;
     }
+
+    private Flux<List<HttpChunk>> createBufferedFlux(final TimeValue batch_interval, final int batch_size, StreamingRestChannel channel) {
+        if (batch_interval != null) {
+            return Flux.from(channel).bufferTimeout(batch_size, Duration.ofMillis(batch_interval.millis()));
+        } else {
+            return Flux.from(channel).buffer(batch_size);
+        }
+    }
 }
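
createBufferedFlux() is the heart of the new batching behavior: with only
batch_size set, Reactor's buffer(n) flushes after exactly n chunks, while with
batch_interval set, bufferTimeout(n, d) also flushes a possibly partial batch
once the interval elapses. A self-contained sketch of the difference (illustrative
only; the class name is hypothetical):

    import java.time.Duration;
    import reactor.core.publisher.Flux;

    public class BufferingSketch {
        public static void main(String[] args) {
            // Count-based, as with batch_size only: flushes [1, 2, 3] then [4, 5]
            Flux.range(1, 5)
                .delayElements(Duration.ofMillis(300))
                .buffer(3)
                .doOnNext(System.out::println)
                .blockLast();

            // Count-or-time, as with batch_interval: a 500ms window flushes partial
            // batches early, e.g. [1], [2, 3], [4, 5] depending on timing
            Flux.range(1, 5)
                .delayElements(Duration.ofMillis(300))
                .bufferTimeout(3, Duration.ofMillis(500))
                .doOnNext(System.out::println)
                .blockLast();
        }
    }
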