Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move body writing logic into http-server #11342

Open
wants to merge 5 commits into
base: 4.8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
CR
  • Loading branch information
yawkat committed Dec 3, 2024
commit 8849864e15a42dcdf99eb3d0a3759fdc181f8b05
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -37,13 +36,16 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
* {@link ByteBodyFactory} implementation with netty-optimized bodies.
*
* @since 4.8.0
* @author Jonas Konrad
*/
@Internal
public final class NettyByteBodyFactory extends ByteBodyFactory {
yawkat marked this conversation as resolved.
Show resolved Hide resolved
private final EventLoop eventLoop;

public NettyByteBodyFactory(@NonNull Channel channel) {
super(new NettyByteBufferFactory(channel.alloc()));
this.eventLoop = channel.eventLoop();
}

private ByteBufAllocator alloc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,23 @@ public boolean isBlocking() {
}

@Override
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpRequest<?> request, MutableHttpResponse<Writable> outgoingResponse, Argument<Writable> type, MediaType mediaType, Writable object) throws CodecException {
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory,
HttpRequest<?> request,
MutableHttpResponse<Writable> outgoingResponse,
Argument<Writable> type,
MediaType mediaType,
Writable object) throws CodecException {
outgoingResponse.getHeaders().contentTypeIfMissing(mediaType);
return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, writePiece(bodyFactory, request, outgoingResponse, type, mediaType, object));
}

@Override
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest<?> request, @NonNull HttpResponse<?> response, @NonNull Argument<Writable> type, @NonNull MediaType mediaType, Writable object) {
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory,
@NonNull HttpRequest<?> request,
@NonNull HttpResponse<?> response,
@NonNull Argument<Writable> type,
@NonNull MediaType mediaType,
Writable object) {
ByteBufOutputStream outputStream = new ByteBufOutputStream(ByteBufAllocator.DEFAULT.buffer());
try {
object.writeTo(outputStream, MessageBodyWriter.getCharset(mediaType, response.getHeaders()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ final class NettyResponseLifecycle extends ResponseLifecycle {
private final NettyHttpRequest<?> request;

public NettyResponseLifecycle(RoutingInBoundHandler routingInBoundHandler, NettyHttpRequest<?> request) {
super(routingInBoundHandler.routeExecutor, routingInBoundHandler.messageBodyHandlerRegistry, routingInBoundHandler.conversionService, new NettyByteBodyFactory(request.getChannelHandlerContext().channel()));
super(routingInBoundHandler.routeExecutor,
routingInBoundHandler.messageBodyHandlerRegistry,
routingInBoundHandler.conversionService,
new NettyByteBodyFactory(request.getChannelHandlerContext().channel()));
this.routingInBoundHandler = routingInBoundHandler;
this.request = request;
}
Expand Down Expand Up @@ -109,7 +112,7 @@ private static class NettyConcatenatingSubscriber extends ConcatenatingSubscribe
sharedBuffer = new StreamingNettyByteBody.SharedBuffer(eventLoop, BodySizeLimits.UNLIMITED, this);
}

public static CloseableByteBody concatenate(EventLoop eventLoop, Publisher<ByteBody> publisher) {
static CloseableByteBody concatenate(EventLoop eventLoop, Publisher<ByteBody> publisher) {
NettyConcatenatingSubscriber subscriber = new NettyConcatenatingSubscriber(eventLoop);
publisher.subscribe(subscriber);
return new StreamingNettyByteBody(subscriber.sharedBuffer);
Expand Down Expand Up @@ -165,7 +168,7 @@ private static final class JsonNettyConcatenatingSubscriber extends NettyConcate
super(eventLoop);
}

public static CloseableByteBody concatenateJson(EventLoop eventLoop, Publisher<ByteBody> publisher) {
static CloseableByteBody concatenateJson(EventLoop eventLoop, Publisher<ByteBody> publisher) {
JsonNettyConcatenatingSubscriber subscriber = new JsonNettyConcatenatingSubscriber(eventLoop);
publisher.subscribe(subscriber);
return new StreamingNettyByteBody(subscriber.sharedBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public abstract class ResponseLifecycle {
private final ConversionService conversionService;
private final ByteBodyFactory byteBodyFactory;

public ResponseLifecycle(RouteExecutor routeExecutor, MessageBodyHandlerRegistry messageBodyHandlerRegistry, ConversionService conversionService, ByteBodyFactory byteBodyFactory) {
public ResponseLifecycle(RouteExecutor routeExecutor,
MessageBodyHandlerRegistry messageBodyHandlerRegistry,
ConversionService conversionService,
ByteBodyFactory byteBodyFactory) {
this.routeExecutor = routeExecutor;
this.messageBodyHandlerRegistry = messageBodyHandlerRegistry;
this.conversionService = conversionService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,24 @@ public FileBodyWriter(SystemFileBodyWriter systemFileBodyWriter) {
}

@Override
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpRequest<?> request, MutableHttpResponse<File> outgoingResponse, Argument<File> type, MediaType mediaType, File object) throws CodecException {
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory,
HttpRequest<?> request,
MutableHttpResponse<File> outgoingResponse,
Argument<File> type,
MediaType mediaType,
File object) throws CodecException {
SystemFile systemFile = new SystemFile(object);
MutableHttpResponse<SystemFile> newResponse = outgoingResponse.body(systemFile);
return systemFileBodyWriter.write(bodyFactory, request, newResponse, systemFile);
}

@Override
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest<?> request, @NonNull HttpResponse<?> response, @NonNull Argument<File> type, @NonNull MediaType mediaType, File object) {
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory,
@NonNull HttpRequest<?> request,
@NonNull HttpResponse<?> response,
@NonNull Argument<File> type,
@NonNull MediaType mediaType,
File object) {
return systemFileBodyWriter.writePiece(bodyFactory, new SystemFile(object));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,23 @@ public final class InputStreamBodyWriter extends AbstractFileBodyWriter implemen
}

@Override
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpRequest<?> request, MutableHttpResponse<InputStream> outgoingResponse, Argument<InputStream> type, MediaType mediaType, InputStream object) throws CodecException {
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory,
HttpRequest<?> request,
MutableHttpResponse<InputStream> outgoingResponse,
Argument<InputStream> type,
MediaType mediaType,
InputStream object) throws CodecException {
outgoingResponse.getHeaders().contentTypeIfMissing(mediaType);
return ByteBodyHttpResponseWrapper.wrap(outgoingResponse, InputStreamByteBody.create(object, OptionalLong.empty(), executorService, bodyFactory));
}

@Override
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest<?> request, @NonNull HttpResponse<?> response, @NonNull Argument<InputStream> type, @NonNull MediaType mediaType, InputStream object) {
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory,
@NonNull HttpRequest<?> request,
@NonNull HttpResponse<?> response,
@NonNull Argument<InputStream> type,
@NonNull MediaType mediaType,
InputStream object) {
return InputStreamByteBody.create(object, OptionalLong.empty(), executorService, bodyFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ public final class StreamFileBodyWriter extends AbstractFileBodyWriter implement
}

@Override
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpRequest<?> request, MutableHttpResponse<StreamedFile> outgoingResponse, Argument<StreamedFile> type, MediaType mediaType, StreamedFile object) throws CodecException {
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory,
HttpRequest<?> request,
MutableHttpResponse<StreamedFile> outgoingResponse,
Argument<StreamedFile> type,
MediaType mediaType,
StreamedFile object) throws CodecException {
if (handleIfModifiedAndHeaders(request, outgoingResponse, object, outgoingResponse)) {
return notModified(bodyFactory, outgoingResponse);
} else {
Expand All @@ -69,7 +74,12 @@ public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpR
}

@Override
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest<?> request, @NonNull HttpResponse<?> response, @NonNull Argument<StreamedFile> type, @NonNull MediaType mediaType, StreamedFile object) {
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory,
@NonNull HttpRequest<?> request,
@NonNull HttpResponse<?> response,
@NonNull Argument<StreamedFile> type,
@NonNull MediaType mediaType,
StreamedFile object) {
long length = object.getLength();
InputStream inputStream = object.getInputStream();
return InputStreamByteBody.create(inputStream, length > -1 ? OptionalLong.of(length) : OptionalLong.empty(), ioExecutor, bodyFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ public void writeTo(Argument<SystemFile> type, MediaType mediaType, SystemFile f
}

@Override
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpRequest<?> request, @NonNull MutableHttpResponse<SystemFile> httpResponse, @NonNull Argument<SystemFile> type, @NonNull MediaType mediaType, SystemFile object) throws CodecException {
public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory,
HttpRequest<?> request,
@NonNull MutableHttpResponse<SystemFile> httpResponse,
@NonNull Argument<SystemFile> type,
@NonNull MediaType mediaType,
SystemFile object) throws CodecException {
return write(bodyFactory, request, httpResponse, object);
}

Expand Down Expand Up @@ -133,7 +138,12 @@ public ByteBodyHttpResponse<?> write(@NonNull ByteBodyFactory bodyFactory, HttpR
}

@Override
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory, @NonNull HttpRequest<?> request, @NonNull HttpResponse<?> response, @NonNull Argument<SystemFile> type, @NonNull MediaType mediaType, SystemFile object) {
public CloseableByteBody writePiece(@NonNull ByteBodyFactory bodyFactory,
@NonNull HttpRequest<?> request,
@NonNull HttpResponse<?> response,
@NonNull Argument<SystemFile> type,
@NonNull MediaType mediaType,
SystemFile object) {
return writePiece(bodyFactory, object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,27 @@ private ByteBufferBodyAdapter(Publisher<ByteBuffer> source, @Nullable Runnable o
super(source, onDiscard);
}

/**
* Create a new body that contains the bytes of the given publisher.
*
* @param source The byte publisher
* @return A body with those bytes
*/
@NonNull
static ReactiveByteBufferByteBody adapt(Publisher<ByteBuffer> source) {
static ReactiveByteBufferByteBody adapt(@NonNull Publisher<ByteBuffer> source) {
return adapt(source, null, null);
}

/**
* Create a new body that contains the bytes of the given publisher.
*
* @param publisher The byte publisher
* @param headersForLength Optional headers for reading the {@code content-length} header
* @param onDiscard Optional runnable to call if the body is discarded ({@link #allowDiscard()})
* @return A body with those bytes
*/
@NonNull
static ReactiveByteBufferByteBody adapt(Publisher<ByteBuffer> publisher, @Nullable HttpHeaders headersForLength, @Nullable Runnable onDiscard) {
static ReactiveByteBufferByteBody adapt(@NonNull Publisher<ByteBuffer> publisher, @Nullable HttpHeaders headersForLength, @Nullable Runnable onDiscard) {
ByteBufferBodyAdapter adapter = new ByteBufferBodyAdapter(publisher, onDiscard);
adapter.sharedBuffer = new ReactiveByteBufferByteBody.SharedBuffer(BodySizeLimits.UNLIMITED, adapter);
if (headersForLength != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ protected final void onForward(long n) {

@Override
public final void onNext(ByteBody body) {
long emitted = emitLeadingSeparator(first);
onForward(emitLeadingSeparator(first));
first = false;

BufferConsumer.Upstream component = forward(body);
Expand Down
Loading