Skip to content

Commit

Permalink
InputStreamSubscriber instantiation and Javadoc
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev committed Oct 28, 2024
1 parent dfaf7a0 commit f1cfe7a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -457,23 +458,19 @@ public static Publisher<DataBuffer> outputStreamPublisher(
}

/**
* Subscribes to given {@link Publisher} and returns subscription
* as {@link InputStream} that allows reading all propagated {@link DataBuffer} messages via its imperative API.
* Given the {@link InputStream} implementation buffers messages as per configuration.
* The returned {@link InputStream} is considered terminated when the given {@link Publisher} signaled one of the
* terminal signal ({@link Subscriber#onComplete() or {@link Subscriber#onError(Throwable)}})
* and all the stored {@link DataBuffer} polled from the internal buffer.
* The returned {@link InputStream} will call {@link Subscription#cancel()} and release all stored {@link DataBuffer}
* when {@link InputStream#close()} is called.
* <p>
* Note: The implementation of the returned {@link InputStream} disallow concurrent call on
* any of the {@link InputStream#read} methods
* <p>
* Note: {@link Subscription#request(long)} happens eagerly for the first time upon subscription
* and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed.
* @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream}
* @param demand the maximum number of buffers to request from the Publisher and buffer on an ongoing basis
* @return an {@link InputStream} instance representing given {@link Publisher} messages
* Subscribe to given {@link Publisher} of {@code DataBuffer}s, and return an
* {@link InputStream} to consume the byte content with.
* <p>Byte buffers are stored in a queue. The {@code demand} constructor value
* determines the number of buffers requested initially. When storage falls
* below a {@code (demand - (demand >> 2))} limit, a request is made to refill
* the queue.
* <p>The {@code InputStream} terminates after an onError or onComplete signal,
* and stored buffers are read. If the {@code InputStream} is closed,
* the {@link Flow.Subscription} is cancelled, and stored buffers released.
* @param publisher the source of {@code DataBuffer}s
* @param demand the number of buffers to request initially, and buffer
* internally on an ongoing basis.
* @return an {@link InputStream} backed by the {@link Publisher}
*/
public static <T extends DataBuffer> InputStream subscriberInputStream(Publisher<T> publisher, int demand) {
Assert.notNull(publisher, "Publisher must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
Expand All @@ -36,7 +36,17 @@
import org.springframework.util.Assert;

/**
* Bridges between {@link Publisher Publisher&lt;DataBuffer&gt;} and {@link InputStream}.
* An {@link InputStream} backed by {@link Flow.Subscriber Flow.Subscriber}
* receiving byte buffers from a {@link Flow.Publisher} source.
*
* <p>Byte buffers are stored in a queue. The {@code demand} constructor value
* determines the number of buffers requested initially. When storage falls
* below a {@code (demand - (demand >> 2))} limit, a request is made to refill
* the queue.
*
* <p>The {@code InputStream} terminates after an onError or onComplete signal,
* and stored buffers are read. If the {@code InputStream} is closed,
* the {@link Flow.Subscription} is cancelled, and stored buffers released.
*
* <p>Note that this class has a near duplicate in
* {@link org.springframework.http.client.SubscriberInputStream}.
Expand Down Expand Up @@ -82,6 +92,11 @@ final class SubscriberInputStream extends InputStream implements Subscriber<Data
private Throwable error;


/**
* Create an instance.
* @param demand the number of buffers to request initially, and buffer
* internally on an ongoing basis.
*/
SubscriberInputStream(int demand) {
this.prefetch = demand;
this.limit = (demand == Integer.MAX_VALUE ? Integer.MAX_VALUE : demand - (demand >> 2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,21 @@
import org.apache.commons.logging.LogFactory;
import reactor.core.Exceptions;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
* Bridges between {@link Flow.Publisher Flow.Publisher&lt;T&gt;} and {@link InputStream}.
* An {@link InputStream} backed by {@link Flow.Subscriber Flow.Subscriber}
* receiving byte buffers from a {@link Flow.Publisher} source.
*
* <p>Byte buffers are stored in a queue. The {@code demand} constructor value
* determines the number of buffers requested initially. When storage falls
* below a {@code (demand - (demand >> 2))} limit, a request is made to refill
* the queue.
*
* <p>The {@code InputStream} terminates after an onError or onComplete signal,
* and stored buffers are read. If the {@code InputStream} is closed,
* the {@link Flow.Subscription} is cancelled, and stored buffers released.
*
* <p>Note that this class has a near duplicate in
* {@link org.springframework.core.io.buffer.SubscriberInputStream}.
Expand Down Expand Up @@ -94,7 +103,20 @@ final class SubscriberInputStream<T> extends InputStream implements Flow.Subscri
private Throwable error;


private SubscriberInputStream(Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int demand) {
/**
* Create an instance.
* @param mapper function to transform byte buffers to {@code byte[]};
* the function should also release the byte buffer if necessary.
* @param onDiscardHandler a callback to release byte buffers if the
* {@link InputStream} is closed prematurely.
* @param demand the number of buffers to request initially, and buffer
* internally on an ongoing basis.
*/
SubscriberInputStream(Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int demand) {
Assert.notNull(mapper, "mapper must not be null");
Assert.notNull(onDiscardHandler, "onDiscardHandler must not be null");
Assert.isTrue(demand > 0, "demand must be greater than 0");

this.mapper = mapper;
this.onDiscardHandler = onDiscardHandler;
this.prefetch = demand;
Expand All @@ -104,38 +126,6 @@ private SubscriberInputStream(Function<T, byte[]> mapper, Consumer<T> onDiscardH
}


/**
* Subscribes to given {@link Flow.Publisher} and returns subscription
* as {@link InputStream} that allows reading all propagated {@link DataBuffer} messages via its imperative API.
* Given the {@link InputStream} implementation buffers messages as per configuration.
* The returned {@link InputStream} is considered terminated when the given {@link Flow.Publisher} signaled one of the
* terminal signal ({@link Flow.Subscriber#onComplete() or {@link Flow.Subscriber#onError(Throwable)}})
* and all the stored {@link DataBuffer} polled from the internal buffer.
* The returned {@link InputStream} will call {@link Flow.Subscription#cancel()} and release all stored {@link DataBuffer}
* when {@link InputStream#close()} is called.
* <p>
* Note: The implementation of the returned {@link InputStream} disallow concurrent call on
* any of the {@link InputStream#read} methods
* <p>
* Note: {@link Flow.Subscription#request(long)} happens eagerly for the first time upon subscription
* and then repeats every time {@code bufferSize - (bufferSize >> 2)} consumed.
* @param publisher the source of {@link DataBuffer} which should be represented as an {@link InputStream}
* @param mapper function to transform &lt;T&gt; element to {@code byte[]}. Note, &lt;T&gt; should be released during the mapping if needed.
* @param onDiscardHandler &lt;T&gt; element consumer if returned {@link InputStream} is closed prematurely.
* @param demand the maximum number of buffers to request from the Publisher and buffer on an ongoing basis
* @return an {@link InputStream} instance representing given {@link Flow.Publisher} messages
*/
public static <T> InputStream subscribeTo(Flow.Publisher<T> publisher, Function<T, byte[]> mapper, Consumer<T> onDiscardHandler, int demand) {
Assert.notNull(publisher, "Flow.Publisher must not be null");
Assert.notNull(mapper, "mapper must not be null");
Assert.notNull(onDiscardHandler, "onDiscardHandler must not be null");
Assert.isTrue(demand > 0, "demand must be greater than 0");

SubscriberInputStream<T> iss = new SubscriberInputStream<>(mapper, onDiscardHandler, demand);
publisher.subscribe(iss);
return iss;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (this.subscription != null) {
Expand Down Expand Up @@ -222,7 +212,7 @@ private void resume() {
if (this.parkedThread != READY) {
Object old = this.parkedThread.getAndSet(READY);
if (old != READY) {
LockSupport.unpark((Thread)old);
LockSupport.unpark((Thread) old);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.springframework.http.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,7 +27,6 @@

import org.junit.jupiter.api.Test;
import org.reactivestreams.FlowAdapters;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -79,8 +77,8 @@ void basic() {
},
this.byteMapper, this.executor, null);

StepVerifier.create(toStringFlux(publisher))
.assertNext(s -> assertThat(s).isEqualTo("foobarbaz"))
StepVerifier.create(FlowAdapters.toPublisher(publisher))
.assertNext(s -> assertThat(s).containsExactly("foobarbaz".getBytes(UTF_8)))
.verifyComplete();
}

Expand All @@ -98,17 +96,20 @@ void flush() throws IOException {
this.byteMapper, this.executor, null);


try (InputStream is = SubscriberInputStream.subscribeTo(
toStringPublisher(publisher), s -> s.getBytes(UTF_8), s -> {}, 1)) {
try (SubscriberInputStream<byte[]> is = new SubscriberInputStream<>(s -> s, s -> {}, 1)) {
publisher.subscribe(is);

byte[] chunk = new byte[3];

assertThat(is.read(chunk)).isEqualTo(3);
assertThat(chunk).containsExactly(FOO);

assertThat(is.read(chunk)).isEqualTo(3);
assertThat(chunk).containsExactly(BAR);

assertThat(is.read(chunk)).isEqualTo(3);
assertThat(chunk).containsExactly(BAZ);

assertThat(is.read(chunk)).isEqualTo(-1);
}
}
Expand All @@ -123,8 +124,8 @@ void chunkSize() {
},
this.byteMapper, this.executor, 2);

try (InputStream is = SubscriberInputStream.subscribeTo(
toStringPublisher(publisher), s -> s.getBytes(UTF_8), s -> {}, 1)) {
try (SubscriberInputStream<byte[]> is = new SubscriberInputStream<>(s -> s, s -> {}, 1)) {
publisher.subscribe(is);

StringBuilder stringBuilder = new StringBuilder();
byte[] chunk = new byte[3];
Expand All @@ -148,7 +149,7 @@ void chunkSize() {
}

@Test
void cancel() throws InterruptedException {
void cancel() throws InterruptedException, IOException {
CountDownLatch latch = new CountDownLatch(1);

Flow.Publisher<byte[]> publisher = new OutputStreamPublisher<>(
Expand All @@ -165,27 +166,23 @@ void cancel() throws InterruptedException {

}, this.byteMapper, this.executor, null);

List<String> discarded = new ArrayList<>();

try (InputStream is = SubscriberInputStream.subscribeTo(
toStringPublisher(publisher), s -> s.getBytes(UTF_8), discarded::add, 1)) {
List<byte[]> discarded = new ArrayList<>();

try (SubscriberInputStream<byte[]> is = new SubscriberInputStream<>(s -> s, discarded::add, 1)) {
publisher.subscribe(is);
byte[] chunk = new byte[3];

assertThat(is.read(chunk)).isEqualTo(3);
assertThat(chunk).containsExactly(FOO);
}
catch (IOException e) {
throw new RuntimeException(e);
}

latch.await();

assertThat(discarded).containsExactly("bar");
assertThat(discarded).containsExactly("bar".getBytes(UTF_8));
}

@Test
void closed() throws InterruptedException {
void closed() throws InterruptedException, IOException {
CountDownLatch latch = new CountDownLatch(1);

Flow.Publisher<byte[]> publisher = new OutputStreamPublisher<>(
Expand All @@ -198,18 +195,14 @@ void closed() throws InterruptedException {
},
this.byteMapper, this.executor, null);

try (InputStream is = SubscriberInputStream.subscribeTo(
toStringPublisher(publisher), s -> s.getBytes(UTF_8), s -> {}, 1)) {

try (SubscriberInputStream<byte[]> is = new SubscriberInputStream<>(s -> s, s -> {}, 1)) {
publisher.subscribe(is);
byte[] chunk = new byte[3];

assertThat(is.read(chunk)).isEqualTo(3);
assertThat(chunk).containsExactly(FOO);
assertThat(is.read(chunk)).isEqualTo(-1);
}
catch (IOException e) {
throw new RuntimeException(e);
}

latch.await();
}
Expand All @@ -234,19 +227,11 @@ void mapperThrowsException() throws InterruptedException {
Throwable savedEx = null;

StringBuilder sb = new StringBuilder();
try (InputStream is = SubscriberInputStream.subscribeTo(
publisher, s -> { throw new NullPointerException("boom"); }, s -> {}, 1)) {
try (SubscriberInputStream<byte[]> is = new SubscriberInputStream<>(
s -> { throw new NullPointerException("boom"); }, s -> {}, 1)) {

byte[] chunk = new byte[3];

sb.append(new String(new byte[]{(byte)is.read()}, UTF_8));
assertThat(is.read(chunk)).isEqualTo(3);
sb.append(new String(chunk, UTF_8));
assertThat(is.read(chunk)).isEqualTo(3);
sb.append(new String(chunk, UTF_8));
assertThat(is.read(chunk)).isEqualTo(2);
sb.append(new String(chunk,0, 2, UTF_8));
assertThat(is.read()).isEqualTo(-1);
publisher.subscribe(is);
sb.append(new String(new byte[] {(byte) is.read()}, UTF_8));
}
catch (Throwable ex) {
savedEx = ex;
Expand All @@ -258,12 +243,4 @@ void mapperThrowsException() throws InterruptedException {
assertThat(savedEx).hasMessage("boom");
}

private static Flow.Publisher<String> toStringPublisher(Flow.Publisher<byte[]> publisher) {
return FlowAdapters.toFlowPublisher(toStringFlux(publisher));
}

private static Flux<String> toStringFlux(Flow.Publisher<byte[]> publisher) {
return Flux.from(FlowAdapters.toPublisher(publisher)).map(bytes -> new String(bytes, UTF_8));
}

}

0 comments on commit f1cfe7a

Please sign in to comment.