Skip to content

Commit

Permalink
Support MessageBodyWriter/Reader in the HTTP client (#9201)
Browse files Browse the repository at this point in the history
This implements the reader/writer API for the client. There's also some server changes that I missed.

Some changes from MediaTypeCodec to MessageBodyHandler are still missing, such as for form upload conversions.
  • Loading branch information
yawkat authored May 11, 2023
1 parent 3286b25 commit 37874c6
Show file tree
Hide file tree
Showing 36 changed files with 1,222 additions and 834 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.annotation.Nullable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
import io.micronaut.core.util.StringUtils;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.FilterMatcher;
import io.micronaut.http.bind.DefaultRequestBinderRegistry;
import io.micronaut.http.bind.RequestBinderRegistry;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyReader;
import io.micronaut.http.body.MessageBodyWriter;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.HttpClientConfiguration;
import io.micronaut.http.client.HttpClientRegistry;
Expand All @@ -50,6 +54,7 @@
import io.micronaut.http.codec.MediaTypeCodec;
import io.micronaut.http.codec.MediaTypeCodecRegistry;
import io.micronaut.http.filter.HttpClientFilterResolver;
import io.micronaut.http.netty.body.CustomizableNettyJsonHandler;
import io.micronaut.http.netty.channel.ChannelPipelineCustomizer;
import io.micronaut.http.netty.channel.ChannelPipelineListener;
import io.micronaut.http.netty.channel.DefaultEventLoopGroupConfiguration;
Expand Down Expand Up @@ -81,6 +86,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadFactory;
Expand Down Expand Up @@ -109,6 +115,7 @@ class DefaultNettyHttpClientRegistry implements AutoCloseable,
private final NettyClientSslBuilder nettyClientSslBuilder;
private final ThreadFactory threadFactory;
private final MediaTypeCodecRegistry codecRegistry;
private final MessageBodyHandlerRegistry handlerRegistry;
private final BeanContext beanContext;
private final HttpClientConfiguration defaultHttpClientConfiguration;
private final EventLoopGroupRegistry eventLoopGroupRegistry;
Expand All @@ -128,6 +135,7 @@ class DefaultNettyHttpClientRegistry implements AutoCloseable,
* @param nettyClientSslBuilder The client SSL builder
* @param threadFactory The thread factory
* @param codecRegistry The codec registry
* @param handlerRegistry The handler registry
* @param eventLoopGroupRegistry The event loop group registry
* @param eventLoopGroupFactory The event loop group factory
* @param beanContext The bean context
Expand All @@ -141,6 +149,7 @@ public DefaultNettyHttpClientRegistry(
NettyClientSslBuilder nettyClientSslBuilder,
ThreadFactory threadFactory,
MediaTypeCodecRegistry codecRegistry,
MessageBodyHandlerRegistry handlerRegistry,
EventLoopGroupRegistry eventLoopGroupRegistry,
EventLoopGroupFactory eventLoopGroupFactory,
BeanContext beanContext,
Expand All @@ -152,6 +161,7 @@ public DefaultNettyHttpClientRegistry(
this.nettyClientSslBuilder = nettyClientSslBuilder;
this.threadFactory = threadFactory;
this.codecRegistry = codecRegistry;
this.handlerRegistry = handlerRegistry;
this.beanContext = beanContext;
this.eventLoopGroupFactory = eventLoopGroupFactory;
this.eventLoopGroupRegistry = eventLoopGroupRegistry;
Expand Down Expand Up @@ -388,6 +398,28 @@ private DefaultHttpClient getClient(ClientKey key, BeanContext beanContext, Anno
codecs.add(createNewJsonCodec(this.beanContext, jsonFeatures));
}
client.setMediaTypeCodecRegistry(MediaTypeCodecRegistry.of(codecs));

client.setHandlerRegistry(new MessageBodyHandlerRegistry() {
final MessageBodyHandlerRegistry delegate = client.getHandlerRegistry();

@SuppressWarnings("unchecked")
private <T> T customize(T handler) {
if (handler instanceof CustomizableNettyJsonHandler cnjh) {
return (T) cnjh.customize(jsonFeatures);
}
return handler;
}

@Override
public <T> Optional<MessageBodyReader<T>> findReader(Argument<T> type, List<MediaType> mediaType) {
return delegate.findReader(type, mediaType).map(this::customize);
}

@Override
public <T> Optional<MessageBodyWriter<T>> findWriter(Argument<T> type, List<MediaType> mediaType) {
return delegate.findWriter(type, mediaType).map(this::customize);
}
});
}
return client;
});
Expand Down Expand Up @@ -417,6 +449,7 @@ private DefaultHttpClient buildClient(
threadFactory,
nettyClientSslBuilder,
codecRegistry,
handlerRegistry,
WebSocketBeanRegistry.forClient(beanContext),
beanContext.findBean(RequestBinderRegistry.class).orElseGet(() ->
new DefaultRequestBinderRegistry(conversionService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,36 @@
*/
package io.micronaut.http.client.netty;

import io.micronaut.buffer.netty.NettyByteBufferFactory;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NonNull;
import io.micronaut.core.async.subscriber.Completable;
import io.micronaut.core.convert.ConversionContext;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.convert.value.MutableConvertibleValues;
import io.micronaut.core.convert.value.MutableConvertibleValuesMap;
import io.micronaut.core.io.buffer.ByteBuffer;
import io.micronaut.core.io.buffer.ByteBufferFactory;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpHeaders;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.MediaType;
import io.micronaut.http.codec.MediaTypeCodec;
import io.micronaut.http.codec.MediaTypeCodecRegistry;
import io.micronaut.http.body.MessageBodyHandlerRegistry;
import io.micronaut.http.body.MessageBodyReader;
import io.micronaut.http.cookie.Cookie;
import io.micronaut.http.cookie.Cookies;
import io.micronaut.http.netty.NettyHttpHeaders;
import io.micronaut.http.netty.NettyHttpResponseBuilder;
import io.micronaut.http.netty.cookies.NettyCookies;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand All @@ -64,36 +63,36 @@ public class FullNettyClientHttpResponse<B> implements HttpResponse<B>, Completa
private final NettyHttpHeaders headers;
private final NettyCookies nettyCookies;
private final MutableConvertibleValues<Object> attributes;
private final FullHttpResponse nettyHttpResponse;
private final io.netty.handler.codec.http.HttpResponse nettyHttpResponse;
private final ByteBuf unpooledContent;
private final Map<Argument, Optional> convertedBodies = new HashMap<>();
private final MediaTypeCodecRegistry mediaTypeCodecRegistry;
private final ByteBufferFactory<ByteBufAllocator, ByteBuf> byteBufferFactory;
private final MessageBodyHandlerRegistry handlerRegistry;
private final B body;
private boolean complete;
private byte[] bodyBytes;
private final ConversionService conversionService;

/**
* @param fullHttpResponse The full Http response
* @param mediaTypeCodecRegistry The media type codec registry
* @param byteBufferFactory The byte buffer factory
* @param bodyType The body type
* @param convertBody Whether to auto convert the body to bodyType
* @param conversionService The conversion service
*/
FullNettyClientHttpResponse(
FullHttpResponse fullHttpResponse,
MediaTypeCodecRegistry mediaTypeCodecRegistry,
ByteBufferFactory<ByteBufAllocator, ByteBuf> byteBufferFactory,
MessageBodyHandlerRegistry handlerRegistry,
Argument<B> bodyType,
boolean convertBody,
ConversionService conversionService) {
this.conversionService = conversionService;
this.headers = new NettyHttpHeaders(fullHttpResponse.headers(), conversionService);
this.attributes = new MutableConvertibleValuesMap<>();
this.nettyHttpResponse = fullHttpResponse;
this.mediaTypeCodecRegistry = mediaTypeCodecRegistry;
this.byteBufferFactory = byteBufferFactory;
// this class doesn't really have lifecycle management (we don't make the user release()
// it), so we have to copy the data to a non-refcounted buffer.
this.unpooledContent = Unpooled.buffer(fullHttpResponse.content().readableBytes());
unpooledContent.writeBytes(fullHttpResponse.content());
this.handlerRegistry = handlerRegistry;
this.nettyCookies = new NettyCookies(fullHttpResponse.headers(), conversionService);
Class<?> rawBodyType = bodyType != null ? bodyType.getType() : null;
if (rawBodyType != null && !HttpStatus.class.isAssignableFrom(rawBodyType)) {
Expand Down Expand Up @@ -156,65 +155,23 @@ public <T> Optional<T> getBody(Class<T> type) {
return getBody(Argument.of(type));
}

/**
* @return The Netty native response object
*/
public FullHttpResponse getNativeResponse() {
return nettyHttpResponse;
}

@SuppressWarnings("unchecked")
@Override
public <T> Optional<T> getBody(Argument<T> type) {
if (type == null) {
return Optional.empty();
}

Class<T> javaType = type.getType();
if (javaType == void.class) {
if (type.getType() == void.class) {
return Optional.empty();
}

if (javaType == ByteBuffer.class) {
return Optional.of((T) byteBufferFactory.wrap(nettyHttpResponse.content()));
}

if (javaType == ByteBuf.class) {
return Optional.of((T) (nettyHttpResponse.content()));
}

if (javaType == byte[].class && bodyBytes != null) {
return Optional.of((T) (bodyBytes));
}

Optional<T> result = convertedBodies.computeIfAbsent(type, argument -> {
final boolean isOptional = argument.getType() == Optional.class;
final Argument finalArgument = isOptional ? argument.getFirstTypeVariable().orElse(argument) : argument;
Optional<T> converted;
try {
if (bodyBytes != null) {
return convertBytes(bodyBytes, finalArgument);
}
Optional<B> existing = getBody();
if (existing.isPresent()) {
converted = getBody().flatMap(b -> {

if (b instanceof ByteBuffer) {
ByteBuf bytebuf = (ByteBuf) ((ByteBuffer) b).asNativeBuffer();
return convertByteBuf(bytebuf, finalArgument);
} else {
final Optional opt = conversionService.convert(b, ConversionContext.of(finalArgument));
if (!opt.isPresent()) {
ByteBuf content = nettyHttpResponse.content();
return convertByteBuf(content, finalArgument);
}
return opt;
}
});
} else {
ByteBuf content = nettyHttpResponse.content();
converted = convertByteBuf(content, finalArgument);
}
converted = convertByteBuf(unpooledContent, finalArgument);
} catch (RuntimeException e) {
if (code() < 400) {
throw e;
Expand All @@ -234,7 +191,7 @@ public <T> Optional<T> getBody(Argument<T> type) {

);
if (LOG.isTraceEnabled() && !result.isPresent()) {
LOG.trace("Unable to convert response body to target type {}", javaType);
LOG.trace("Unable to convert response body to target type {}", type.getType());
}
return result;
}
Expand All @@ -244,10 +201,6 @@ private boolean isParseableBodyType(Class<?> rawBodyType) {
}

private <T> Optional convertByteBuf(ByteBuf content, Argument<T> type) {
if (complete) {
return Optional.empty();
}

if (content.refCnt() == 0 || content.readableBytes() == 0) {
if (LOG.isTraceEnabled()) {
LOG.trace("Full HTTP response received an empty body");
Expand All @@ -264,19 +217,12 @@ private <T> Optional convertByteBuf(ByteBuf content, Argument<T> type) {
}
Optional<MediaType> contentType = getContentType();
if (contentType.isPresent()) {
if (mediaTypeCodecRegistry != null) {
bodyBytes = ByteBufUtil.getBytes(content);
if (CharSequence.class.isAssignableFrom(type.getType())) {
Charset charset = contentType.flatMap(MediaType::getCharset).orElse(StandardCharsets.UTF_8);
return Optional.of(new String(bodyBytes, charset));
} else if (type.getType() == byte[].class) {
return Optional.of(bodyBytes);
} else {
Optional<MediaTypeCodec> foundCodec = mediaTypeCodecRegistry.findCodec(contentType.get());
if (foundCodec.isPresent()) {
MediaTypeCodec codec = foundCodec.get();
return Optional.of(codec.decode(type, bodyBytes));
}
Optional<MessageBodyReader<T>> reader = handlerRegistry.findReader(type, List.of(contentType.get()));
if (reader.isPresent()) {
MessageBodyReader<T> r = reader.get();
MediaType ct = contentType.get();
if (r.isReadable(type, ct)) {
return Optional.of(r.read(type, ct, headers, NettyByteBufferFactory.DEFAULT.wrap(content.retainedSlice())));
}
}
} else if (LOG.isTraceEnabled()) {
Expand All @@ -286,27 +232,6 @@ private <T> Optional convertByteBuf(ByteBuf content, Argument<T> type) {
return conversionService.convert(content, ConversionContext.of(type));
}

private <T> Optional convertBytes(byte[] bytes, Argument<T> type) {
Optional<MediaType> contentType = getContentType();
boolean hasContentType = contentType.isPresent();
if (mediaTypeCodecRegistry != null && hasContentType) {
if (CharSequence.class.isAssignableFrom(type.getType())) {
Charset charset = contentType.flatMap(MediaType::getCharset).orElse(StandardCharsets.UTF_8);
return Optional.of(new String(bytes, charset));
} else if (type.getType() == byte[].class) {
return Optional.of(bytes);
} else {
Optional<MediaTypeCodec> foundCodec = mediaTypeCodecRegistry.findCodec(contentType.get());
if (foundCodec.isPresent()) {
MediaTypeCodec codec = foundCodec.get();
return Optional.of(codec.decode(type, bytes));
}
}
}
// last chance, try type conversion
return conversionService.convert(bytes, ConversionContext.of(type));
}

@Override
public void onComplete() {
this.complete = true;
Expand All @@ -315,7 +240,15 @@ public void onComplete() {
@NonNull
@Override
public FullHttpResponse toFullHttpResponse() {
return this.nettyHttpResponse;
DefaultFullHttpResponse copy = new DefaultFullHttpResponse(
nettyHttpResponse.protocolVersion(),
nettyHttpResponse.status(),
unpooledContent,
nettyHttpResponse.headers(),
DefaultLastHttpContent.EMPTY_LAST_CONTENT.trailingHeaders()
);
copy.setDecoderResult(nettyHttpResponse.decoderResult());
return copy;
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class HttpGetSpec extends Specification {
String body = res.getBody(String).orElse(null)

then:
body == null
body == "success"
}

void "test that Optional.empty() should return 404"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.micronaut.http.client

import io.micronaut.core.async.annotation.SingleResult
import io.micronaut.context.ApplicationContext
import io.micronaut.context.annotation.Requires
import io.micronaut.http.HttpRequest
Expand All @@ -38,6 +37,7 @@ import spock.lang.Issue
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.Semaphore
Expand Down Expand Up @@ -209,7 +209,6 @@ class JsonStreamSpec extends Specification {
Publisher<Book> list();

@Post(uri = "/count", processes = MediaType.APPLICATION_JSON_STREAM)
@SingleResult
Publisher<LibraryStats> count(@Body Flux<Book> theBooks)
}

Expand Down
Loading

0 comments on commit 37874c6

Please sign in to comment.