From 8d17c8d87e67db8924ed66ab0128f0bdc6aff274 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 28 Aug 2024 11:23:56 -0400 Subject: [PATCH] [Streaming Indexing] Introduce bulk HTTP API streaming flavor (#15381) * [Streaming Indexing] Introduce bulk HTTP API streaming flavor Signed-off-by: Andriy Redko * Address code review comments Signed-off-by: Andriy Redko * Add more test cases Signed-off-by: Andriy Redko * Add more test cases Signed-off-by: Andriy Redko --------- Signed-off-by: Andriy Redko --- CHANGELOG.md | 1 + buildSrc/version.properties | 2 +- .../rest/licenses/httpclient5-5.2.3.jar.sha1 | 1 - .../rest/licenses/httpclient5-5.3.1.jar.sha1 | 1 + .../org/opensearch/client/RestClient.java | 50 +++++- .../licenses/httpclient5-5.2.3.jar.sha1 | 1 - .../licenses/httpclient5-5.3.1.jar.sha1 | 1 + .../rest/ReactorNetty4StreamingIT.java | 163 +++++++++++++++++- .../ReactorNetty4StreamingHttpChannel.java | 1 - ...ReactorNetty4StreamingRequestConsumer.java | 2 +- ...tty4HttpServerTransportStreamingTests.java | 2 +- .../xcontent/support/XContentHttpChunk.java | 5 +- .../document/RestBulkStreamingAction.java | 71 +++++--- 13 files changed, 271 insertions(+), 30 deletions(-) delete mode 100644 client/rest/licenses/httpclient5-5.2.3.jar.sha1 create mode 100644 client/rest/licenses/httpclient5-5.3.1.jar.sha1 delete mode 100644 client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 create mode 100644 client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 0cd030290d4de..6b95e9ec57733 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325)) - Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895)) - Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336)) +- [Streaming Indexing] Introduce bulk HTTP API streaming flavor ([#15381](https://github.com/opensearch-project/OpenSearch/pull/15381)) - Add support for centralize snapshot creation with pinned timestamp ([#15124](https://github.com/opensearch-project/OpenSearch/pull/15124)) - Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index ccec8e2891a65..98f474a7f0b90 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -37,7 +37,7 @@ reactor_netty = 1.1.22 reactor = 3.5.20 # client dependencies -httpclient5 = 5.2.3 +httpclient5 = 5.3.1 httpcore5 = 5.2.5 httpclient = 4.5.14 httpcore = 4.4.16 diff --git a/client/rest/licenses/httpclient5-5.2.3.jar.sha1 b/client/rest/licenses/httpclient5-5.2.3.jar.sha1 deleted file mode 100644 index 43e233e72001a..0000000000000 --- a/client/rest/licenses/httpclient5-5.2.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5d753a99d299756998a08c488f2efdf9cf26198e \ No newline at end of file diff --git a/client/rest/licenses/httpclient5-5.3.1.jar.sha1 b/client/rest/licenses/httpclient5-5.3.1.jar.sha1 new file mode 100644 index 0000000000000..c8f32c1ec23a1 --- /dev/null +++ b/client/rest/licenses/httpclient5-5.3.1.jar.sha1 @@ -0,0 +1 @@ +56b53c8f4bcdaada801d311cf2ff8a24d6d96883 \ No newline at end of file diff --git a/client/rest/src/main/java/org/opensearch/client/RestClient.java b/client/rest/src/main/java/org/opensearch/client/RestClient.java index 5c87e3fda5701..ab112ca5219e7 100644 --- a/client/rest/src/main/java/org/opensearch/client/RestClient.java +++ b/client/rest/src/main/java/org/opensearch/client/RestClient.java @@ -114,6 +114,7 @@ import java.util.zip.GZIPOutputStream; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; @@ -416,7 +417,12 @@ private Publisher>> streamRequest( try { final ResponseOrResponseException responseOrResponseException = convertResponse(request, node, message); if (responseOrResponseException.responseException == null) { - return Mono.just(message); + return Mono.just( + new Message<>( + message.getHead(), + Flux.from(message.getBody()).flatMapSequential(b -> Flux.fromIterable(frame(b))) + ) + ); } else { if (nodeTuple.nodes.hasNext()) { return Mono.from(streamRequest(nodeTuple, request)); @@ -431,6 +437,48 @@ private Publisher>> streamRequest( }); } + /** + * Frame the {@link ByteBuffer} into individual chunks that are separated by '\r\n' sequence. + * @param b {@link ByteBuffer} to split + * @return individual chunks + */ + private static Collection frame(ByteBuffer b) { + final Collection buffers = new ArrayList<>(); + + int position = b.position(); + while (b.hasRemaining()) { + // Skip the chunk separator when it comes right at the beginning + if (b.get() == '\r' && b.hasRemaining() && b.position() > 1) { + if (b.get() == '\n') { + final byte[] chunk = new byte[b.position() - position]; + + b.position(position); + b.get(chunk); + + // Do not copy the '\r\n' sequence + buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length - 2)); + position = b.position(); + } + } + } + + if (buffers.isEmpty()) { + return Collections.singleton(b); + } + + // Copy last chunk + if (position != b.position()) { + final byte[] chunk = new byte[b.position() - position]; + + b.position(position); + b.get(chunk); + + buffers.add(ByteBuffer.wrap(chunk, 0, chunk.length)); + } + + return buffers; + } + private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse) throws IOException { RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); diff --git a/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 b/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 deleted file mode 100644 index 43e233e72001a..0000000000000 --- a/client/sniffer/licenses/httpclient5-5.2.3.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -5d753a99d299756998a08c488f2efdf9cf26198e \ No newline at end of file diff --git a/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 b/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 new file mode 100644 index 0000000000000..c8f32c1ec23a1 --- /dev/null +++ b/client/sniffer/licenses/httpclient5-5.3.1.jar.sha1 @@ -0,0 +1 @@ +56b53c8f4bcdaada801d311cf2ff8a24d6d96883 \ No newline at end of file diff --git a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java index c564e289e3f88..6f3895fffa437 100644 --- a/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java +++ b/plugins/transport-reactor-netty4/src/javaRestTest/java/org/opensearch/rest/ReactorNetty4StreamingIT.java @@ -44,7 +44,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testStreamingRequest() throws IOException { + public void testStreamingRequestNoBatching() throws IOException { final VirtualTimeScheduler scheduler = VirtualTimeScheduler.create(true); final Stream stream = IntStream.range(1, 6) @@ -85,6 +85,167 @@ public void testStreamingRequest() throws IOException { assertThat(count, equalTo(5)); } + public void testStreamingRequestOneBatchBySize() throws IOException, InterruptedException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_size", "5"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"1\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"2\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"3\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"4\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"5\"") + ) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + + public void testStreamingRequestManyBatchesBySize() throws IOException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofMillis(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_size", "3"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"1\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"2\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"3\"") + ) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"4\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"5\"") + ) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + + public void testStreamingRequestManyBatchesByInterval() throws IOException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofMillis(500); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_interval", "5s"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + // We don't check for a other documents here since those may appear in any of the chunks (it is very + // difficult to get the timing right). But at the end, the total number of the documents is being checked. + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches( + s -> s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"1\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"2\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"3\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"4\"") + && s.contains("\"result\":\"created\"") + && s.contains("\"_id\":\"5\"") + ) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + + public void testStreamingRequestManyBatchesByIntervalAndSize() throws IOException { + final Stream stream = IntStream.range(1, 6) + .mapToObj(id -> "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"" + id + "\" } }\n" + "{ \"name\": \"josh\" }\n"); + + final Duration delay = Duration.ofSeconds(1); + final StreamingRequest streamingRequest = new StreamingRequest<>( + "POST", + "/_bulk/stream", + Flux.fromStream(stream).delayElements(delay).map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + ); + streamingRequest.addParameter("refresh", "true"); + streamingRequest.addParameter("batch_interval", "3s"); + streamingRequest.addParameter("batch_size", "5"); + + final StreamingResponse streamingResponse = client().streamRequest(streamingRequest); + + // We don't check for a other documents here since those may appear in any of the chunks (it is very + // difficult to get the timing right). But at the end, the total number of the documents is being checked. + StepVerifier.create(Flux.from(streamingResponse.getBody()).map(b -> new String(b.array(), StandardCharsets.UTF_8))) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"1\"")) + .expectNextMatches(s -> s.contains("\"result\":\"created\"") && s.contains("\"_id\":\"5\"")) + .expectComplete() + .verify(); + + assertThat(streamingResponse.getStatusLine().getStatusCode(), equalTo(200)); + assertThat(streamingResponse.getWarnings(), empty()); + + final Request request = new Request("GET", "/test-streaming/_count"); + final Response response = client().performRequest(request); + final ObjectPath objectPath = ObjectPath.createFromResponse(response); + final Integer count = objectPath.evaluate("count"); + assertThat(count, equalTo(5)); + } + public void testStreamingBadRequest() throws IOException { final Stream stream = Stream.of( "{ \"index\": { \"_index\": \"test-streaming\", \"_id\": \"1\" } }\n" + "{ \"name\": \"josh\" }\n" diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java index 1aa03aa9967e2..12ed847c0c0de 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -103,7 +103,6 @@ public void receiveChunk(HttpChunk message) { } } catch (final Exception ex) { producer.error(ex); - } finally { message.close(); } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java index 8ed6710c8a1e3..282a82dc39fda 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -44,7 +44,7 @@ public void subscribe(Subscriber s) { } HttpChunk createChunk(HttpContent chunk, boolean last) { - return new ReactorNetty4HttpChunk(chunk.copy().content(), last); + return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last); } StreamingHttpChannel httpChannel() { diff --git a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java index a7bf71e58e9b6..df0e4027cc474 100644 --- a/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java +++ b/plugins/transport-reactor-netty4/src/test/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpServerTransportStreamingTests.java @@ -191,7 +191,7 @@ public void dispatchBadRequest(final RestChannel channel, final ThreadContext th } catch (final IOException ex) { throw new UncheckedIOException(ex); } - }).collect(Collectors.joining("")))); + }).collect(Collectors.joining("\r\n", "", "\r\n")))); } finally { response.release(); } diff --git a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java index 15b63a0ac2030..a7f1d30cd05dd 100644 --- a/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java +++ b/server/src/main/java/org/opensearch/common/xcontent/support/XContentHttpChunk.java @@ -12,6 +12,7 @@ import org.opensearch.common.lease.Releasable; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.bytes.CompositeBytesReference; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.http.HttpChunk; @@ -19,6 +20,7 @@ * Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk} */ public final class XContentHttpChunk implements HttpChunk { + private static final byte[] CHUNK_SEPARATOR = new byte[] { '\r', '\n' }; private final BytesReference content; /** @@ -42,7 +44,8 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) { if (builder == null /* no content */) { content = BytesArray.EMPTY; } else { - content = BytesReference.bytes(builder); + // Always finalize the output chunk with '\r\n' sequence + content = CompositeBytesReference.of(BytesReference.bytes(builder), new BytesArray(CHUNK_SEPARATOR)); } } diff --git a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java index a38244fe9ff20..2e0d1b8ead814 100644 --- a/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java +++ b/server/src/main/java/org/opensearch/rest/action/document/RestBulkStreamingAction.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -95,6 +96,17 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null); final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); final String refresh = request.param("refresh"); + final TimeValue batchInterval = request.paramAsTime("batch_interval", null); + final int batchSize = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */ + final boolean hasBatchSize = request.hasParam("batch_size"); /* is batch_size explicitly specified or default is used */ + + if (batchInterval != null && batchInterval.duration() <= 0) { + throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batchInterval.millis() + "ms]."); + } + + if (batchSize <= 0) { + throw new IllegalArgumentException("The batch_size value should be non-negative [" + batchSize + "]."); + } final StreamingRestChannelConsumer consumer = (channel) -> { final MediaType mediaType = request.getMediaType(); @@ -114,39 +126,38 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC // Set the content type and the status code before sending the response stream over channel.prepareResponse(RestStatus.OK, Map.of("Content-Type", List.of(mediaType.mediaTypeWithoutParameters()))); - // This is initial implementation at the moment which transforms each single request stream chunk into - // individual bulk request and streams each response back. Another source of inefficiency comes from converting - // bulk response from raw (json/yaml/...) to model and back to raw (json/yaml/...). - // TODOs: - // - add batching (by interval and/or count) // - eliminate serialization inefficiencies - Flux.from(channel).zipWith(Flux.fromStream(Stream.generate(() -> { + createBufferedFlux(batchInterval, batchSize, hasBatchSize, channel).zipWith(Flux.fromStream(Stream.generate(() -> { BulkRequest bulkRequest = Requests.bulkRequest(); bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards()); bulkRequest.timeout(prepareBulkRequest.timeout()); bulkRequest.setRefreshPolicy(prepareBulkRequest.getRefreshPolicy()); return bulkRequest; }))).map(t -> { - final HttpChunk chunk = t.getT1(); + boolean isLast = false; + final List chunks = t.getT1(); final BulkRequest bulkRequest = t.getT2(); - try (chunk) { - bulkRequest.add( - chunk.content(), - defaultIndex, - defaultRouting, - defaultFetchSourceContext, - defaultPipeline, - defaultRequireAlias, - allowExplicitIndex, - request.getMediaType() - ); - } catch (final IOException ex) { - throw new UncheckedIOException(ex); + for (final HttpChunk chunk : chunks) { + isLast |= chunk.isLast(); + try (chunk) { + bulkRequest.add( + chunk.content(), + defaultIndex, + defaultRouting, + defaultFetchSourceContext, + defaultPipeline, + defaultRequireAlias, + allowExplicitIndex, + request.getMediaType() + ); + } catch (final IOException ex) { + throw new UncheckedIOException(ex); + } } - return Tuple.tuple(chunk.isLast(), bulkRequest); + return Tuple.tuple(isLast, bulkRequest); }).flatMap(tuple -> { final CompletableFuture f = new CompletableFuture<>(); @@ -222,4 +233,22 @@ public boolean supportsStreaming() { public boolean allowsUnsafeBuffers() { return true; } + + private Flux> createBufferedFlux( + final TimeValue batchInterval, + final int batchSize, + final boolean hasBatchSize, + StreamingRestChannel channel + ) { + if (batchInterval != null) { + // If non-default batch size is specified, buffer by interval and batch + if (hasBatchSize) { + return Flux.from(channel).bufferTimeout(batchSize, Duration.ofMillis(batchInterval.millis())); + } else { + return Flux.from(channel).buffer(Duration.ofMillis(batchInterval.millis())); + } + } else { + return Flux.from(channel).buffer(batchSize); + } + } }