diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3d1254eec4296..5cf3c47bcb047 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -173,7 +173,6 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider; import org.apache.pulsar.websocket.WebSocketConsumerServlet; -import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -1069,12 +1068,6 @@ private void addWebSocketServiceHandler(WebService webService, new ServletHolder(readerWebSocketServlet), true, attributeMap); webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(readerWebSocketServlet), true, attributeMap); - - final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService); - webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH, - new ServletHolder(pingPongWebSocketServlet), true, attributeMap); - webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, - new ServletHolder(pingPongWebSocketServlet), true, attributeMap); } } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index d8bb048710770..b4854780d5471 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -51,7 +51,6 @@ import org.apache.pulsar.docs.tools.CmdGenerateDocs; import org.apache.pulsar.proxy.stats.ProxyStats; import org.apache.pulsar.websocket.WebSocketConsumerServlet; -import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -313,12 +312,6 @@ public static void addWebServerHandlers(WebServer server, new ServletHolder(readerWebSocketServlet)); server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(readerWebSocketServlet)); - - final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService); - server.addServlet(WebSocketPingPongServlet.SERVLET_PATH, - new ServletHolder(pingPongWebSocketServlet)); - server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, - new ServletHolder(pingPongWebSocketServlet)); } } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 4dcfc17096448..def58be6df372 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -76,18 +76,6 @@ private String computeWsBasePath() { return String.format("ws://localhost:%d/ws", serviceStarter.getServer().getListenPortHTTP().get()); } - @Test - public void testEnableWebSocketServer() throws Exception { - HttpClient httpClient = new HttpClient(); - WebSocketClient webSocketClient = new WebSocketClient(httpClient); - webSocketClient.start(); - MyWebSocket myWebSocket = new MyWebSocket(); - String webSocketUri = computeWsBasePath() + "/pingpong"; - Future sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri)); - System.out.println("uri" + webSocketUri); - sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("ping".getBytes())); - assertTrue(myWebSocket.getResponse().contains("ping")); - } @Test public void testProducer() throws Exception { diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java deleted file mode 100644 index 870630abc8889..0000000000000 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/PingPongHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.websocket; - -import java.io.IOException; -import java.nio.ByteBuffer; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; -import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener { - - private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class); - - @Override - public void onWebSocketPing(ByteBuffer payload) { - try { - if (log.isDebugEnabled()) { - log.debug("PING: {}", BufferUtil.toDetailString(payload)); - } - getRemote().sendPong(payload); - } catch (IOException e) { - log.warn("Failed to send pong: {}", e.getMessage()); - } - } - - @Override - public void onWebSocketPong(ByteBuffer payload) { - - } - -} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java deleted file mode 100644 index cc2d79ee541ba..0000000000000 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketPingPongServlet.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.websocket; - -import org.eclipse.jetty.websocket.servlet.WebSocketServlet; -import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; - -public class WebSocketPingPongServlet extends WebSocketServlet { - private static final long serialVersionUID = 1L; - - public static final String SERVLET_PATH = "/ws/pingpong"; - public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong"; - - private final transient WebSocketService service; - - public WebSocketPingPongServlet(WebSocketService service) { - this.service = service; - } - - @Override - public void configure(WebSocketServletFactory factory) { - factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize()); - if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) { - factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis()); - } - factory.setCreator((request, response) -> new PingPongHandler()); - } -} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java index 530f0796d488d..8d5a896ba4aa9 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java @@ -29,7 +29,6 @@ import org.apache.pulsar.common.util.ShutdownUtil; import org.apache.pulsar.docs.tools.CmdGenerateDocs; import org.apache.pulsar.websocket.WebSocketConsumerServlet; -import org.apache.pulsar.websocket.WebSocketPingPongServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -91,7 +90,6 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service)); proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service)); - proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service)); proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2, new WebSocketProducerServlet(service)); @@ -99,8 +97,6 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro new WebSocketConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new WebSocketReaderServlet(service)); - proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, - new WebSocketPingPongServlet(service)); proxyServer.addRestResource(ADMIN_PATH_V1, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV1.class); proxyServer.addRestResource(ADMIN_PATH_V2, ATTRIBUTE_PROXY_SERVICE_NAME, service, WebSocketProxyStatsV2.class); diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java similarity index 67% rename from pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java rename to pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java index 662009f1aab1a..8119c2f1f8131 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/PingPongSupportTest.java @@ -18,13 +18,16 @@ */ package org.apache.pulsar.websocket; +import java.io.IOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; +import javax.servlet.http.HttpServletRequest; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.web.WebExecutorThreadPool; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.server.Server; @@ -40,11 +43,18 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertTrue; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -public class PingPongHandlerTest { +/** + * Test to ensure {@link AbstractWebSocketHandler} has ping/pong support + */ +public class PingPongSupportTest { private static Server server; @@ -67,9 +77,9 @@ public static void setup() throws Exception { when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576); when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000); - ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service)); + ServletHolder servletHolder = new ServletHolder("ws-events", new GenericWebSocketServlet(service)); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH); + context.setContextPath("/ws"); context.addServlet(servletHolder, "/*"); server.setHandler(context); try { @@ -87,18 +97,60 @@ public static void tearDown() throws Exception { executor.stop(); } - @Test - public void testPingPong() throws Exception { + /** + * We test these different endpoints because they are parsed in the AbstractWebSocketHandler. Technically, we are + * not testing these implementations, but the ping/pong support is guaranteed as part of the framework. + */ + @DataProvider(name = "endpoint") + public static Object[][] cacheEnable() { + return new Object[][] { { "producer" }, { "consumer" }, { "reader" } }; + } + + @Test(dataProvider = "endpoint") + public void testPingPong(String endpoint) throws Exception { HttpClient httpClient = new HttpClient(); WebSocketClient webSocketClient = new WebSocketClient(httpClient); webSocketClient.start(); MyWebSocket myWebSocket = new MyWebSocket(); - String webSocketUri = "ws://localhost:8080/ws/pingpong"; + String webSocketUri = "ws://localhost:8080/ws/v2/" + endpoint + "/persistent/my-property/my-ns/my-topic"; Future sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri)); sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes())); assertTrue(myWebSocket.getResponse().contains("test")); } + public static class GenericWebSocketHandler extends AbstractWebSocketHandler { + + public GenericWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) { + super(service, request, response); + } + + @Override + protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception { + return true; + } + + @Override + public void close() throws IOException { + + } + } + + public static class GenericWebSocketServlet extends WebSocketServlet { + + private static final long serialVersionUID = 1L; + private final WebSocketService service; + + public GenericWebSocketServlet(WebSocketService service) { + this.service = service; + } + + @Override + public void configure(WebSocketServletFactory factory) { + factory.setCreator((request, response) -> + new GenericWebSocketHandler(service, request.getHttpServletRequest(), response)); + } + } + @WebSocket public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {