Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.dubbo.springboot.demo.servlet;

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.http12.message.ServerSentEvent;

import java.util.concurrent.CompletableFuture;

Expand All @@ -39,6 +40,8 @@ public interface GreeterService {

void sayHelloServerStreamNoParameter(StreamObserver<HelloReply> responseObserver);

void sayHelloServerStreamSSE(StreamObserver<ServerSentEvent<HelloReply>> responseObserver);

/**
* Sends greetings with bi streaming
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.config.annotation.DubboService;
import org.apache.dubbo.remoting.http12.message.ServerSentEvent;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
Expand Down Expand Up @@ -63,6 +65,26 @@ public void sayHelloServerStreamNoParameter(StreamObserver<HelloReply> responseO
responseObserver.onCompleted();
}

@Override
public void sayHelloServerStreamSSE(StreamObserver<ServerSentEvent<HelloReply>> responseObserver) {
LOGGER.info("Received sayHelloServerStreamSSE request");
responseObserver.onNext(ServerSentEvent.<HelloReply>builder()
.retry(Duration.ofSeconds(20))
.build());
responseObserver.onNext(ServerSentEvent.<HelloReply>builder()
.event("say")
.comment("hello world")
.build());
for (int i = 1; i < 6; i++) {
LOGGER.info("sayHelloServerStreamSSE onNext: {} times", i);
responseObserver.onNext(ServerSentEvent.<HelloReply>builder()
.data(toReply("Hello " + ' ' + i + " times"))
.build());
}
LOGGER.info("sayHelloServerStreamSSE onCompleted");
responseObserver.onCompleted();
}

@Override
public StreamObserver<HelloRequest> sayHelloBiStream(StreamObserver<HelloReply> responseObserver) {
LOGGER.info("Received sayHelloBiStream request");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ protected Object resolveValue(NamedValueMeta meta, HttpRequest request, HttpResp
if (value != null) {
return value;
}
if (meta.parameter().isStream()) {
return null;
}
if (meta.parameter().isSimple()) {
return request.parameter(meta.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ protected NamedValueMeta createNamedValueMeta(ParameterMeta param) {
@Override
protected Object resolveValue(NamedValueMeta meta, HttpRequest request, HttpResponse response) {
ParameterMeta parameter = meta.parameter();
if (parameter.isStream()) {
return null;
}
if (parameter.isSimple()) {
return request.parameter(meta.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package org.apache.dubbo.remoting.http12;

import java.nio.charset.StandardCharsets;

public final class HttpConstants {

public static final String TRAILERS = "trailers";
Expand All @@ -33,8 +31,5 @@ public final class HttpConstants {
public static final String HTTPS = "https";
public static final String HTTP = "http";

public static final byte[] SERVER_SENT_EVENT_DATA_PREFIX_BYTES = "data:".getBytes(StandardCharsets.US_ASCII);
public static final byte[] SERVER_SENT_EVENT_LF_BYTES = "\n\n".getBytes(StandardCharsets.US_ASCII);

private HttpConstants() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.http12.message;

import java.time.Duration;

/**
* Represents a Server-Sent Event according to the HTML specification.
* <p>
* Server-Sent Events (SSE) is a server push technology enabling a client to receive automatic updates from a server via HTTP connection.
* The server can send new data to the client at any time by pushing messages, without the need to reestablish the connection.
* <p>
* This class encapsulates the structure of a Server-Sent Event, which may include:
* <ul>
* <li>An event ID</li>
* <li>An event type</li>
* <li>A retry interval</li>
* <li>A comment</li>
* <li>Data payload</li>
* </ul>
* <p>
* Use the {@link #builder()} method to create instances of this class.
*
* @param <T> the type of data that this event contains
* @see <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html">Server-Sent Events</a>
*/
public final class ServerSentEvent<T> {

/**
* The event ID that can be used for tracking or resuming event streams.
*/
private final String id;

/**
* The event type or name that identifies the type of event.
*/
private final String event;

/**
* The reconnection time in milliseconds that the client should wait before reconnecting
* after a connection is closed.
*/
private final Duration retry;

/**
* A comment that will be ignored by event-processing clients but can be useful for debugging.
*/
private final String comment;

/**
* The data payload of this event.
*/
private final T data;

/**
* Constructs a new ServerSentEvent with the specified properties.
* <p>
* It's recommended to use the {@link #builder()} method instead of this constructor directly.
*
* @param id the event ID, can be null
* @param event the event type, can be null
* @param retry the reconnection time, can be null
* @param comment the comment, can be null
* @param data the data payload, can be null
*/
public ServerSentEvent(String id, String event, Duration retry, String comment, T data) {
this.id = id;
this.event = event;
this.retry = retry;
this.comment = comment;
this.data = data;
}

/**
* Returns the event ID.
*
* @return the event ID, may be null
*/
public String getId() {
return id;
}

/**
* Returns the event type.
*
* @return the event type, may be null
*/
public String getEvent() {
return event;
}

/**
* Returns the reconnection time that clients should wait before reconnecting.
*
* @return the reconnection time as a Duration, may be null
*/
public Duration getRetry() {
return retry;
}

/**
* Returns the comment associated with this event.
*
* @return the comment, may be null
*/
public String getComment() {
return comment;
}

/**
* Returns the data payload of this event.
*
* @return the data payload, may be null
*/
public T getData() {
return data;
}

@Override
public String toString() {
return "ServerSentEvent{id='" + id + '\'' + ", event='" + event + '\'' + ", retry=" + retry + ", comment='"
+ comment + '\'' + ", data=" + data + '}';
}

/**
* Creates a new {@link Builder} instance.
*
* @param <T> the type of data that the event will contain
* @return a new builder
*/
public static <T> Builder<T> builder() {
return new Builder<>();
}

/**
* Builder for {@link ServerSentEvent}.
*
* @param <T> the type of data that the event will contain
*/
public static final class Builder<T> {
private String id;
private String event;
private Duration retry;
private String comment;
private T data;

private Builder() {}

/**
* Sets the id of the event.
*
* @param id the id
* @return this builder
*/
public Builder<T> id(String id) {
this.id = id;
return this;
}

/**
* Sets the event type.
*
* @param event the event type
* @return this builder
*/
public Builder<T> event(String event) {
this.event = event;
return this;
}

/**
* Sets the retry duration.
*
* @param retry the retry duration
* @return this builder
*/
public Builder<T> retry(Duration retry) {
this.retry = retry;
return this;
}

/**
* Sets the comment.
*
* @param comment the comment
* @return this builder
*/
public Builder<T> comment(String comment) {
this.comment = comment;
return this;
}

/**
* Sets the data.
*
* @param data the data
* @return this builder
*/
public Builder<T> data(T data) {
this.data = data;
return this;
}

/**
* Builds a new {@link ServerSentEvent} with the configured properties.
*
* @return the built event
*/
public ServerSentEvent<T> build() {
return new ServerSentEvent<>(id, event, retry, comment, data);
}
}
}
Loading
Loading