Skip to content

Commit

Permalink
Add test for #1286 (#3736)
Browse files Browse the repository at this point in the history
* Skip writing if chunked handler is present. Fixes #1286
  • Loading branch information
jameskleeh authored Jul 17, 2020
1 parent b8342e3 commit d07feeb
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
Expand Down Expand Up @@ -2992,7 +2993,8 @@ protected void writeAndClose(Channel channel, ChannelPool channelPool, FlowableE
private void processRequestWrite(Channel channel, ChannelPool channelPool, FlowableEmitter<?> emitter, ChannelPipeline pipeline) {
ChannelFuture channelFuture;
if (encoder != null && encoder.isChunked()) {
pipeline.replace(ChannelPipelineCustomizer.HANDLER_HTTP_STREAM, ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK, new ChunkedWriteHandler());
channel.attr(AttributeKey.valueOf(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK)).set(true);
pipeline.addAfter(ChannelPipelineCustomizer.HANDLER_HTTP_STREAM, ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK, new ChunkedWriteHandler());
channel.write(nettyRequest);
channelFuture = channel.writeAndFlush(encoder);
} else {
Expand Down Expand Up @@ -3025,6 +3027,7 @@ private void closeChannelIfNecessary(
if (encoder != null) {
encoder.cleanFiles();
}
channel.attr(AttributeKey.valueOf(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK)).set(null);
if (closeChannelAfterWrite) {
closeChannelAsync(channel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,16 @@
package io.micronaut.http.client

import io.micronaut.core.io.buffer.ByteBufferFactory
import io.micronaut.core.io.buffer.ReferenceCounted
import io.micronaut.http.HttpResponse
import io.micronaut.http.HttpStatus
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Post
import io.micronaut.http.client.annotation.Client
import io.micronaut.http.client.multipart.MultipartBody
import io.micronaut.http.codec.CodecException
import io.micronaut.http.multipart.PartData
import io.micronaut.http.multipart.StreamingFileUpload
import io.micronaut.test.annotation.MicronautTest
import io.reactivex.Flowable
import io.reactivex.Single
Expand All @@ -35,12 +41,15 @@ import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import spock.lang.AutoCleanup
import spock.lang.Issue
import spock.lang.Shared
import spock.lang.Specification
import spock.util.concurrent.PollingConditions

import javax.inject.Inject
import javax.print.attribute.standard.Media
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

/**
* @author graemerocher
Expand Down Expand Up @@ -170,6 +179,36 @@ class DataStreamSpec extends Specification {
data == [188309,188310] as byte[]
}

@Issue("https://github.com/micronaut-projects/micronaut-core/issues/1286")
void "test returning a stream and sending a multipart request"() {
    // A single 4 KB binary part named "data", matching the controller's
    // StreamingFileUpload parameter name.
    def multipartPayload = MultipartBody.builder()
            .addPart(
                    "data",
                    "randomFileName.dat",
                    MediaType.APPLICATION_OCTET_STREAM_TYPE,
                    new byte[4096]
            )

    def uploadRequest = HttpRequest.POST("/datastream/upload", multipartPayload)
            .contentType(MediaType.MULTIPART_FORM_DATA_TYPE)
            .accept(MediaType.TEXT_PLAIN_TYPE)

    HttpResponse receivedResponse
    String receivedBody
    // Stream the exchange and capture the first response the client emits;
    // the assertions below poll until the subscription has delivered it.
    client.exchangeStream(uploadRequest)
            .doOnNext({ resp ->
                receivedResponse = resp
                receivedBody = new String(resp.body().toByteArray())
            })
            .subscribe()

    expect:
    // Server replies with the number of bytes it consumed from the upload.
    new PollingConditions(timeout: 3).eventually {
        assert receivedResponse.status() == HttpStatus.OK
        assert receivedBody == "Read 4096 bytes"
    }
}

// Simple data holder used by the JSON-streaming tests in this spec.
static class Book {
    // Bound from the "title" property of the JSON payload.
    String title
}
Expand All @@ -192,5 +231,38 @@ class DataStreamSpec extends Specification {
byte[] data() {
[188309,188310] as byte[]
}

@Post(uri = "/upload", consumes = MediaType.MULTIPART_FORM_DATA, produces = MediaType.TEXT_PLAIN)
Single<HttpResponse<String>> test(StreamingFileUpload data) {
    // Running total of bytes received across all chunks of the upload.
    AtomicInteger totalBytes = new AtomicInteger()

    Single.<HttpResponse<String>>create { emitter ->
        // Consume the upload one chunk at a time: each chunk is counted
        // before the next one is requested (request(1) backpressure).
        data.subscribe(new Subscriber<PartData>() {
            private Subscription subscription

            @Override
            void onSubscribe(Subscription s) {
                subscription = s
                s.request(1)
            }

            @Override
            void onNext(PartData chunk) {
                totalBytes.addAndGet(chunk.bytes.length)
                subscription.request(1)
            }

            @Override
            void onError(Throwable failure) {
                emitter.onError(failure)
            }

            @Override
            void onComplete() {
                // Stream finished: report how many bytes were consumed.
                emitter.onSuccess(HttpResponse.ok("Read ${totalBytes.get()} bytes".toString()))
            }
        })
    }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.HttpStatus;
import io.micronaut.http.netty.channel.ChannelPipelineCustomizer;
import io.micronaut.http.netty.reactive.CancelledSubscriber;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand All @@ -27,6 +28,7 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -187,4 +189,13 @@ public void cancel() {
super.channelRead(ctx, msg);
}
}

@Override
public void write(final ChannelHandlerContext ctx, Object msg, final ChannelPromise promise) throws Exception {
    // When the client has installed a ChunkedWriteHandler for this channel
    // (signalled via the HANDLER_HTTP_CHUNK channel attribute, set in
    // processRequestWrite and cleared in closeChannelIfNecessary), skip this
    // handler's own write logic and pass straight through to the pipeline so
    // the chunked body is not written twice. See issue #1286.
    // Boolean.TRUE.equals(..) is null-safe and compares by value, instead of
    // relying on identity against the cached boxed Boolean instance.
    final Object chunkHandlerPresent =
            ctx.channel().attr(AttributeKey.valueOf(ChannelPipelineCustomizer.HANDLER_HTTP_CHUNK)).get();
    if (Boolean.TRUE.equals(chunkHandlerPresent)) {
        ctx.write(msg, promise);
    } else {
        super.write(ctx, msg, promise);
    }
}
}

0 comments on commit d07feeb

Please sign in to comment.