Skip to content

Commit

Permalink
[fix][ws] Remove unnecessary ping/pong implementation (#20733)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall authored Jul 6, 2023
1 parent 881a1f4 commit 33044f0
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -1069,12 +1068,6 @@ private void addWebSocketServiceHandler(WebService webService,
new ServletHolder(readerWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet), true, attributeMap);

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.pulsar.docs.tools.CmdGenerateDocs;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -313,12 +312,6 @@ public static void addWebServerHandlers(WebServer server,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet));
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,6 @@ private String computeWsBasePath() {
return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get());
}

@Test
public void testEnableWebSocketServer() throws Exception {
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = computeWsBasePath() + "/pingpong";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
System.out.println("uri" + webSocketUri);
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes()));
assertTrue(myWebSocket.getResponse().contains("ping"));
}

@Test
public void testProducer() throws Exception {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.docs.tools.CmdGenerateDocs;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -91,16 +90,13 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new WebSocketPingPongServlet(service));

proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class);
proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.pulsar.websocket;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Server;
Expand All @@ -40,11 +43,18 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class PingPongHandlerTest {
/**
* Test to ensure {@link AbstractWebSocketHandler} has ping/pong support
*/
public class PingPongSupportTest {

private static Server server;

Expand All @@ -67,9 +77,9 @@ public static void setup() throws Exception {
when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
ServletHolder servletHolder = new ServletHolder("ws-events", new GenericWebSocketServlet(service));
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
context.setContextPath("/ws");
context.addServlet(servletHolder, "/*");
server.setHandler(context);
try {
Expand All @@ -87,18 +97,60 @@ public static void tearDown() throws Exception {
executor.stop();
}

@Test
public void testPingPong() throws Exception {
/**
* We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are
* not testing these implementations, but the ping/pong support is guaranteed as part of the framework.
*/
@DataProvider(name = "endpoint")
public static Object[][] cacheEnable() {
return new Object[][] { { "producer" }, { "consumer" }, { "reader" } };
}

@Test(dataProvider = "endpoint")
public void testPingPong(String endpoint) throws Exception {
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = "ws://localhost:8080/ws/pingpong";
String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + "/persistent/my-property/my-ns/my-topic";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
assertTrue(myWebSocket.getResponse().contains("test"));
}

public static class GenericWebSocketHandler extends AbstractWebSocketHandler {

public GenericWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
}

@Override
protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception {
return true;
}

@Override
public void close() throws IOException {

}
}

public static class GenericWebSocketServlet extends WebSocketServlet {

private static final long serialVersionUID = 1L;
private final WebSocketService service;

public GenericWebSocketServlet(WebSocketService service) {
this.service = service;
}

@Override
public void configure(WebSocketServletFactory factory) {
factory.setCreator((request, response) ->
new GenericWebSocketHandler(service, request.getHttpServletRequest(), response));
}
}

@WebSocket
public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {

Expand Down

0 comments on commit 33044f0

Please sign in to comment.