diff --git a/framework-platform/framework-platform.gradle b/framework-platform/framework-platform.gradle index 675469628794..36e9c31c55fe 100644 --- a/framework-platform/framework-platform.gradle +++ b/framework-platform/framework-platform.gradle @@ -16,8 +16,8 @@ dependencies { api(platform("org.apache.groovy:groovy-bom:4.0.21")) api(platform("org.apache.logging.log4j:log4j-bom:2.21.1")) api(platform("org.assertj:assertj-bom:3.26.0")) - api(platform("org.eclipse.jetty:jetty-bom:12.0.10")) - api(platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.0.10")) + api(platform("org.eclipse.jetty:jetty-bom:12.0.11")) + api(platform("org.eclipse.jetty.ee10:jetty-ee10-bom:12.0.11")) api(platform("org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.7.3")) api(platform("org.jetbrains.kotlinx:kotlinx-serialization-bom:1.6.0")) api(platform("org.junit:junit-bom:5.10.3")) diff --git a/spring-core/spring-core.gradle b/spring-core/spring-core.gradle index fedd203d553c..91710c31c30b 100644 --- a/spring-core/spring-core.gradle +++ b/spring-core/spring-core.gradle @@ -81,6 +81,7 @@ dependencies { optional("io.smallrye.reactive:mutiny") optional("net.sf.jopt-simple:jopt-simple") optional("org.aspectj:aspectjweaver") + optional("org.eclipse.jetty:jetty-io") optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-stdlib") optional("org.jetbrains.kotlinx:kotlinx-coroutines-core") diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java index 16b444dfb40f..d9d43da4eeaf 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DefaultDataBuffer.java @@ -355,7 +355,7 @@ public DefaultDataBuffer slice(int index, int length) { } @Override - public DataBuffer split(int index) { + public DefaultDataBuffer split(int index) { checkIndex(index); ByteBuffer split = this.byteBuffer.duplicate().clear() diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java new file mode 100644 index 000000000000..f2eb53063197 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBuffer.java @@ -0,0 +1,359 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntPredicate; + +import org.eclipse.jetty.io.Content; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Implementation of the {@code DataBuffer} interface that can wrap a Jetty + * {@link Content.Chunk}. Typically constructed with {@link JettyDataBufferFactory}. + * + * @author Greg Wilkins + * @author Lachlan Roberts + * @author Arjen Poutsma + * @since 6.2 + */ +public final class JettyDataBuffer implements PooledDataBuffer { + + private final DefaultDataBuffer delegate; + + @Nullable + private final Content.Chunk chunk; + + private final JettyDataBufferFactory bufferFactory; + + private final AtomicInteger refCount = new AtomicInteger(1); + + + JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate, Content.Chunk chunk) { + Assert.notNull(bufferFactory, "BufferFactory must not be null"); + Assert.notNull(delegate, "Delegate must not be null"); + Assert.notNull(chunk, "Chunk must not be null"); + + this.bufferFactory = bufferFactory; + this.delegate = delegate; + this.chunk = chunk; + this.chunk.retain(); + } + + JettyDataBuffer(JettyDataBufferFactory bufferFactory, DefaultDataBuffer delegate) { + Assert.notNull(bufferFactory, "BufferFactory must not be null"); + Assert.notNull(delegate, "Delegate must not be null"); + + this.bufferFactory = bufferFactory; + this.delegate = delegate; + this.chunk = null; + } + + @Override + public boolean isAllocated() { + return this.refCount.get() > 0; + } + + @Override + public PooledDataBuffer retain() { + int result = this.refCount.updateAndGet(c -> { + if (c != 0) { + return c + 1; + } + else { + return 0; + } + }); + if (result != 0 && this.chunk != null) { + this.chunk.retain(); + } + return this; + } + + @Override + public PooledDataBuffer touch(Object hint) { + return this; + } + + @Override + public boolean release() { + int result = this.refCount.updateAndGet(c -> { + if (c != 0) { + return c - 1; + } + else { + throw new IllegalStateException("JettyDataBuffer already released: " + this); + } + }); + if (this.chunk != null) { + return this.chunk.release(); + } + else { + return result == 0; + } + } + + @Override + public DataBufferFactory factory() { + return this.bufferFactory; + } + + // delegation + + @Override + public int indexOf(IntPredicate predicate, int fromIndex) { + return this.delegate.indexOf(predicate, fromIndex); + } + + @Override + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + return this.delegate.lastIndexOf(predicate, fromIndex); + } + + @Override + public int readableByteCount() { + return this.delegate.readableByteCount(); + } + + @Override + public int writableByteCount() { + return this.delegate.writableByteCount(); + } + + @Override + public int capacity() { + return this.delegate.capacity(); + } + + @Override + @Deprecated + public DataBuffer capacity(int capacity) { + this.delegate.capacity(capacity); + return this; + } + + @Override + public DataBuffer ensureWritable(int capacity) { + this.delegate.ensureWritable(capacity); + return this; + } + + @Override + public int readPosition() { + return this.delegate.readPosition(); + } + + @Override + public DataBuffer readPosition(int readPosition) { + this.delegate.readPosition(readPosition); + return this; + } + + @Override + public int writePosition() { + return this.delegate.writePosition(); + } + + @Override + public DataBuffer writePosition(int writePosition) { + this.delegate.writePosition(writePosition); + return this; + } + + @Override + public byte getByte(int index) { + return this.delegate.getByte(index); + } + + @Override + public byte read() { + return this.delegate.read(); + } + + @Override + public DataBuffer read(byte[] destination) { + this.delegate.read(destination); + return this; + } + + @Override + public DataBuffer read(byte[] destination, int offset, int length) { + this.delegate.read(destination, offset, length); + return this; + } + + @Override + public DataBuffer write(byte b) { + this.delegate.write(b); + return this; + } + + @Override + public DataBuffer write(byte[] source) { + this.delegate.write(source); + return this; + } + + @Override + public DataBuffer write(byte[] source, int offset, int length) { + this.delegate.write(source, offset, length); + return this; + } + + @Override + public DataBuffer write(DataBuffer... buffers) { + this.delegate.write(buffers); + return this; + } + + @Override + public DataBuffer write(ByteBuffer... buffers) { + this.delegate.write(buffers); + return this; + } + + @Override + @Deprecated + public DataBuffer slice(int index, int length) { + DefaultDataBuffer delegateSlice = this.delegate.slice(index, length); + if (this.chunk != null) { + this.chunk.retain(); + return new JettyDataBuffer(this.bufferFactory, delegateSlice, this.chunk); + } + else { + return new JettyDataBuffer(this.bufferFactory, delegateSlice); + } + } + + @Override + public DataBuffer split(int index) { + DefaultDataBuffer delegateSplit = this.delegate.split(index); + if (this.chunk != null) { + this.chunk.retain(); + return new JettyDataBuffer(this.bufferFactory, delegateSplit, this.chunk); + } + else { + return new JettyDataBuffer(this.bufferFactory, delegateSplit); + } + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer() { + return this.delegate.asByteBuffer(); + } + + @Override + @Deprecated + public ByteBuffer asByteBuffer(int index, int length) { + return this.delegate.asByteBuffer(index, length); + } + + @Override + @Deprecated + public ByteBuffer toByteBuffer(int index, int length) { + return this.delegate.toByteBuffer(index, length); + } + + @Override + public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { + this.delegate.toByteBuffer(srcPos, dest, destPos, length); + } + + @Override + public ByteBufferIterator readableByteBuffers() { + ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers(); + if (this.chunk != null) { + return new JettyByteBufferIterator(delegateIterator, this.chunk); + } + else { + return delegateIterator; + } + } + + @Override + public ByteBufferIterator writableByteBuffers() { + ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers(); + if (this.chunk != null) { + return new JettyByteBufferIterator(delegateIterator, this.chunk); + } + else { + return delegateIterator; + } + } + + @Override + public String toString(int index, int length, Charset charset) { + return this.delegate.toString(index, length, charset); + } + + @Override + public int hashCode() { + return this.delegate.hashCode(); + } + + @Override + public boolean equals(Object o) { + return this == o || (o instanceof JettyDataBuffer other && + this.delegate.equals(other.delegate)); + } + + @Override + public String toString() { + return String.format("JettyDataBuffer (r: %d, w: %d, c: %d)", + readPosition(), writePosition(), capacity()); + } + + private static final class JettyByteBufferIterator implements ByteBufferIterator { + + private final ByteBufferIterator delegate; + + private final Content.Chunk chunk; + + + public JettyByteBufferIterator(ByteBufferIterator delegate, Content.Chunk chunk) { + Assert.notNull(delegate, "Delegate must not be null"); + Assert.notNull(chunk, "Chunk must not be null"); + + this.delegate = delegate; + this.chunk = chunk; + this.chunk.retain(); + } + + + @Override + public void close() { + this.delegate.close(); + this.chunk.release(); + } + + @Override + public boolean hasNext() { + return this.delegate.hasNext(); + } + + @Override + public ByteBuffer next() { + return this.delegate.next(); + } + } + +} diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java new file mode 100644 index 000000000000..7034a60f2a22 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/JettyDataBufferFactory.java @@ -0,0 +1,108 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; +import java.util.List; + +import org.eclipse.jetty.io.Content; + +/** + * Implementation of the {@code DataBufferFactory} interface that creates + * {@link JettyDataBuffer} instances. + * + * @author Arjen Poutsma + * @since 6.2 + */ +public class JettyDataBufferFactory implements DataBufferFactory { + + private final DefaultDataBufferFactory delegate; + + + /** + * Creates a new {@code JettyDataBufferFactory} with default settings. + */ + public JettyDataBufferFactory() { + this(false); + } + + /** + * Creates a new {@code JettyDataBufferFactory}, indicating whether direct + * buffers should be created by {@link #allocateBuffer()} and + * {@link #allocateBuffer(int)}. + * @param preferDirect {@code true} if direct buffers are to be preferred; + * {@code false} otherwise + */ + public JettyDataBufferFactory(boolean preferDirect) { + this(preferDirect, DefaultDataBufferFactory.DEFAULT_INITIAL_CAPACITY); + } + + /** + * Creates a new {@code JettyDataBufferFactory}, indicating whether direct + * buffers should be created by {@link #allocateBuffer()} and + * {@link #allocateBuffer(int)}, and what the capacity is to be used for + * {@link #allocateBuffer()}. + * @param preferDirect {@code true} if direct buffers are to be preferred; + * {@code false} otherwise + */ + public JettyDataBufferFactory(boolean preferDirect, int defaultInitialCapacity) { + this.delegate = new DefaultDataBufferFactory(preferDirect, defaultInitialCapacity); + } + + + @Override + @Deprecated + public JettyDataBuffer allocateBuffer() { + DefaultDataBuffer delegate = this.delegate.allocateBuffer(); + return new JettyDataBuffer(this, delegate); + } + + @Override + public JettyDataBuffer allocateBuffer(int initialCapacity) { + DefaultDataBuffer delegate = this.delegate.allocateBuffer(initialCapacity); + return new JettyDataBuffer(this, delegate); + } + + @Override + public JettyDataBuffer wrap(ByteBuffer byteBuffer) { + DefaultDataBuffer delegate = this.delegate.wrap(byteBuffer); + return new JettyDataBuffer(this, delegate); + } + + @Override + public JettyDataBuffer wrap(byte[] bytes) { + DefaultDataBuffer delegate = this.delegate.wrap(bytes); + return new JettyDataBuffer(this, delegate); + } + + public JettyDataBuffer wrap(Content.Chunk chunk) { + ByteBuffer byteBuffer = chunk.getByteBuffer(); + DefaultDataBuffer delegate = this.delegate.wrap(byteBuffer); + return new JettyDataBuffer(this, delegate, chunk); + } + + @Override + public JettyDataBuffer join(List dataBuffers) { + DefaultDataBuffer delegate = this.delegate.join(dataBuffers); + return new JettyDataBuffer(this, delegate); + } + + @Override + public boolean isDirect() { + return this.delegate.isDirect(); + } +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java new file mode 100644 index 000000000000..456338c10917 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/JettyDataBufferTests.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.io.buffer; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.io.Content; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.then; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +/** + * @author Arjen Poutsma + */ +public class JettyDataBufferTests { + + private final JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory(); + + @Test + void releaseRetainChunk() { + ByteBuffer buffer = ByteBuffer.allocate(3); + Content.Chunk mockChunk = mock(); + given(mockChunk.getByteBuffer()).willReturn(buffer); + given(mockChunk.release()).willReturn(false, false, true); + + + + JettyDataBuffer dataBuffer = this.dataBufferFactory.wrap(mockChunk); + dataBuffer.retain(); + dataBuffer.retain(); + assertThat(dataBuffer.release()).isFalse(); + assertThat(dataBuffer.release()).isFalse(); + assertThat(dataBuffer.release()).isTrue(); + + assertThatIllegalStateException().isThrownBy(dataBuffer::release); + + then(mockChunk).should(times(3)).retain(); + then(mockChunk).should(times(3)).release(); + } +} diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java index 5f353ad5967e..87843ca6b678 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/PooledDataBufferTests.java @@ -69,6 +69,15 @@ public DataBufferFactory createDataBufferFactory() { } } + @Nested + class Jetty implements PooledDataBufferTestingTrait { + + @Override + public DataBufferFactory createDataBufferFactory() { + return new JettyDataBufferFactory(); + } + } + interface PooledDataBufferTestingTrait { @@ -82,10 +91,14 @@ default PooledDataBuffer createDataBuffer(int capacity) { default void retainAndRelease() { PooledDataBuffer buffer = createDataBuffer(1); buffer.write((byte) 'a'); + assertThat(buffer.isAllocated()).isTrue(); buffer.retain(); + assertThat(buffer.isAllocated()).isTrue(); assertThat(buffer.release()).isFalse(); + assertThat(buffer.isAllocated()).isTrue(); assertThat(buffer.release()).isTrue(); + assertThat(buffer.isAllocated()).isFalse(); } @Test diff --git a/spring-web/spring-web.gradle b/spring-web/spring-web.gradle index 03b4a0ff19a8..7585e176b996 100644 --- a/spring-web/spring-web.gradle +++ b/spring-web/spring-web.gradle @@ -72,6 +72,7 @@ dependencies { because("needed by Netty's SelfSignedCertificate on JDK 15+") } testFixturesImplementation("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") + testFixturesImplementation("org.eclipse.jetty.websocket:jetty-websocket-jetty-server") testImplementation(project(":spring-core-test")) testImplementation(testFixtures(project(":spring-beans"))) testImplementation(testFixtures(project(":spring-context"))) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index a2895f6ded57..071a161e9096 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -17,24 +17,16 @@ package org.springframework.http.client.reactive; import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.IntPredicate; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.Request; -import org.eclipse.jetty.io.Content; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import org.springframework.core.io.buffer.PooledDataBuffer; -import org.springframework.core.io.buffer.TouchableDataBuffer; +import org.springframework.core.io.buffer.JettyDataBufferFactory; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -50,7 +42,7 @@ public class JettyClientHttpConnector implements ClientHttpConnector { private final HttpClient httpClient; - private DataBufferFactory bufferFactory = DefaultDataBufferFactory.sharedInstance; + private JettyDataBufferFactory bufferFactory = new JettyDataBufferFactory(); /** @@ -103,7 +95,7 @@ public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable /** * Set the buffer factory to use. */ - public void setBufferFactory(DataBufferFactory bufferFactory) { + public void setBufferFactory(JettyDataBufferFactory bufferFactory) { this.bufferFactory = bufferFactory; } @@ -134,289 +126,9 @@ public Mono connect(HttpMethod method, URI uri, private Mono execute(JettyClientHttpRequest request) { return Mono.fromDirect(request.toReactiveRequest() .response((reactiveResponse, chunkPublisher) -> { - Flux content = Flux.from(chunkPublisher).map(this::toDataBuffer); + Flux content = Flux.from(chunkPublisher).map(this.bufferFactory::wrap); return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); })); } - private DataBuffer toDataBuffer(Content.Chunk chunk) { - DataBuffer delegate = this.bufferFactory.wrap(chunk.getByteBuffer()); - return new JettyDataBuffer(delegate, chunk); - } - - - private static final class JettyDataBuffer implements PooledDataBuffer { - - private final DataBuffer delegate; - - private final Content.Chunk chunk; - - private final AtomicInteger refCount = new AtomicInteger(1); - - - public JettyDataBuffer(DataBuffer delegate, Content.Chunk chunk) { - Assert.notNull(delegate, "Delegate must not be null"); - Assert.notNull(chunk, "Chunk must not be null"); - - this.delegate = delegate; - this.chunk = chunk; - } - - @Override - public boolean isAllocated() { - return this.refCount.get() > 0; - } - - @Override - public PooledDataBuffer retain() { - if (this.delegate instanceof PooledDataBuffer pooledDelegate) { - pooledDelegate.retain(); - } - this.chunk.retain(); - this.refCount.getAndUpdate(c -> { - if (c != 0) { - return c + 1; - } - else { - return 0; - } - }); - return this; - } - - @Override - public boolean release() { - if (this.delegate instanceof PooledDataBuffer pooledDelegate) { - pooledDelegate.release(); - } - this.chunk.release(); - int refCount = this.refCount.updateAndGet(c -> { - if (c != 0) { - return c - 1; - } - else { - throw new IllegalStateException("already released " + this); - } - }); - return refCount == 0; - } - - @Override - public PooledDataBuffer touch(Object hint) { - if (this.delegate instanceof TouchableDataBuffer touchableDelegate) { - touchableDelegate.touch(hint); - } - return this; - } - - // delegation - - @Override - public DataBufferFactory factory() { - return this.delegate.factory(); - } - - @Override - public int indexOf(IntPredicate predicate, int fromIndex) { - return this.delegate.indexOf(predicate, fromIndex); - } - - @Override - public int lastIndexOf(IntPredicate predicate, int fromIndex) { - return this.delegate.lastIndexOf(predicate, fromIndex); - } - - @Override - public int readableByteCount() { - return this.delegate.readableByteCount(); - } - - @Override - public int writableByteCount() { - return this.delegate.writableByteCount(); - } - - @Override - public int capacity() { - return this.delegate.capacity(); - } - - @Override - @Deprecated - public DataBuffer capacity(int capacity) { - this.delegate.capacity(capacity); - return this; - } - - @Override - public DataBuffer ensureWritable(int capacity) { - this.delegate.ensureWritable(capacity); - return this; - } - - @Override - public int readPosition() { - return this.delegate.readPosition(); - } - - @Override - public DataBuffer readPosition(int readPosition) { - this.delegate.readPosition(readPosition); - return this; - } - - @Override - public int writePosition() { - return this.delegate.writePosition(); - } - - @Override - public DataBuffer writePosition(int writePosition) { - this.delegate.writePosition(writePosition); - return this; - } - - @Override - public byte getByte(int index) { - return this.delegate.getByte(index); - } - - @Override - public byte read() { - return this.delegate.read(); - } - - @Override - public DataBuffer read(byte[] destination) { - this.delegate.read(destination); - return this; - } - - @Override - public DataBuffer read(byte[] destination, int offset, int length) { - this.delegate.read(destination, offset, length); - return this; - } - - @Override - public DataBuffer write(byte b) { - this.delegate.write(b); - return this; - } - - @Override - public DataBuffer write(byte[] source) { - this.delegate.write(source); - return this; - } - - @Override - public DataBuffer write(byte[] source, int offset, int length) { - this.delegate.write(source, offset, length); - return this; - } - - @Override - public DataBuffer write(DataBuffer... buffers) { - this.delegate.write(buffers); - return this; - } - - @Override - public DataBuffer write(ByteBuffer... buffers) { - this.delegate.write(buffers); - return this; - } - - @Override - @Deprecated - public DataBuffer slice(int index, int length) { - DataBuffer delegateSlice = this.delegate.slice(index, length); - this.chunk.retain(); - return new JettyDataBuffer(delegateSlice, this.chunk); - } - - @Override - public DataBuffer split(int index) { - DataBuffer delegateSplit = this.delegate.split(index); - this.chunk.retain(); - return new JettyDataBuffer(delegateSplit, this.chunk); - } - - @Override - @Deprecated - public ByteBuffer asByteBuffer() { - return this.delegate.asByteBuffer(); - } - - @Override - @Deprecated - public ByteBuffer asByteBuffer(int index, int length) { - return this.delegate.asByteBuffer(index, length); - } - - @Override - @Deprecated - public ByteBuffer toByteBuffer(int index, int length) { - return this.delegate.toByteBuffer(index, length); - } - - @Override - public void toByteBuffer(int srcPos, ByteBuffer dest, int destPos, int length) { - this.delegate.toByteBuffer(srcPos, dest, destPos, length); - } - - @Override - public ByteBufferIterator readableByteBuffers() { - ByteBufferIterator delegateIterator = this.delegate.readableByteBuffers(); - return new JettyByteBufferIterator(delegateIterator, this.chunk); - } - - @Override - public ByteBufferIterator writableByteBuffers() { - ByteBufferIterator delegateIterator = this.delegate.writableByteBuffers(); - return new JettyByteBufferIterator(delegateIterator, this.chunk); - } - - @Override - public String toString(int index, int length, Charset charset) { - return this.delegate.toString(index, length, charset); - } - - - private static final class JettyByteBufferIterator implements ByteBufferIterator { - - private final ByteBufferIterator delegate; - - private final Content.Chunk chunk; - - - public JettyByteBufferIterator(ByteBufferIterator delegate, Content.Chunk chunk) { - Assert.notNull(delegate, "Delegate must not be null"); - Assert.notNull(chunk, "Chunk must not be null"); - - this.delegate = delegate; - this.chunk = chunk; - this.chunk.retain(); - } - - - @Override - public void close() { - this.delegate.close(); - this.chunk.release(); - } - - @Override - public boolean hasNext() { - return this.delegate.hasNext(); - } - - @Override - public ByteBuffer next() { - return this.delegate.next(); - } - } - } - } diff --git a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java index 9eb8bff26a46..f3112515eefc 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java +++ b/spring-web/src/main/java/org/springframework/http/codec/multipart/MultipartParser.java @@ -100,7 +100,6 @@ public static Flux parse(Flux buffers, byte[] boundary, int m return Flux.create(sink -> { MultipartParser parser = new MultipartParser(sink, boundary, maxHeadersSize, headersCharset); sink.onCancel(parser::onSinkCancel); - sink.onRequest(n -> parser.requestBuffer()); buffers.subscribe(parser); }); } @@ -112,7 +111,9 @@ public Context currentContext() { @Override protected void hookOnSubscribe(Subscription subscription) { - requestBuffer(); + if (this.sink.requestedFromDownstream() > 0) { + requestBuffer(); + } } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java b/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java index a94e378e3c3c..59ae6eb3f5ce 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/DefaultServerHttpRequestBuilder.java @@ -30,6 +30,7 @@ import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; @@ -70,7 +71,8 @@ public DefaultServerHttpRequestBuilder(ServerHttpRequest original) { Assert.notNull(original, "ServerHttpRequest is required"); this.uri = original.getURI(); - this.headers = new HttpHeaders(original.getHeaders()); + // original headers can be immutable, so create a copy + this.headers = new HttpHeaders(new LinkedMultiValueMap<>(original.getHeaders())); this.httpMethod = original.getMethod(); this.contextPath = original.getPath().contextPath().value(); this.remoteAddress = original.getRemoteAddress(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java new file mode 100644 index 000000000000..08994a11f624 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreHttpHandlerAdapter.java @@ -0,0 +1,60 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; + +import org.springframework.core.io.buffer.JettyDataBufferFactory; +import org.springframework.util.Assert; + +/** + * Adapt {@link HttpHandler} to the Jetty {@link org.eclipse.jetty.server.Handler} abstraction. + * + * @author Greg Wilkins + * @author Lachlan Roberts + * @author Arjen Poutsma + * @since 6.2 + */ +public class JettyCoreHttpHandlerAdapter extends Handler.Abstract.NonBlocking { + + private final HttpHandler httpHandler; + + private JettyDataBufferFactory dataBufferFactory = new JettyDataBufferFactory(); + + + public JettyCoreHttpHandlerAdapter(HttpHandler httpHandler) { + this.httpHandler = httpHandler; + } + + public void setDataBufferFactory(JettyDataBufferFactory dataBufferFactory) { + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null"); + this.dataBufferFactory = dataBufferFactory; + } + + + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception { + this.httpHandler.handle(new JettyCoreServerHttpRequest(request, this.dataBufferFactory), + new JettyCoreServerHttpResponse(response, this.dataBufferFactory)) + .subscribe(unused -> {}, callback::failed, callback::succeeded); + return true; + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java new file mode 100644 index 000000000000..d97832a2ff7c --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpRequest.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.List; + +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.server.Request; +import org.reactivestreams.FlowAdapters; +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.JettyDataBufferFactory; +import org.springframework.http.HttpCookie; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.support.JettyHeadersAdapter; +import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +/** + * Adapt an Eclipse Jetty {@link Request} to a {@link org.springframework.http.server.ServerHttpRequest}. + * + * @author Greg Wilkins + * @author Arjen Poutsma + * @since 6.2 + */ +class JettyCoreServerHttpRequest extends AbstractServerHttpRequest { + + private final JettyDataBufferFactory dataBufferFactory; + + private final Request request; + + + public JettyCoreServerHttpRequest(Request request, JettyDataBufferFactory dataBufferFactory) { + super(HttpMethod.valueOf(request.getMethod()), + request.getHttpURI().toURI(), + request.getContext().getContextPath(), + new HttpHeaders(new JettyHeadersAdapter(request.getHeaders()))); + this.dataBufferFactory = dataBufferFactory; + this.request = request; + } + + @Override + protected MultiValueMap initCookies() { + List httpCookies = Request.getCookies(this.request); + if (httpCookies.isEmpty()) { + return CollectionUtils.toMultiValueMap(Collections.emptyMap()); + } + MultiValueMap cookies =new LinkedMultiValueMap<>(); + for (org.eclipse.jetty.http.HttpCookie c : httpCookies) { + cookies.add(c.getName(), new HttpCookie(c.getName(), c.getValue())); + } + return cookies; + } + + @Override + @Nullable + public SslInfo initSslInfo() { + if (this.request.getConnectionMetaData().isSecure() && + this.request.getAttribute(EndPoint.SslSessionData.ATTRIBUTE) instanceof EndPoint.SslSessionData sessionData) { + return new DefaultSslInfo(sessionData.sslSessionId(), sessionData.peerCertificates()); + } + return null; + } + + @SuppressWarnings("unchecked") + @Override + public T getNativeRequest() { + return (T) this.request; + } + + @Override + protected String initId() { + return this.request.getId(); + } + + @Override + @Nullable + public InetSocketAddress getLocalAddress() { + SocketAddress localAddress = this.request.getConnectionMetaData().getLocalSocketAddress(); + return localAddress instanceof InetSocketAddress inet ? inet : null; + } + + @Override + @Nullable + public InetSocketAddress getRemoteAddress() { + SocketAddress remoteAddress = this.request.getConnectionMetaData().getRemoteSocketAddress(); + return remoteAddress instanceof InetSocketAddress inet ? inet : null; + } + + @Override + public Flux getBody() { + // We access the request body as a Flow.Publisher, which is wrapped as an org.reactivestreams.Publisher and + // then wrapped as a Flux. + return Flux.from(FlowAdapters.toPublisher(Content.Source.asPublisher(this.request))) + .map(this.dataBufferFactory::wrap); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java new file mode 100644 index 000000000000..fbf533ec056d --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/JettyCoreServerHttpResponse.java @@ -0,0 +1,237 @@ +/* + * Copyright 2002-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +import org.eclipse.jetty.http.HttpCookie; +import org.eclipse.jetty.http.HttpField; +import org.eclipse.jetty.io.Content; +import org.eclipse.jetty.server.HttpCookieUtils; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.JettyDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatusCode; +import org.springframework.http.ResponseCookie; +import org.springframework.http.ZeroCopyHttpOutputMessage; +import org.springframework.http.support.JettyHeadersAdapter; +import org.springframework.lang.Nullable; + +/** + * Adapt an Eclipse Jetty {@link Response} to a {@link org.springframework.http.server.ServerHttpResponse}. + * + * @author Greg Wilkins + * @author Lachlan Roberts + * @since 6.2 + */ +class JettyCoreServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage { + + private final Response response; + + public JettyCoreServerHttpResponse(Response response, JettyDataBufferFactory dataBufferFactory) { + super(dataBufferFactory, new HttpHeaders(new JettyHeadersAdapter(response.getHeaders()))); + this.response = response; + + // remove all existing cookies from the response and add them to the cookie map, to be added back later + for (ListIterator i = this.response.getHeaders().listIterator(); i.hasNext(); ) { + HttpField f = i.next(); + if (f instanceof HttpCookieUtils.SetCookieHttpField setCookieHttpField) { + HttpCookie httpCookie = setCookieHttpField.getHttpCookie(); + ResponseCookie responseCookie = ResponseCookie.from(httpCookie.getName(), httpCookie.getValue()) + .httpOnly(httpCookie.isHttpOnly()) + .domain(httpCookie.getDomain()) + .maxAge(httpCookie.getMaxAge()) + .sameSite(httpCookie.getSameSite().name()) + .secure(httpCookie.isSecure()) + .partitioned(httpCookie.isPartitioned()) + .build(); + this.addCookie(responseCookie); + i.remove(); + } + } + } + + @Override + protected Mono writeWithInternal(Publisher body) { + return Flux.from(body) + .concatMap(this::sendDataBuffer) + .then(); + } + + @Override + protected Mono writeAndFlushWithInternal(Publisher> body) { + return Flux.from(body).concatMap(this::writeWithInternal).then(); + } + + @Override + protected void applyStatusCode() { + HttpStatusCode status = getStatusCode(); + this.response.setStatus(status == null ? 0 : status.value()); + } + + @Override + protected void applyHeaders() { + } + + @Override + protected void applyCookies() { + this.getCookies().values().stream() + .flatMap(List::stream) + .forEach(cookie -> Response.addCookie(this.response, new ResponseHttpCookie(cookie))); + } + + @Override + public Mono writeWith(Path file, long position, long count) { + Callback.Completable callback = new Callback.Completable(); + Mono mono = Mono.fromFuture(callback); + try { + Content.copy(Content.Source.from(null, file, position, count), this.response, callback); + } + catch (Throwable th) { + callback.failed(th); + } + return doCommit(() -> mono); + } + + private Mono sendDataBuffer(DataBuffer dataBuffer) { + return Mono.defer(() -> { + DataBuffer.ByteBufferIterator byteBufferIterator = dataBuffer.readableByteBuffers(); + Callback.Completable callback = new Callback.Completable(); + new IteratingCallback() { + @Override + protected Action process() { + if (!byteBufferIterator.hasNext()) { + return Action.SUCCEEDED; + } + response.write(false, byteBufferIterator.next(), this); + return Action.SCHEDULED; + } + + @Override + protected void onCompleteSuccess() { + byteBufferIterator.close(); + DataBufferUtils.release(dataBuffer); + callback.complete(null); + } + + @Override + protected void onCompleteFailure(Throwable cause) { + byteBufferIterator.close(); + DataBufferUtils.release(dataBuffer); + callback.failed(cause); + } + }.iterate(); + + return Mono.fromFuture(callback); + }); + } + + @SuppressWarnings("unchecked") + @Override + public T getNativeResponse() { + return (T) this.response; + } + + private static class ResponseHttpCookie implements org.eclipse.jetty.http.HttpCookie { + private final ResponseCookie responseCookie; + + public ResponseHttpCookie(ResponseCookie responseCookie) { + this.responseCookie = responseCookie; + } + + public ResponseCookie getResponseCookie() { + return this.responseCookie; + } + + @Override + public String getName() { + return this.responseCookie.getName(); + } + + @Override + public String getValue() { + return this.responseCookie.getValue(); + } + + @Override + public int getVersion() { + return 0; + } + + @Override + public long getMaxAge() { + return this.responseCookie.getMaxAge().toSeconds(); + } + + @Override + @Nullable + public String getComment() { + return null; + } + + @Override + @Nullable + public String getDomain() { + return this.responseCookie.getDomain(); + } + + @Override + @Nullable + public String getPath() { + return this.responseCookie.getPath(); + } + + @Override + public boolean isSecure() { + return this.responseCookie.isSecure(); + } + + @Nullable + @Override + public SameSite getSameSite() { + // Adding non-null return site breaks tests. + return null; + } + + @Override + public boolean isHttpOnly() { + return this.responseCookie.isHttpOnly(); + } + + @Override + public boolean isPartitioned() { + return this.responseCookie.isPartitioned(); + } + + @Override + public Map getAttributes() { + return Collections.emptyMap(); + } + } +} diff --git a/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java b/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java index 57f9b7ab8943..83c7b9fff490 100644 --- a/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/support/JettyHeadersAdapter.java @@ -17,9 +17,11 @@ package org.springframework.http.support; import java.util.AbstractSet; +import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Set; @@ -44,6 +46,9 @@ public final class JettyHeadersAdapter implements MultiValueMap private final HttpFields headers; + @Nullable + private final HttpFields.Mutable mutable; + /** * Creates a new {@code JettyHeadersAdapter} based on the given @@ -53,6 +58,7 @@ public final class JettyHeadersAdapter implements MultiValueMap public JettyHeadersAdapter(HttpFields headers) { Assert.notNull(headers, "Headers must not be null"); this.headers = headers; + this.mutable = headers instanceof HttpFields.Mutable m ? m : null; } @@ -119,22 +125,36 @@ public boolean isEmpty() { @Override public boolean containsKey(Object key) { - return (key instanceof String headerName && this.headers.contains(headerName)); + return (key instanceof String name && this.headers.contains(name)); } @Override public boolean containsValue(Object value) { - return (value instanceof String searchString && - this.headers.stream().anyMatch(field -> field.contains(searchString))); + if (value instanceof String searchString) { + for (HttpField field : this.headers) { + if (field.contains(searchString)) { + return true; + } + } + } + return false; } @Nullable @Override public List get(Object key) { - if (containsKey(key)) { - return this.headers.getValuesList((String) key); + List list = null; + if (key instanceof String name) { + for (HttpField f : this.headers) { + if (f.is(name)) { + if (list == null) { + list = new ArrayList<>(); + } + list.add(f.getValue()); + } + } } - return null; + return list; } @Nullable @@ -142,7 +162,21 @@ public List get(Object key) { public List put(String key, List value) { HttpFields.Mutable mutableHttpFields = mutableFields(); List oldValues = get(key); - mutableHttpFields.put(key, value); + + if (oldValues == null) { + switch (value.size()) { + case 0 -> {} + case 1 -> mutableHttpFields.add(key, value.get(0)); + default -> mutableHttpFields.add(key, value); + } + } + else { + switch (value.size()) { + case 0 -> mutableHttpFields.remove(key); + case 1 -> mutableHttpFields.put(key, value.get(0)); + default -> mutableHttpFields.put(key, value); + } + } return oldValues; } @@ -150,12 +184,20 @@ public List put(String key, List value) { @Override public List remove(Object key) { HttpFields.Mutable mutableHttpFields = mutableFields(); + List list = null; if (key instanceof String name) { - List oldValues = get(key); - mutableHttpFields.remove(name); - return oldValues; + for (ListIterator i = mutableHttpFields.listIterator(); i.hasNext(); ) { + HttpField f = i.next(); + if (f.is(name)) { + if (list == null) { + list = new ArrayList<>(); + } + list.add(f.getValue()); + i.remove(); + } + } } - return null; + return list; } @Override @@ -187,6 +229,7 @@ public Set>> entrySet() { public Iterator>> iterator() { return new EntryIterator(); } + @Override public int size() { return headers.size(); @@ -195,16 +238,12 @@ public int size() { } private HttpFields.Mutable mutableFields() { - if (this.headers instanceof HttpFields.Mutable mutableHttpFields) { - return mutableHttpFields; - } - else { + if (this.mutable == null) { throw new IllegalStateException("Immutable headers"); } + return this.mutable; } - - @Override public String toString() { return HttpHeaders.formatHeaders(this); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java index eeb1bda99a20..145d6a8cedc8 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ErrorHandlerIntegrationTests.java @@ -27,6 +27,7 @@ import org.springframework.web.client.RestTemplate; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import static org.assertj.core.api.Assertions.assertThat; @@ -87,8 +88,8 @@ void emptyPathSegments(HttpServer httpServer) throws Exception { // but an application can apply CompactPathRule via RewriteHandler: // https://www.eclipse.org/jetty/documentation/jetty-11/programming_guide.php - HttpStatus expectedStatus = - (httpServer instanceof JettyHttpServer ? HttpStatus.BAD_REQUEST : HttpStatus.OK); + HttpStatus expectedStatus = (httpServer instanceof JettyHttpServer || httpServer instanceof JettyCoreHttpServer + ? HttpStatus.BAD_REQUEST : HttpStatus.OK); assertThat(response.getStatusCode()).isEqualTo(expectedStatus); } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java index 587825b0b066..eb243953e085 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ZeroCopyIntegrationTests.java @@ -30,6 +30,7 @@ import org.springframework.web.client.RestTemplate; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer; @@ -54,8 +55,8 @@ protected HttpHandler createHttpHandler() { @ParameterizedHttpServerTest void zeroCopy(HttpServer httpServer) throws Exception { - assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer, - "Zero-copy does not support Servlet"); + assumeTrue(httpServer instanceof ReactorHttpServer || httpServer instanceof UndertowHttpServer + || httpServer instanceof JettyCoreHttpServer, "Zero-copy does not support Servlet"); startServer(httpServer); diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java index f49b91fec5f6..0aa39aa025e6 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/AbstractHttpHandlerIntegrationTests.java @@ -126,6 +126,7 @@ public static Flux testInterval(Duration period, int count) { static Stream> httpServers() { return Stream.of( named("Jetty", new JettyHttpServer()), + named("Jetty Core", new JettyCoreHttpServer()), named("Reactor Netty", new ReactorHttpServer()), named("Tomcat", new TomcatHttpServer()), named("Undertow", new UndertowHttpServer()) diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java new file mode 100644 index 000000000000..488e0e429ef9 --- /dev/null +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyCoreHttpServer.java @@ -0,0 +1,98 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.testfixture.http.server.reactive.bootstrap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.websocket.server.ServerWebSocketContainer; + +import org.springframework.http.server.reactive.JettyCoreHttpHandlerAdapter; + +/** + * @author Rossen Stoyanchev + * @author Sam Brannen + * @author Greg Wilkins + * @since 6.2 + */ +public class JettyCoreHttpServer extends AbstractHttpServer { + + protected Log logger = LogFactory.getLog(getClass().getName()); + + private ArrayByteBufferPool byteBufferPool; + + private Server jettyServer; + + @Override + protected void initServer() { + if (logger.isTraceEnabled()) + this.byteBufferPool = new ArrayByteBufferPool.Tracking(); + this.jettyServer = new Server(null, null, byteBufferPool); + + ServerConnector connector = new ServerConnector(this.jettyServer); + connector.setHost(getHost()); + connector.setPort(getPort()); + this.jettyServer.addConnector(connector); + this.jettyServer.setHandler(createHandlerAdapter()); + + ServerWebSocketContainer.ensure(jettyServer); + } + + private JettyCoreHttpHandlerAdapter createHandlerAdapter() { + return new JettyCoreHttpHandlerAdapter(resolveHttpHandler()); + } + + @Override + protected void startInternal() throws Exception { + this.jettyServer.start(); + setPort(((ServerConnector) this.jettyServer.getConnectors()[0]).getLocalPort()); + } + + @Override + protected void stopInternal() { + boolean wasRunning = this.jettyServer.isRunning(); + try { + this.jettyServer.stop(); + } + catch (Exception ex) { + // ignore + } + + // TODO remove this or make debug only + if (wasRunning && this.byteBufferPool instanceof ArrayByteBufferPool.Tracking tracking) { + if (!tracking.getLeaks().isEmpty()) { + System.err.println("Leaks:\n" + tracking.dumpLeaks()); + throw new IllegalStateException("LEAKS"); + } + } + } + + @Override + protected void resetInternal() { + try { + if (this.jettyServer.isRunning()) { + stopInternal(); + } + this.jettyServer.destroy(); + } + finally { + this.jettyServer = null; + } + } +} diff --git a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java index d3912ac2df8c..12878528ff1f 100644 --- a/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java +++ b/spring-web/src/testFixtures/java/org/springframework/web/testfixture/http/server/reactive/bootstrap/JettyHttpServer.java @@ -54,7 +54,6 @@ protected void initServer() throws Exception { connector.setPort(getPort()); this.jettyServer.addConnector(connector); this.jettyServer.setHandler(this.contextHandler); - this.contextHandler.start(); } private ServletHttpHandlerAdapter createServletAdapter() { @@ -70,24 +69,10 @@ protected void startInternal() throws Exception { @Override protected void stopInternal() throws Exception { try { - if (this.contextHandler.isRunning()) { - this.contextHandler.stop(); - } + this.jettyServer.stop(); } - finally { - try { - if (this.jettyServer.isRunning()) { - // Do not configure a large stop timeout. For example, setting a stop timeout - // of 5000 adds an additional 1-2 seconds to the runtime of each test using - // the Jetty sever, resulting in 2-4 extra minutes of overall build time. - this.jettyServer.setStopTimeout(100); - this.jettyServer.stop(); - this.jettyServer.destroy(); - } - } - catch (Exception ex) { - // ignore - } + catch (Exception ex) { + // ignore } } @@ -95,18 +80,14 @@ protected void stopInternal() throws Exception { protected void resetInternal() { try { if (this.jettyServer.isRunning()) { - // Do not configure a large stop timeout. For example, setting a stop timeout - // of 5000 adds an additional 1-2 seconds to the runtime of each test using - // the Jetty sever, resulting in 2-4 extra minutes of overall build time. - this.jettyServer.setStopTimeout(100); this.jettyServer.stop(); - this.jettyServer.destroy(); } } catch (Exception ex) { throw new IllegalStateException(ex); } finally { + this.jettyServer.destroy(); this.jettyServer = null; this.contextHandler = null; } diff --git a/spring-webflux/spring-webflux.gradle b/spring-webflux/spring-webflux.gradle index 29b7021f033b..93c7ee5b5292 100644 --- a/spring-webflux/spring-webflux.gradle +++ b/spring-webflux/spring-webflux.gradle @@ -27,6 +27,8 @@ dependencies { optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") { exclude group: "jakarta.servlet", module: "jakarta.servlet-api" } + optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-server") + optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-client") optional("org.freemarker:freemarker") optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-stdlib") diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 71c0a2c840c1..1e5ee65e0d75 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -22,16 +22,9 @@ import java.util.function.Function; import java.util.function.IntPredicate; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.Callback; -import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.core.OpCode; import org.springframework.core.io.buffer.CloseableDataBuffer; import org.springframework.core.io.buffer.DataBuffer; @@ -44,18 +37,14 @@ import org.springframework.web.reactive.socket.WebSocketMessage.Type; /** - * Jetty {@link WebSocket @WebSocket} handler that delegates events to a + * Jetty {@link org.eclipse.jetty.websocket.api.Session.Listener} handler that delegates events to a * reactive {@link WebSocketHandler} and its session. * * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 */ -@WebSocket -public class JettyWebSocketHandlerAdapter { - - private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); - +public class JettyWebSocketHandlerAdapter implements Session.Listener { private final WebSocketHandler delegateHandler; @@ -74,70 +63,62 @@ public JettyWebSocketHandlerAdapter(WebSocketHandler handler, this.sessionFactory = sessionFactory; } - - @OnWebSocketOpen + @Override public void onWebSocketOpen(Session session) { - this.delegateSession = this.sessionFactory.apply(session); - this.delegateHandler.handle(this.delegateSession) + JettyWebSocketSession delegateSession = this.sessionFactory.apply(session); + this.delegateSession = delegateSession; + this.delegateHandler.handle(delegateSession) .checkpoint(session.getUpgradeRequest().getRequestURI() + " [JettyWebSocketHandlerAdapter]") - .subscribe(this.delegateSession); + .subscribe(unused -> {}, delegateSession::onHandlerError, delegateSession::onHandleComplete); } - @OnWebSocketMessage + @Override public void onWebSocketText(String message) { - if (this.delegateSession != null) { - byte[] bytes = message.getBytes(StandardCharsets.UTF_8); - DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes); - WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer); - this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + byte[] bytes = message.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = this.delegateSession.bufferFactory().wrap(bytes); + WebSocketMessage webSocketMessage = new WebSocketMessage(Type.TEXT, buffer); + this.delegateSession.handleMessage(webSocketMessage); } - @OnWebSocketMessage + @Override public void onWebSocketBinary(ByteBuffer byteBuffer, Callback callback) { - if (this.delegateSession != null) { - DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); - buffer = new JettyDataBuffer(buffer, callback); - WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer); - this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); + buffer = new JettyCallbackDataBuffer(buffer, callback); + WebSocketMessage webSocketMessage = new WebSocketMessage(Type.BINARY, buffer); + this.delegateSession.handleMessage(webSocketMessage); } - @OnWebSocketFrame - public void onWebSocketFrame(Frame frame, Callback callback) { - if (this.delegateSession != null) { - if (OpCode.PONG == frame.getOpCode()) { - ByteBuffer byteBuffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD); - DataBuffer buffer = this.delegateSession.bufferFactory().wrap(byteBuffer); - buffer = new JettyDataBuffer(buffer, callback); - WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer); - this.delegateSession.handleMessage(webSocketMessage.getType(), webSocketMessage); - } - } + @Override + public void onWebSocketPong(ByteBuffer payload) { + Assert.state(this.delegateSession != null, "No delegate session available"); + DataBuffer buffer = this.delegateSession.bufferFactory().wrap(BufferUtil.copy(payload)); + WebSocketMessage webSocketMessage = new WebSocketMessage(Type.PONG, buffer); + this.delegateSession.handleMessage(webSocketMessage); } - @OnWebSocketClose + @Override public void onWebSocketClose(int statusCode, String reason) { - if (this.delegateSession != null) { - this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); } - @OnWebSocketError + @Override public void onWebSocketError(Throwable cause) { - if (this.delegateSession != null) { - this.delegateSession.handleError(cause); - } + Assert.state(this.delegateSession != null, "No delegate session available"); + this.delegateSession.handleError(cause); } - private static final class JettyDataBuffer implements CloseableDataBuffer { + private static final class JettyCallbackDataBuffer implements CloseableDataBuffer { private final DataBuffer delegate; private final Callback callback; - public JettyDataBuffer(DataBuffer delegate, Callback callback) { + + public JettyCallbackDataBuffer(DataBuffer delegate, Callback callback) { Assert.notNull(delegate, "'delegate` must not be null"); Assert.notNull(callback, "Callback must not be null"); this.delegate = delegate; @@ -272,13 +253,13 @@ public DataBuffer write(ByteBuffer... buffers) { @Deprecated public DataBuffer slice(int index, int length) { DataBuffer delegateSlice = this.delegate.slice(index, length); - return new JettyDataBuffer(delegateSlice, this.callback); + return new JettyCallbackDataBuffer(delegateSlice, this.callback); } @Override public DataBuffer split(int index) { DataBuffer delegateSplit = this.delegate.split(index); - return new JettyDataBuffer(delegateSplit, this.callback); + return new JettyCallbackDataBuffer(delegateSplit, this.callback); } @Override diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index d6adca7e9298..35aeac20a808 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -16,18 +16,26 @@ package org.springframework.web.reactive.socket.adapter; -import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; +import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.HandshakeInfo; @@ -36,13 +44,29 @@ /** * Spring {@link WebSocketSession} implementation that adapts to a Jetty - * WebSocket {@link org.eclipse.jetty.websocket.api.Session}. + * WebSocket {@link Session}. * * @author Violeta Georgieva * @author Rossen Stoyanchev * @since 5.0 */ -public class JettyWebSocketSession extends AbstractListenerWebSocketSession { +public class JettyWebSocketSession extends AbstractWebSocketSession { + + private final Flux flux; + + private final Sinks.One closeStatusSink = Sinks.one(); + + private final Lock lock = new ReentrantLock(); + + private long requested = 0; + + private boolean awaitingMessage = false; + + @Nullable + private FluxSink sink; + + @Nullable + private final Sinks.Empty handlerCompletionSink; public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { this(session, info, factory, null); @@ -51,52 +75,88 @@ public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFact public JettyWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory, @Nullable Sinks.Empty completionSink) { - super(session, ObjectUtils.getIdentityHexString(session), info, factory, completionSink); - // TODO: suspend causes failures if invoked at this stage - // suspendReceiving(); + super(session, ObjectUtils.getIdentityHexString(session), info, factory); + this.handlerCompletionSink = completionSink; + this.flux = Flux.create(emitter -> { + this.sink = emitter; + emitter.onRequest(n -> { + boolean demand = false; + this.lock.lock(); + try { + this.requested = Math.addExact(this.requested, n); + if (this.requested < 0L) { + this.requested = Long.MAX_VALUE; + } + + if (!this.awaitingMessage && this.requested > 0) { + if (this.requested != Long.MAX_VALUE) { + this.requested--; + } + this.awaitingMessage = true; + demand = true; + } + } + finally { + this.lock.unlock(); + } + + if (demand) { + getDelegate().demand(); + } + }); + }); } + void handleMessage(WebSocketMessage message) { + Assert.state(this.sink != null, "No sink available"); + this.sink.next(message); - @Override - protected boolean canSuspendReceiving() { - // Jetty 12 TODO: research suspend functionality in Jetty 12 - return false; + boolean demand = false; + this.lock.lock(); + try { + if (!this.awaitingMessage) { + throw new IllegalStateException(); + } + this.awaitingMessage = false; + if (this.requested > 0) { + if (this.requested != Long.MAX_VALUE) { + this.requested--; + } + this.awaitingMessage = true; + demand = true; + } + } + finally { + this.lock.unlock(); + } + + if (demand) { + getDelegate().demand(); + } } - @Override - protected void suspendReceiving() { + void handleError(Throwable ex) { } - @Override - protected void resumeReceiving() { + void handleClose(CloseStatus closeStatus) { + this.closeStatusSink.tryEmitValue(closeStatus); + if (this.sink != null) { + this.sink.complete(); + } } - @Override - protected boolean sendMessage(WebSocketMessage message) throws IOException { - DataBuffer dataBuffer = message.getPayload(); - Session session = getDelegate(); - if (WebSocketMessage.Type.TEXT.equals(message.getType())) { - getSendProcessor().setReadyToSend(false); - String text = dataBuffer.toString(StandardCharsets.UTF_8); - session.sendText(text, new SendProcessorCallback()); + void onHandlerError(Throwable error) { + if (JettyWebSocketSession.this.handlerCompletionSink != null) { + JettyWebSocketSession.this.handlerCompletionSink.tryEmitError(error); } - else { - if (WebSocketMessage.Type.BINARY.equals(message.getType())) { - getSendProcessor().setReadyToSend(false); - } - try (DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers()) { - while (iterator.hasNext()) { - ByteBuffer byteBuffer = iterator.next(); - switch (message.getType()) { - case BINARY -> session.sendBinary(byteBuffer, new SendProcessorCallback()); - case PING -> session.sendPing(byteBuffer, new SendProcessorCallback()); - case PONG -> session.sendPong(byteBuffer, new SendProcessorCallback()); - default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); - } - } - } + getDelegate().close(StatusCode.SERVER_ERROR, error.getMessage(), Callback.NOOP); + } + + void onHandleComplete() { + if (JettyWebSocketSession.this.handlerCompletionSink != null) { + JettyWebSocketSession.this.handlerCompletionSink.tryEmitEmpty(); } - return true; + getDelegate().close(StatusCode.NORMAL, null, Callback.NOOP); } @Override @@ -108,25 +168,81 @@ public boolean isOpen() { public Mono close(CloseStatus status) { Callback.Completable callback = new Callback.Completable(); getDelegate().close(status.getCode(), status.getReason(), callback); - return Mono.fromFuture(callback); } + @Override + public Mono closeStatus() { + return this.closeStatusSink.asMono(); + } - private final class SendProcessorCallback implements Callback { + @Override + public Flux receive() { + return this.flux; + } - @Override - public void fail(Throwable x) { - getSendProcessor().cancel(); - getSendProcessor().onError(x); - } + @Override + public Mono send(Publisher messages) { + return Flux.from(messages) + .flatMap(this::sendMessage, 1) + .then(); + } - @Override - public void succeed() { - getSendProcessor().setReadyToSend(true); - getSendProcessor().onWritePossible(); - } + protected Mono sendMessage(WebSocketMessage message) { + Callback.Completable completable = new Callback.Completable(); + DataBuffer dataBuffer = message.getPayload(); + Session session = getDelegate(); + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + String text = dataBuffer.toString(StandardCharsets.UTF_8); + session.sendText(text, completable); + } + else { + switch (message.getType()) { + case BINARY -> { + @SuppressWarnings("resource") + DataBuffer.ByteBufferIterator iterator = dataBuffer.readableByteBuffers(); + new IteratingCallback() { + @Override + protected Action process() { + if (!iterator.hasNext()) { + return Action.SUCCEEDED; + } + + ByteBuffer buffer = iterator.next(); + boolean last = iterator.hasNext(); + session.sendPartialBinary(buffer, last, Callback.from(this::succeeded, this::failed)); + return Action.SCHEDULED; + } + + @Override + protected void onCompleteSuccess() { + iterator.close(); + completable.succeed(); + } + + @Override + protected void onCompleteFailure(Throwable cause) { + iterator.close(); + completable.fail(cause); + } + }.iterate(); + } + case PING -> { + // Maximum size of Control frame payload is 125, per RFC 6455. + ByteBuffer buffer = BufferUtil.allocate(125); + dataBuffer.toByteBuffer(buffer); + session.sendPing(buffer, completable); + } + case PONG -> { + // Maximum size of Control frame payload is 125, per RFC 6455. + ByteBuffer buffer = BufferUtil.allocate(125); + dataBuffer.toByteBuffer(buffer); + session.sendPong(buffer, completable); + } + default -> throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + return Mono.fromFuture(completable); } - } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java new file mode 100644 index 000000000000..bc770de68147 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java @@ -0,0 +1,111 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.client; + +import java.io.IOException; +import java.net.URI; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.Response; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.util.component.LifeCycle; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.JettyUpgradeListener; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; + +import org.springframework.context.Lifecycle; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.http.HttpHeaders; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; + +public class JettyWebSocketClient implements WebSocketClient, Lifecycle { + + private final org.eclipse.jetty.websocket.client.WebSocketClient client; + + public JettyWebSocketClient() { + this(new org.eclipse.jetty.websocket.client.WebSocketClient()); + } + + public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient client) { + this.client = client; + } + + @Override + public void start() { + LifeCycle.start(this.client); + } + + @Override + public void stop() { + LifeCycle.stop(this.client); + } + + @Override + public boolean isRunning() { + return this.client.isRunning(); + } + + @Override + public Mono execute(URI url, WebSocketHandler handler) { + return execute(url, null, handler); + } + + @Override + public Mono execute(URI url, @Nullable HttpHeaders headers, WebSocketHandler handler) { + + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.setSubProtocols(handler.getSubProtocols()); + if (headers != null) { + headers.keySet().forEach(header -> upgradeRequest.setHeader(header, headers.getValuesAsList(header))); + } + + final AtomicReference handshakeInfo = new AtomicReference<>(); + JettyUpgradeListener jettyUpgradeListener = new JettyUpgradeListener() { + @Override + public void onHandshakeResponse(Request request, Response response) { + String protocol = response.getHeaders().get(HttpHeader.SEC_WEBSOCKET_SUBPROTOCOL); + HttpHeaders responseHeaders = new HttpHeaders(); + response.getHeaders().forEach(header -> responseHeaders.add(header.getName(), header.getValue())); + handshakeInfo.set(new HandshakeInfo(url, responseHeaders, Mono.empty(), protocol)); + } + }; + + Sinks.Empty completion = Sinks.empty(); + JettyWebSocketHandlerAdapter handlerAdapter = new JettyWebSocketHandlerAdapter(handler, session -> + new JettyWebSocketSession(session, Objects.requireNonNull(handshakeInfo.get()), DefaultDataBufferFactory.sharedInstance, completion)); + try { + this.client.connect(handlerAdapter, url, upgradeRequest, jettyUpgradeListener) + .exceptionally(throwable -> { + // Only fail the completion if we have an error + // as the JettyWebSocketSession will never be opened. + completion.tryEmitError(throwable); + return null; + }); + return completion.asMono(); + } + catch (IOException ex) { + return Mono.error(ex); + } + } +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java index 81b5326e817f..603d8e618340 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java @@ -43,6 +43,7 @@ import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.WebSocketService; +import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; @@ -76,6 +77,8 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { private static final boolean jettyWsPresent; + private static final boolean jettyCoreWsPresent; + private static final boolean undertowWsPresent; private static final boolean reactorNettyPresent; @@ -88,6 +91,8 @@ public class HandshakeWebSocketService implements WebSocketService, Lifecycle { "org.apache.tomcat.websocket.server.WsHttpUpgradeHandler", classLoader); jettyWsPresent = ClassUtils.isPresent( "org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer", classLoader); + jettyCoreWsPresent = ClassUtils.isPresent( + "org.eclipse.jetty.websocket.server.ServerWebSocketContainer", classLoader); undertowWsPresent = ClassUtils.isPresent( "io.undertow.websockets.WebSocketProtocolHandshakeHandler", classLoader); reactorNettyPresent = ClassUtils.isPresent( @@ -278,6 +283,9 @@ static RequestUpgradeStrategy initUpgradeStrategy() { else if (jettyWsPresent) { return new JettyRequestUpgradeStrategy(); } + else if (jettyCoreWsPresent) { + return new JettyCoreRequestUpgradeStrategy(); + } else if (undertowWsPresent) { return new UndertowRequestUpgradeStrategy(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java new file mode 100644 index 000000000000..509981c6dced --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyCoreRequestUpgradeStrategy.java @@ -0,0 +1,127 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.server.upgrade; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +import org.eclipse.jetty.ee10.websocket.server.JettyWebSocketServerContainer; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.api.Configurable; +import org.eclipse.jetty.websocket.api.exceptions.WebSocketException; +import org.eclipse.jetty.websocket.server.ServerWebSocketContainer; +import org.eclipse.jetty.websocket.server.WebSocketCreator; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpRequestDecorator; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServerHttpResponseDecorator; +import org.springframework.lang.Nullable; +import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.ContextWebSocketHandler; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.adapter.JettyWebSocketSession; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +/** + * A WebSocket {@code RequestUpgradeStrategy} for Jetty 12 Core. + * + * @author Rossen Stoyanchev + * @since 5.3.4 + */ +public class JettyCoreRequestUpgradeStrategy implements RequestUpgradeStrategy { + + @Nullable + private Consumer webSocketConfigurer; + + @Nullable + private ServerWebSocketContainer serverContainer; + + /** + * Add a callback to configure WebSocket server parameters on + * {@link JettyWebSocketServerContainer}. + * @since 6.1 + */ + public void addWebSocketConfigurer(Consumer webSocketConfigurer) { + this.webSocketConfigurer = (this.webSocketConfigurer != null ? + this.webSocketConfigurer.andThen(webSocketConfigurer) : webSocketConfigurer); + } + + @Override + public Mono upgrade( + ServerWebExchange exchange, WebSocketHandler handler, + @Nullable String subProtocol, Supplier handshakeInfoFactory) { + + ServerHttpRequest request = exchange.getRequest(); + ServerHttpResponse response = exchange.getResponse(); + + Request jettyRequest = ServerHttpRequestDecorator.getNativeRequest(request); + Response jettyResponse = ServerHttpResponseDecorator.getNativeResponse(response); + + HandshakeInfo handshakeInfo = handshakeInfoFactory.get(); + DataBufferFactory factory = response.bufferFactory(); + + // Trigger WebFlux preCommit actions before upgrade + return exchange.getResponse().setComplete() + .then(Mono.deferContextual(contextView -> { + JettyWebSocketHandlerAdapter adapter = new JettyWebSocketHandlerAdapter( + ContextWebSocketHandler.decorate(handler, contextView), + session -> new JettyWebSocketSession(session, handshakeInfo, factory)); + + WebSocketCreator webSocketCreator = (upgradeRequest, upgradeResponse, callback) -> { + if (subProtocol != null) { + upgradeResponse.setAcceptedSubProtocol(subProtocol); + } + return adapter; + }; + + Callback.Completable callback = new Callback.Completable(); + Mono mono = Mono.fromFuture(callback); + ServerWebSocketContainer container = getWebSocketServerContainer(jettyRequest); + try { + if (!container.upgrade(webSocketCreator, jettyRequest, jettyResponse, callback)) { + throw new WebSocketException("request could not be upgraded to websocket"); + } + } + catch (WebSocketException ex) { + callback.failed(ex); + } + + return mono; + })); + } + + private ServerWebSocketContainer getWebSocketServerContainer(Request jettyRequest) { + if (this.serverContainer == null) { + Server server = jettyRequest.getConnectionMetaData().getConnector().getServer(); + ServerWebSocketContainer container = ServerWebSocketContainer.get(server.getContext()); + if (this.webSocketConfigurer != null) { + this.webSocketConfigurer.accept(container); + } + this.serverContainer = container; + } + return this.serverContainer; + } + +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java index 66b41aa3acf1..46d8b7090365 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/server/upgrade/JettyRequestUpgradeStrategy.java @@ -42,7 +42,7 @@ import org.springframework.web.server.ServerWebExchange; /** - * A WebSocket {@code RequestUpgradeStrategy} for Jetty 11. + * A WebSocket {@code RequestUpgradeStrategy} for Jetty 12 EE10. * * @author Rossen Stoyanchev * @since 5.3.4 diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java index 293cbdeec5bd..06c01fbfef69 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/ContextPathIntegrationTests.java @@ -16,7 +16,12 @@ package org.springframework.web.reactive.result.method.annotation; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; @@ -28,10 +33,16 @@ import org.springframework.web.client.RestTemplate; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.UndertowHttpServer; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Named.named; /** * Integration tests related to the use of context paths. @@ -40,15 +51,25 @@ */ class ContextPathIntegrationTests { - @Test - void multipleWebFluxApps() throws Exception { + static Stream> httpServers() { + return Stream.of( + named("Jetty", new JettyHttpServer()), + named("Jetty Core", new JettyCoreHttpServer()), + named("Reactor Netty", new ReactorHttpServer()), + named("Tomcat", new TomcatHttpServer()), + named("Undertow", new UndertowHttpServer()) + ); + } + + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("httpServers") + void multipleWebFluxApps(AbstractHttpServer server) throws Exception { AnnotationConfigApplicationContext context1 = new AnnotationConfigApplicationContext(WebAppConfig.class); AnnotationConfigApplicationContext context2 = new AnnotationConfigApplicationContext(WebAppConfig.class); HttpHandler webApp1Handler = WebHttpHandlerBuilder.applicationContext(context1).build(); HttpHandler webApp2Handler = WebHttpHandlerBuilder.applicationContext(context2).build(); - ReactorHttpServer server = new ReactorHttpServer(); server.registerHttpHandler("/webApp1", webApp1Handler); server.registerHttpHandler("/webApp2", webApp2Handler); server.afterPropertiesSet(); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 6ffd05022e82..39b24fc58cdd 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -53,6 +53,7 @@ import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.testfixture.http.server.reactive.bootstrap.AbstractHttpHandlerIntegrationTests; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; @@ -127,7 +128,7 @@ void sseAsPerson(HttpServer httpServer, ClientHttpConnector connector) throws Ex @ParameterizedSseTest void sseAsEvent(HttpServer httpServer, ClientHttpConnector connector) throws Exception { - assumeTrue(httpServer instanceof JettyHttpServer); + assumeTrue(httpServer instanceof JettyHttpServer || httpServer instanceof JettyCoreHttpServer); startServer(httpServer, connector); @@ -302,18 +303,21 @@ public String toString() { static Stream arguments() { return Stream.of( - args(new JettyHttpServer(), new ReactorClientHttpConnector()), - args(new JettyHttpServer(), new JettyClientHttpConnector()), - args(new JettyHttpServer(), new HttpComponentsClientHttpConnector()), - args(new ReactorHttpServer(), new ReactorClientHttpConnector()), - args(new ReactorHttpServer(), new JettyClientHttpConnector()), - args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()), - args(new TomcatHttpServer(), new ReactorClientHttpConnector()), - args(new TomcatHttpServer(), new JettyClientHttpConnector()), - args(new TomcatHttpServer(), new HttpComponentsClientHttpConnector()), - args(new UndertowHttpServer(), new ReactorClientHttpConnector()), - args(new UndertowHttpServer(), new JettyClientHttpConnector()), - args(new UndertowHttpServer(), new HttpComponentsClientHttpConnector()) + args(new JettyHttpServer(), new ReactorClientHttpConnector()), + args(new JettyHttpServer(), new JettyClientHttpConnector()), + args(new JettyHttpServer(), new HttpComponentsClientHttpConnector()), + args(new JettyCoreHttpServer(), new ReactorClientHttpConnector()), + args(new JettyCoreHttpServer(), new JettyClientHttpConnector()), + args(new JettyCoreHttpServer(), new HttpComponentsClientHttpConnector()), + args(new ReactorHttpServer(), new ReactorClientHttpConnector()), + args(new ReactorHttpServer(), new JettyClientHttpConnector()), + args(new ReactorHttpServer(), new HttpComponentsClientHttpConnector()), + args(new TomcatHttpServer(), new ReactorClientHttpConnector()), + args(new TomcatHttpServer(), new JettyClientHttpConnector()), + args(new TomcatHttpServer(), new HttpComponentsClientHttpConnector()), + args(new UndertowHttpServer(), new ReactorClientHttpConnector()), + args(new UndertowHttpServer(), new JettyClientHttpConnector()), + args(new UndertowHttpServer(), new HttpComponentsClientHttpConnector()) ); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java index d6954505fa1e..9392f0cca98e 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/AbstractReactiveWebSocketIntegrationTests.java @@ -45,6 +45,7 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.web.filter.reactive.ServerWebExchangeContextFilter; import org.springframework.web.reactive.DispatcherHandler; +import org.springframework.web.reactive.socket.client.JettyWebSocketClient; import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; import org.springframework.web.reactive.socket.client.TomcatWebSocketClient; import org.springframework.web.reactive.socket.client.UndertowWebSocketClient; @@ -53,6 +54,7 @@ import org.springframework.web.reactive.socket.server.WebSocketService; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.upgrade.JettyCoreRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.JettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNetty2RequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; @@ -61,6 +63,7 @@ import org.springframework.web.server.WebFilter; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.testfixture.http.server.reactive.bootstrap.HttpServer; +import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyCoreHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.testfixture.http.server.reactive.bootstrap.TomcatHttpServer; @@ -90,6 +93,7 @@ static Stream arguments() throws IOException { WebSocketClient[] clients = new WebSocketClient[] { new TomcatWebSocketClient(), + new JettyWebSocketClient(), new ReactorNettyWebSocketClient(), new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY)) }; @@ -97,6 +101,7 @@ static Stream arguments() throws IOException { Map> servers = new LinkedHashMap<>(); servers.put(new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class), TomcatConfig.class); servers.put(new JettyHttpServer(), JettyConfig.class); + servers.put(new JettyCoreHttpServer(), JettyCoreConfig.class); servers.put(new ReactorHttpServer(), ReactorNettyConfig.class); servers.put(new UndertowHttpServer(), UndertowConfig.class); @@ -241,4 +246,12 @@ protected RequestUpgradeStrategy getUpgradeStrategy() { } } + @Configuration + static class JettyCoreConfig extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy getUpgradeStrategy() { + return new JettyCoreRequestUpgradeStrategy(); + } + } } diff --git a/spring-websocket/spring-websocket.gradle b/spring-websocket/spring-websocket.gradle index 72df03b20dd0..2250f5fdf38b 100644 --- a/spring-websocket/spring-websocket.gradle +++ b/spring-websocket/spring-websocket.gradle @@ -19,6 +19,7 @@ dependencies { optional("org.eclipse.jetty.ee10:jetty-ee10-webapp") { exclude group: "jakarta.servlet", module: "jakarta.servlet-api" } + optional("org.eclipse.jetty.websocket:jetty-websocket-jetty-api") optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jakarta-server") optional("org.eclipse.jetty.ee10.websocket:jetty-ee10-websocket-jetty-server") { exclude group: "jakarta.servlet", module: "jakarta.servlet-api" diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java index b996b920e1c3..f57dff500877 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/jetty/JettyWebSocketHandlerAdapter.java @@ -20,17 +20,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.websocket.api.Callback; -import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen; -import org.eclipse.jetty.websocket.api.annotations.WebSocket; -import org.eclipse.jetty.websocket.core.OpCode; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.web.socket.BinaryMessage; import org.springframework.web.socket.CloseStatus; @@ -40,23 +34,22 @@ import org.springframework.web.socket.handler.ExceptionWebSocketHandlerDecorator; /** - * Adapts {@link WebSocketHandler} to the Jetty WebSocket API. + * Adapts {@link WebSocketHandler} to the Jetty WebSocket API {@link org.eclipse.jetty.websocket.api.Session.Listener}. * * @author Rossen Stoyanchev * @since 4.0 */ -@WebSocket -public class JettyWebSocketHandlerAdapter { - - private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); +public class JettyWebSocketHandlerAdapter implements Session.Listener { private static final Log logger = LogFactory.getLog(JettyWebSocketHandlerAdapter.class); - private final WebSocketHandler webSocketHandler; private final JettyWebSocketSession wsSession; + @Nullable + private Session nativeSession; + public JettyWebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebSocketSession wsSession) { Assert.notNull(webSocketHandler, "WebSocketHandler must not be null"); @@ -65,69 +58,60 @@ public JettyWebSocketHandlerAdapter(WebSocketHandler webSocketHandler, JettyWebS this.wsSession = wsSession; } - - @OnWebSocketOpen + @Override public void onWebSocketOpen(Session session) { try { + this.nativeSession = session; this.wsSession.initializeNativeSession(session); this.webSocketHandler.afterConnectionEstablished(this.wsSession); + this.nativeSession.demand(); } catch (Exception ex) { - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + tryCloseWithError(ex); } } - @OnWebSocketMessage + @Override public void onWebSocketText(String payload) { + Assert.state(this.nativeSession != null, "No native session available"); TextMessage message = new TextMessage(payload); try { this.webSocketHandler.handleMessage(this.wsSession, message); + this.nativeSession.demand(); } catch (Exception ex) { - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + tryCloseWithError(ex); } } - @OnWebSocketMessage + @Override public void onWebSocketBinary(ByteBuffer payload, Callback callback) { - BinaryMessage message = new BinaryMessage(copyByteBuffer(payload), true); + Assert.state(this.nativeSession != null, "No native session available"); + BinaryMessage message = new BinaryMessage(BufferUtil.copy(payload), true); + callback.succeed(); try { this.webSocketHandler.handleMessage(this.wsSession, message); - callback.succeed(); + this.nativeSession.demand(); } catch (Exception ex) { - callback.fail(ex); - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + tryCloseWithError(ex); } } - @OnWebSocketFrame - public void onWebSocketFrame(Frame frame, Callback callback) { - if (OpCode.PONG == frame.getOpCode()) { - ByteBuffer payload = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; - PongMessage message = new PongMessage(copyByteBuffer(payload)); - try { - this.webSocketHandler.handleMessage(this.wsSession, message); - callback.succeed(); - } - catch (Exception ex) { - callback.fail(ex); - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); - } + @Override + public void onWebSocketPong(ByteBuffer payload) { + Assert.state(this.nativeSession != null, "No native session available"); + PongMessage message = new PongMessage(BufferUtil.copy(payload)); + try { + this.webSocketHandler.handleMessage(this.wsSession, message); + this.nativeSession.demand(); } - else { - callback.succeed(); + catch (Exception ex) { + tryCloseWithError(ex); } } - private static ByteBuffer copyByteBuffer(ByteBuffer src) { - ByteBuffer dest = ByteBuffer.allocate(src.remaining()); - dest.put(src); - dest.flip(); - return dest; - } - - @OnWebSocketClose + @Override public void onWebSocketClose(int statusCode, String reason) { CloseStatus closeStatus = new CloseStatus(statusCode, reason); try { @@ -135,19 +119,32 @@ public void onWebSocketClose(int statusCode, String reason) { } catch (Exception ex) { if (logger.isWarnEnabled()) { - logger.warn("Unhandled exception after connection closed for " + this, ex); + logger.warn("Unhandled exception from afterConnectionClosed for " + this, ex); } } } - @OnWebSocketError + @Override public void onWebSocketError(Throwable cause) { try { this.webSocketHandler.handleTransportError(this.wsSession, cause); } catch (Exception ex) { - ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, ex, logger); + if (logger.isWarnEnabled()) { + logger.warn("Unhandled exception from handleTransportError for " + this, ex); + } } } + private void tryCloseWithError(Throwable t) { + if (this.nativeSession != null) { + if (this.nativeSession.isOpen()) { + ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); + } + else { + // Session might be O-SHUT waiting for response close frame, so abort to close the connection. + this.nativeSession.disconnect(); + } + } + } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java b/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java index 026f9af4fd73..2ed4542e3111 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/server/jetty/JettyRequestUpgradeStrategy.java @@ -45,7 +45,7 @@ import org.springframework.web.socket.server.RequestUpgradeStrategy; /** - * A {@link RequestUpgradeStrategy} for Jetty 11. + * A {@link RequestUpgradeStrategy} for Jetty 12 EE10. * * @author Rossen Stoyanchev * @since 5.3.4