Skip to content

Commit

Permalink
support websocket text payload
Browse files Browse the repository at this point in the history
  • Loading branch information
freynder committed Apr 12, 2023
1 parent bbc4164 commit bc96657
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
obj.setMaxSubscriptionsByClient(((Number)member.getValue()).intValue());
}
break;
case "payloadMode":
if (member.getValue() instanceof String) {
obj.setPayloadMode(io.vertx.ext.stomp.PayloadMode.valueOf((String)member.getValue()));
}
break;
case "secured":
if (member.getValue() instanceof Boolean) {
obj.setSecured((Boolean)member.getValue());
Expand Down Expand Up @@ -112,6 +117,9 @@ public static void toJson(StompServerOptions obj, java.util.Map<String, Object>
json.put("maxHeaderLength", obj.getMaxHeaderLength());
json.put("maxHeaders", obj.getMaxHeaders());
json.put("maxSubscriptionsByClient", obj.getMaxSubscriptionsByClient());
if (obj.getPayloadMode() != null) {
json.put("payloadMode", obj.getPayloadMode().name());
}
json.put("secured", obj.isSecured());
json.put("sendErrorOnNoSubscriptions", obj.isSendErrorOnNoSubscriptions());
if (obj.getSupportedVersions() != null) {
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/vertx/ext/stomp/Destination.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ static Destination bridge(Vertx vertx, BridgeOptions options) {
@Fluent
Destination dispatch(StompServerConnection connection, Frame frame);

/**
* Dispatches the given frame as a text frame
*
* @param connection the connection
* @param frame the frame
* @return the current instance of {@link Destination}
*/
@Fluent
Destination dispatchText(StompServerConnection connection, Frame frame);

/**
* Dispatches the given frame as a binary frame
*
* @param connection the connection
* @param frame the frame
* @return the current instance of {@link Destination}
*/
Destination dispatchBinary(StompServerConnection connection, Frame frame);

/**
* Handles a subscription request to the current {@link Destination}.
*
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/vertx/ext/stomp/PayloadMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.vertx.ext.stomp;

public enum PayloadMode {
TEXT,
BINARY
}
14 changes: 12 additions & 2 deletions src/main/java/io/vertx/ext/stomp/StompServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@
public interface StompServerConnection {

/**
* Writes the given frame to the socket.
* Writes the given frame to the socket using default payload type
*
* @param frame the frame, must not be {@code null}.
* @param frame the frame, must not be {@code null}.
* @return the current {@link StompServerConnection}
*/
@Fluent
StompServerConnection write(Frame frame);

/**
* Writes the given frame to the socket.
*
* @param frame the frame, must not be {@code null}.
* @param payloadMode explicitely specify the payload type for the underlying socket to use (e.g. websockets)
* @return the current {@link StompServerConnection}
*/
@Fluent
StompServerConnection write(Frame frame, PayloadMode payloadMode);

/**
* Writes the given buffer to the socket. This is a low level API that should be used carefully.
*
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/io/vertx/ext/stomp/StompServerOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions

public static final String DEFAULT_WEBSOCKET_PATH = "/stomp";

public static final PayloadMode DEFAULT_PAYLOAD_MODE = PayloadMode.BINARY;

private int maxHeaderLength = DEFAULT_MAX_HEADER_LENGTH;
private int maxHeaders = DEFAULT_MAX_HEADERS;
Expand All @@ -68,6 +69,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions
private boolean disableTCPServer;
private boolean trailingLine = DEFAULT_TRAILING_LINE;

private PayloadMode payloadMode = DEFAULT_PAYLOAD_MODE;
/**
* Default constructor.
*/
Expand Down Expand Up @@ -102,6 +104,8 @@ public StompServerOptions(StompServerOptions other) {

this.disableTCPServer = other.disableTCPServer;
this.trailingLine = other.trailingLine;

this.payloadMode = other.payloadMode;
}

/**
Expand Down Expand Up @@ -471,4 +475,24 @@ public StompServerOptions setTrailingLine(boolean trailingLine) {
this.trailingLine = trailingLine;
return this;
}

/**
* Specify the default payload type to be used by the underlying socket. Useful for websocket transports.
*
* @return the default payload mode
*/
public PayloadMode getPayloadMode() {
return payloadMode;
}

/**
* Specify the default payload type to be used by the underlying socket. Useful for websocket transports.
*
* @param payloadMode the default payload mode to use
* @return the current {@link StompServerOptions}
*/
public StompServerOptions setPayloadMode(PayloadMode payloadMode) {
this.payloadMode = payloadMode;
return this;
}
}
6 changes: 5 additions & 1 deletion src/main/java/io/vertx/ext/stomp/impl/FrameParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ private void handleLine(Buffer buffer) {
String length = headers.get(Frame.CONTENT_LENGTH);
if (length != null) {
int contentLength = Integer.parseInt(length);
frameParser.fixedSizeMode(contentLength);
if (contentLength != 0) {
frameParser.fixedSizeMode(contentLength);
} else {
frameParser.delimitedMode(NULL);
}
} else {
frameParser.delimitedMode(NULL);
}
Expand Down
34 changes: 25 additions & 9 deletions src/main/java/io/vertx/ext/stomp/impl/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.vertx.ext.stomp.impl;

import io.vertx.core.Vertx;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.*;
import io.vertx.ext.stomp.utils.Headers;

import java.util.ArrayList;
Expand Down Expand Up @@ -58,23 +55,42 @@ public String destination() {
/**
* Dispatches the given frame.
*
* @param connection the connection
* @param frame the frame ({@code SEND} frame).
* @param connection the connection
* @param frame the frame
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
* @return the current instance of {@link Destination}
*/
@Override
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
if (subscriptions.isEmpty()) {
lastUsedSubscriptions = -1;
return this;
}
Subscription subscription = getNextSubscription();
String messageId = UUID.randomUUID().toString();
Frame message = transform(frame, subscription, messageId);
subscription.connection.write(message);
if(payloadMode == null) {
subscription.connection.write(message); // Uses server defaults
} else {
subscription.connection.write(message, payloadMode); // Explicit
}
return this;
}

@Override
public Destination dispatch(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, null);
}

@Override
public Destination dispatchText(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.TEXT);
}

@Override
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.BINARY);
}

private Subscription getNextSubscription() {
lastUsedSubscriptions = lastUsedSubscriptions + 1;
if (lastUsedSubscriptions >= subscriptions.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class StompServerTCPConnectionImpl implements StompServerConnection {

private static final Logger log = LoggerFactory.getLogger(StompServerTCPConnectionImpl.class);

private final StompServer server;
protected final StompServer server;
private final NetSocket socket;
private final String sessionId;
protected final Handler<ServerFrame> handler;
Expand Down Expand Up @@ -72,6 +72,11 @@ public StompServerConnection write(Frame frame) {
return write(frame.toBuffer(server.options().isTrailingLine()));
}

@Override
public StompServerConnection write(Frame frame, PayloadMode payloadMode) {
return write(frame);
}

@Override
public StompServerConnection write(Buffer buffer) {
socket.write(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,26 @@ public StompServerWebSocketConnectionImpl(ServerWebSocket socket, StompServer se
}

@Override
public SSLSession sslSession() {
return this.socket.sslSession();
public SSLSession sslSession() { return this.socket.sslSession(); }

@Override
public StompServerConnection write(Frame frame) {
return write(frame, server.options().getPayloadMode());
}

@Override
public StompServerConnection write(Frame frame, PayloadMode payloadMode) {
if (handler != null) {
handler.handle(new ServerFrameImpl(frame, this));
}
Buffer stompPayload = frame.toBuffer(server.options().isTrailingLine());
if (payloadMode == PayloadMode.BINARY) {
return write(stompPayload);
} else if (payloadMode == PayloadMode.TEXT) {
return writeText(stompPayload.toString());
} else {
return write(stompPayload); // Default
}
}

@Override
Expand All @@ -51,6 +69,11 @@ public StompServerConnection write(Buffer buffer) {
return this;
}

public StompServerConnection writeText(String message) {
socket.writeTextMessage(message);
return this;
}

@Override
public void ping() {
if (handler != null) {
Expand Down
34 changes: 25 additions & 9 deletions src/main/java/io/vertx/ext/stomp/impl/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.vertx.ext.stomp.impl;

import io.vertx.core.Vertx;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.*;
import io.vertx.ext.stomp.utils.Headers;

import java.util.ArrayList;
Expand Down Expand Up @@ -56,20 +53,39 @@ public String destination() {
/**
* Dispatches the given frame.
*
* @param connection the connection
* @param frame the frame ({@code SEND} frame).
* @param connection the connection
* @param frame the frame
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
* @return the current instance of {@link Destination}
*/
@Override
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
for (Subscription subscription : subscriptions) {
String messageId = UUID.randomUUID().toString();
Frame message = transform(frame, subscription, messageId);
subscription.connection.write(message);
if(payloadMode != null) {
subscription.connection.write(message, payloadMode);
} else {
subscription.connection.write(message);
}
}
return this;
}

@Override
public Destination dispatch(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, null);
}

@Override
public Destination dispatchText(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.TEXT);
}

@Override
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.BINARY);
}

public static Frame transform(Frame frame, Subscription subscription, String messageId) {
final Headers headers = Headers.create(frame.getHeaders())
// Destination already set in the input headers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package io.vertx.ext.stomp.impl;

import io.vertx.core.Vertx;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompServerConnection;
import io.vertx.ext.stomp.*;
import io.vertx.ext.stomp.utils.Headers;

import java.util.ArrayList;
Expand Down Expand Up @@ -58,12 +55,12 @@ public String destination() {
/**
* Dispatches the given frame.
*
* @param connection the connection
* @param frame the frame ({@code SEND} frame).
* @param connection the connection
* @param frame the frame
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
* @return the current instance of {@link Destination}
*/
@Override
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
if (subscriptions.isEmpty()) {
lastUsedSubscriptions = -1;
return this;
Expand All @@ -72,10 +69,29 @@ public synchronized Destination dispatch(StompServerConnection connection, Frame
String messageId = UUID.randomUUID().toString();
Frame message = transform(frame, subscription, messageId);
subscription.enqueue(message);
subscription.connection().write(message);
if(payloadMode != null) {
subscription.connection.write(message, payloadMode);
} else {
subscription.connection.write(message);
}
return this;
}

@Override
public Destination dispatch(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, null);
}

@Override
public Destination dispatchText(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.TEXT);
}

@Override
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
return dispatch(connection, frame, PayloadMode.BINARY);
}

private Subscription getNextSubscription() {
lastUsedSubscriptions = lastUsedSubscriptions + 1;
if (lastUsedSubscriptions >= subscriptions.size()) {
Expand Down

0 comments on commit bc96657

Please sign in to comment.