Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Aug 28, 2024
1 parent 0f54282 commit e6487d9
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import java.util.function.Consumer;

import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
Expand All @@ -24,12 +23,10 @@
class ReactorNetty4StreamingRequestConsumer<T extends HttpContent> implements Consumer<T>, Publisher<HttpContent> {
private final ReactorNetty4StreamingResponseProducer sender;
private final StreamingHttpChannel httpChannel;
private final ByteBufAllocator alloc;

ReactorNetty4StreamingRequestConsumer(HttpServerRequest request, HttpServerResponse response) {
this.sender = new ReactorNetty4StreamingResponseProducer();
this.httpChannel = new ReactorNetty4StreamingHttpChannel(request, response, sender);
this.alloc = response.alloc();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.bytes.CompositeBytesReference;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.http.HttpChunk;

import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/**
* Wraps the instance of the {@link XContentBuilder} into {@link HttpChunk}
*/
Expand Down Expand Up @@ -48,13 +45,7 @@ private XContentHttpChunk(@Nullable final XContentBuilder builder) {
content = BytesArray.EMPTY;
} else {
// Always finalize the output chunk with '\r\n' sequence
try (final OutputStream out = builder.getOutputStream()) {
builder.close();
out.write(CHUNK_SEPARATOR);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
content = BytesReference.bytes(builder);
content = CompositeBytesReference.of(BytesReference.bytes(builder), new BytesArray(CHUNK_SEPARATOR));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final Boolean defaultRequireAlias = request.paramAsBoolean(DocWriteRequest.REQUIRE_ALIAS, null);
final TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
final String refresh = request.param("refresh");
final TimeValue batch_interval = request.paramAsTime("batch_interval", null);
final int batch_size = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */
final TimeValue batchInterval = request.paramAsTime("batch_interval", null);
final int batchSize = request.paramAsInt("batch_size", 1); /* by default, batch size of 1 */

if (batch_interval != null && batch_interval.duration() <= 0) {
throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batch_interval.millis() + "ms].");
if (batchInterval != null && batchInterval.duration() <= 0) {
throw new IllegalArgumentException("The batch_interval value should be non-negative [" + batchInterval.millis() + "ms].");
}

if (batch_size <= 0) {
throw new IllegalArgumentException("The batch_size value should be non-negative [" + batch_size + "].");
if (batchSize <= 0) {
throw new IllegalArgumentException("The batch_size value should be non-negative [" + batchSize + "].");
}

final StreamingRestChannelConsumer consumer = (channel) -> {
Expand All @@ -127,7 +127,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

// TODOs:
// - eliminate serialization inefficiencies
createBufferedFlux(batch_interval, batch_size, channel).zipWith(Flux.fromStream(Stream.generate(() -> {
createBufferedFlux(batchInterval, batchSize, channel).zipWith(Flux.fromStream(Stream.generate(() -> {
BulkRequest bulkRequest = Requests.bulkRequest();
bulkRequest.waitForActiveShards(prepareBulkRequest.waitForActiveShards());
bulkRequest.timeout(prepareBulkRequest.timeout());
Expand Down

0 comments on commit e6487d9

Please sign in to comment.