Skip to content

Commit 9719bf2

Browse files
committed
Fixed handling of negative request. Added BodyPublishers::concat to the TCK tests.
1 parent c8c0768 commit 9719bf2

File tree

6 files changed

+155
-12
lines changed

6 files changed

+155
-12
lines changed

src/java.net.http/share/classes/jdk/internal/net/http/PullPublisher.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ protected void run() {
102102
return;
103103
}
104104

105-
while (demand.tryDecrement() && !cancelled) {
105+
while (demand.tryDecrement() && !cancelled && error == null) {
106106
T next;
107107
try {
108108
if (!iter.hasNext()) {
@@ -121,6 +121,11 @@ protected void run() {
121121
completed = true;
122122
pullScheduler.stop();
123123
subscriber.onComplete();
124+
} else if ((t = error) != null && !completed) {
125+
completed = true;
126+
pullScheduler.stop();
127+
subscriber.onError(t);
128+
return;
124129
}
125130
}
126131
}

src/java.net.http/share/classes/jdk/internal/net/http/RequestPublishers.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ private static final class AggregateSubscription
560560
final Demand demand = new Demand(); // from upstream
561561
final Demand demanded = new Demand(); // requested downstream
562562
final AtomicReference<Throwable> error = new AtomicReference<>();
563+
volatile Throwable illegalRequest;
563564
volatile BodyPublisher publisher; // downstream
564565
volatile Flow.Subscription subscription; // downstream
565566
volatile boolean cancelled;
@@ -574,7 +575,11 @@ public void request(long n) {
574575
if (cancelled || publisher == null && bodies.isEmpty()) {
575576
return;
576577
}
577-
demand.increase(n);
578+
try {
579+
demand.increase(n);
580+
} catch (IllegalArgumentException x) {
581+
illegalRequest = x;
582+
}
578583
scheduler.runOrSchedule();
579584
}
580585

@@ -584,6 +589,17 @@ public void cancel() {
584589
scheduler.runOrSchedule();
585590
}
586591

592+
private boolean cancelSubscription() {
593+
Flow.Subscription subscription = this.subscription;
594+
if (subscription != null) {
595+
this.subscription = null;
596+
this.publisher = null;
597+
subscription.cancel();
598+
}
599+
scheduler.stop();
600+
return subscription != null;
601+
}
602+
587603
public void run() {
588604
try {
589605
while (error.get() == null
@@ -592,13 +608,10 @@ public void run() {
592608
boolean cancelled = this.cancelled;
593609
BodyPublisher publisher = this.publisher;
594610
Flow.Subscription subscription = this.subscription;
611+
Throwable illegalRequest = this.illegalRequest;
595612
if (cancelled) {
596-
this.publisher = null;
597-
bodies.removeIf((b) -> true);
598-
if (subscription != null) {
599-
subscription.cancel();
600-
}
601-
scheduler.stop();
613+
bodies.clear();
614+
cancelSubscription();
602615
return;
603616
}
604617
if (publisher == null && !bodies.isEmpty()) {
@@ -608,6 +621,10 @@ public void run() {
608621
} else if (publisher == null) {
609622
return;
610623
}
624+
if (illegalRequest != null) {
625+
onError(illegalRequest);
626+
return;
627+
}
611628
if (subscription == null) return;
612629
if (!demand.isFulfilled()) {
613630
long n = demand.decreaseAndGet(demand.get());
@@ -629,14 +646,24 @@ public void onSubscribe(Flow.Subscription subscription) {
629646

630647
@Override
631648
public void onNext(ByteBuffer item) {
649+
// make sure to cancel the subscription if we receive
650+
// an item after the subscription was cancelled or
651+
// an error was reported.
652+
if (cancelled || error.get() != null) {
653+
cancelSubscription();
654+
return;
655+
}
632656
demanded.tryDecrement();
633657
subscriber.onNext(item);
634658
}
635659

636660
@Override
637661
public void onError(Throwable throwable) {
638662
if (error.compareAndSet(null, throwable)) {
663+
publisher = null;
664+
subscription = null;
639665
subscriber.onError(throwable);
666+
scheduler.stop();
640667
}
641668
}
642669

@@ -652,7 +679,9 @@ public void onComplete() {
652679
} else {
653680
publisher = null;
654681
subscription = null;
655-
subscriber.onComplete();
682+
if (!cancelled) {
683+
subscriber.onComplete();
684+
}
656685
scheduler.stop();
657686
}
658687
}

src/java.net.http/share/classes/jdk/internal/net/http/common/Demand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public final class Demand {
4747
*/
4848
public boolean increase(long n) {
4949
if (n <= 0) {
50-
throw new IllegalArgumentException(String.valueOf(n));
50+
throw new IllegalArgumentException("non-positive subscription request: " + String.valueOf(n));
5151
}
5252
long prev = val.getAndAccumulate(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
5353
return prev == 0;

test/jdk/java/net/httpclient/AggregateRequestBodyTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public class AggregateRequestBodyTest implements HttpServerAdapters {
103103
static final int RESPONSE_CODE = 200;
104104
static final int ITERATION_COUNT = 4;
105105
static final Class<IllegalArgumentException> IAE = IllegalArgumentException.class;
106+
static final Class<CompletionException> CE = CompletionException.class;
106107
// a shared executor helps reduce the amount of threads created by the test
107108
static final Executor executor = new TestExecutor(Executors.newCachedThreadPool());
108109
static final ConcurrentMap<String, Throwable> FAILURES = new ConcurrentHashMap<>();
@@ -543,6 +544,13 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
543544
description.replace("null", "length(-1)"));
544545
}
545546

547+
private static final Throwable completionCause(CompletionException x) {
548+
while (x.getCause() instanceof CompletionException) {
549+
x = (CompletionException)x.getCause();
550+
}
551+
return x.getCause();
552+
}
553+
546554
@Test(dataProvider = "negativeRequests")
547555
public void testNegativeRequest(long n) {
548556
assert n <= 0 : "test for negative request called with n > 0 : " + n;
@@ -551,8 +559,15 @@ public void testNegativeRequest(long n) {
551559
RequestSubscriber subscriber = new RequestSubscriber();
552560
publisher.subscribe(subscriber);
553561
Subscription subscription = subscriber.subscriptionCF.join();
554-
IllegalArgumentException iae = expectThrows(IAE, () -> subscription.request(n));
555-
System.out.printf("Got expected IAE for %d: %s%n", n, iae);
562+
subscription.request(n);
563+
CompletionException expected = expectThrows(CE, () -> subscriber.resultCF.join());
564+
Throwable cause = completionCause(expected);
565+
if (cause instanceof IllegalArgumentException) {
566+
System.out.printf("Got expected IAE for %d: %s%n", n, cause);
567+
} else {
568+
throw new AssertionError("Unexpected exception: " + cause,
569+
(cause == null) ? expected : cause);
570+
}
556571
}
557572

558573
@Test(dataProvider = "contentLengths")
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright (c) 2020, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation.
8+
*
9+
* This code is distributed in the hope that it will be useful, but WITHOUT
10+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12+
* version 2 for more details (a copy is included in the LICENSE file that
13+
* accompanied this code).
14+
*
15+
* You should have received a copy of the GNU General Public License version
16+
* 2 along with this work; if not, write to the Free Software Foundation,
17+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18+
*
19+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20+
* or visit www.oracle.com if you need additional information or have any
21+
* questions.
22+
*/
23+
24+
import org.reactivestreams.tck.TestEnvironment;
25+
import org.reactivestreams.tck.flow.FlowPublisherVerification;
26+
27+
import java.net.http.HttpRequest.BodyPublisher;
28+
import java.net.http.HttpRequest.BodyPublishers;
29+
import java.nio.ByteBuffer;
30+
import java.util.Collections;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import java.util.concurrent.Flow.Publisher;
34+
35+
/* See TckDriver.java for more information */
36+
public class BodyPublishersConcat
37+
extends FlowPublisherVerification<ByteBuffer> {
38+
39+
private static final int ELEMENT_SIZE = 16 * 1024;
40+
41+
public BodyPublishersConcat() {
42+
super(new TestEnvironment(450L));
43+
}
44+
45+
private static BodyPublisher ofByteArrays(int n, byte[] bytes) {
46+
return BodyPublishers.ofByteArrays(Collections.nCopies((int) n, bytes));
47+
}
48+
49+
@Override
50+
public Publisher<ByteBuffer> createFlowPublisher(long nElements) {
51+
System.out.println("BodyPublishersConcat: %d elements requested"
52+
.formatted(nElements));
53+
byte[] bytes = S.arrayOfNRandomBytes(ELEMENT_SIZE);
54+
if (nElements == 0) {
55+
System.out.println("BodyPublishersConcat: empty publisher");
56+
return BodyPublishers.concat();
57+
} else if (nElements == 1) {
58+
System.out.println("BodyPublishersConcat: singleton publisher");
59+
return BodyPublishers.concat(ofByteArrays(1, bytes));
60+
} else if (nElements < 4) {
61+
int left = (int)nElements/2;
62+
int right = (int)nElements - left;
63+
System.out.println("BodyPublishersConcat: dual publisher (%d, %d)".formatted(left, right));
64+
return BodyPublishers.concat(ofByteArrays(left, bytes),
65+
ofByteArrays(right, bytes));
66+
} else {
67+
List<BodyPublisher> publishers = new ArrayList<>();
68+
List<Integer> sizes = new ArrayList<>();
69+
long remaining = nElements;
70+
int max = (int) Math.min((long)Integer.MAX_VALUE, nElements/2L);
71+
while (remaining > 0) {
72+
int length = S.randomIntUpTo(max);
73+
if (length == 0) length = 1;
74+
sizes.add(length);
75+
if (remaining > length) {
76+
publishers.add(ofByteArrays(length, bytes));
77+
remaining = remaining - length;
78+
} else {
79+
publishers.add(ofByteArrays((int)remaining, bytes));
80+
remaining = 0;
81+
}
82+
}
83+
System.out.println("BodyPublishersConcat: multi publisher " + sizes);
84+
return BodyPublishers.concat(publishers.toArray(BodyPublisher[]::new));
85+
}
86+
}
87+
88+
@Override
89+
public Publisher<ByteBuffer> createFailedFlowPublisher() {
90+
return null;
91+
}
92+
}

test/jdk/java/net/httpclient/reactivestreams-tck-tests/TckDriver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
* @compile -encoding UTF-8 BodyPublishersOfFile.java
3939
* @compile -encoding UTF-8 BodyPublishersOfInputStream.java
4040
* @compile -encoding UTF-8 BodyPublishersOfSubByteArray.java
41+
* @compile -encoding UTF-8 BodyPublishersConcat.java
4142
*
4243
* @compile -encoding UTF-8 BodySubscribersBuffering.java
4344
* @compile -encoding UTF-8 BodySubscribersDiscarding.java
@@ -65,6 +66,7 @@
6566
* @run testng/othervm BodyPublishersOfFile
6667
* @run testng/othervm BodyPublishersOfInputStream
6768
* @run testng/othervm BodyPublishersOfSubByteArray
69+
* @run testng/othervm BodyPublishersConcat
6870
*
6971
* @run testng/othervm BodySubscribersBuffering
7072
* @run testng/othervm BodySubscribersDiscarding

0 commit comments

Comments
 (0)