From 7ab0edf8435c098746479d6925a3260e0d8500d7 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 3 Nov 2021 15:29:54 +0200 Subject: [PATCH] Revert "Merge "Add skeleton implementation of a WebSocket server" from Piotr S" This reverts commit 56be22632410df235861f88b37c2114eb362c51b, reversing changes made to a189cdc45ddb1b79e1fba52763618856d45520d4. It does not compile in C++17 mode. --- CMakeLists.txt | 4 +- demos/CMakeLists.txt | 3 - demos/websocket_demo.cc | 50 ------- include/seastar/websocket/server.hh | 129 ------------------ src/websocket/server.cc | 199 ---------------------------- tests/unit/CMakeLists.txt | 3 - tests/unit/websocket_test.cc | 66 --------- 7 files changed, 1 insertion(+), 453 deletions(-) delete mode 100644 demos/websocket_demo.cc delete mode 100644 include/seastar/websocket/server.hh delete mode 100644 src/websocket/server.cc delete mode 100644 tests/unit/websocket_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 49180a080c..792d7912ec 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -610,7 +610,6 @@ add_library (seastar STATIC include/seastar/util/variant_utils.hh include/seastar/util/closeable.hh include/seastar/util/source_location-compat.hh - include/seastar/websocket/server.hh src/core/alien.cc src/core/file.cc src/core/fair_queue.cc @@ -692,8 +691,7 @@ add_library (seastar STATIC src/util/log.cc src/util/program-options.cc src/util/read_first_line.cc - src/util/tmp_file.cc - src/websocket/server.cc) + src/util/tmp_file.cc) add_library (Seastar::seastar ALIAS seastar) diff --git a/demos/CMakeLists.txt b/demos/CMakeLists.txt index 60c43d0d43..084e82987b 100644 --- a/demos/CMakeLists.txt +++ b/demos/CMakeLists.txt @@ -59,9 +59,6 @@ if (${Seastar_API_LEVEL} GREATER_EQUAL 3) SOURCES coroutines_demo.cc) endif () -seastar_add_demo (websocket - SOURCES websocket_demo.cc) - seastar_add_demo (echo SOURCES echo_demo.cc) diff --git a/demos/websocket_demo.cc b/demos/websocket_demo.cc deleted file mode 100644 index 10b4f886fa..0000000000 --- a/demos/websocket_demo.cc +++ /dev/null @@ -1,50 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright (C) 2021 ScyllaDB Ltd. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace seastar; -using namespace seastar::experimental; - -int main(int argc, char** argv) { - seastar::app_template app; - app.run(argc, argv, [] () -> seastar::future<> { - return async([] { - websocket::server ws; - auto d = defer([&ws] () noexcept { - ws.stop().get(); - }); - ws.listen(socket_address(ipv4_addr("127.0.0.1", 8123))); - std::cout << "Listening on 127.0.0.1:8123 for 1 hour (interruptible, hit Ctrl-C to stop)..." << std::endl; - seastar::sleep_abortable(std::chrono::hours(1)).get(); - std::cout << "Stopping the server, deepest thanks to all clients, hope we meet again" << std::endl; - }); - }); -} diff --git a/include/seastar/websocket/server.hh b/include/seastar/websocket/server.hh deleted file mode 100644 index 188b8fe5f7..0000000000 --- a/include/seastar/websocket/server.hh +++ /dev/null @@ -1,129 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2021 ScyllaDB - */ - -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace seastar::experimental::websocket { - -class server; -struct reply { - //TODO: implement -}; - -/*! - * \brief an error in handling a WebSocket connection - */ -class exception : std::exception { - std::string _msg; -public: - exception(std::string_view msg) : _msg(msg) {} - const char* what() noexcept { - return _msg.c_str(); - } -}; - -/*! - * \brief a WebSocket connection - */ -class connection : public boost::intrusive::list_base_hook<> { - server& _server; - connected_socket _fd; - input_stream _read_buf; - output_stream _write_buf; - http_request_parser _http_parser; - std::unique_ptr _resp; - queue> _replies{10}; - bool _done = false; -public: - /*! - * \param server owning \ref server - * \param fd established socket used for communication - */ - connection(server& server, connected_socket&& fd) - : _server(server) - , _fd(std::move(fd)) - , _read_buf(_fd.input()) - , _write_buf(_fd.output()) - { - on_new_connection(); - } - ~connection(); - - /*! - * \brief serve WebSocket protocol on a connection - */ - future<> process(); - /*! - * \brief close the socket - */ - void shutdown(); - -protected: - future<> read_loop(); - future<> read_one(); - future<> read_http_upgrade_request(); - future<> response_loop(); - void on_new_connection(); -}; - -/*! - * \brief a WebSocket server - * - * A server capable of establishing and serving connections - * over WebSocket protocol. - */ -class server { - std::vector _listeners; - gate _task_gate; - boost::intrusive::list _connections; -public: - /*! - * \brief listen for a WebSocket connection on given address - * \param addr address to listen on - */ - void listen(socket_address addr); - /*! - * \brief listen for a WebSocket connection on given address with custom options - * \param addr address to listen on - * \param lo custom listen options (\ref listen_options) - */ - void listen(socket_address addr, listen_options lo); - - /*! - * Stops the server and shuts down all active connections - */ - future<> stop(); - - friend class connection; -protected: - void do_accepts(int which); - future<> do_accept_one(int which); -}; - -} diff --git a/src/websocket/server.cc b/src/websocket/server.cc deleted file mode 100644 index 42f0b16f80..0000000000 --- a/src/websocket/server.cc +++ /dev/null @@ -1,199 +0,0 @@ -/* - * This file is open source software, licensed to you under the terms - * of the Apache License, Version 2.0 (the "License"). See the NOTICE file - * distributed with this work for additional information regarding copyright - * ownership. You may not use this file except in compliance with the License. - * - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/* - * Copyright 2021 ScyllaDB - */ - -#include -#include -#include -#include -#include - -namespace seastar::experimental::websocket { - -static sstring magic_key_suffix = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; -static sstring http_upgrade_reply_template = - "HTTP/1.1 101 Switching Protocols\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Version: 13\r\n" - "Sec-WebSocket-Accept: "; - -static logger wlogger("websocket"); - -void server::listen(socket_address addr, listen_options lo) { - _listeners.push_back(seastar::listen(addr, lo)); - do_accepts(_listeners.size() - 1); -} -void server::listen(socket_address addr) { - listen_options lo; - lo.reuse_address = true; - return listen(addr, lo); -} - -void server::do_accepts(int which) { - // Waited on with the gate - (void)try_with_gate(_task_gate, [this, which] { - return keep_doing([this, which] { - return try_with_gate(_task_gate, [this, which] { - return do_accept_one(which); - }); - }).handle_exception_type([](const gate_closed_exception& e) {}); - }).handle_exception_type([](const gate_closed_exception& e) {}); -} - -future<> server::do_accept_one(int which) { - return _listeners[which].accept().then([this] (accept_result ar) mutable { - auto conn = std::make_unique(*this, std::move(ar.connection)); - (void)try_with_gate(_task_gate, [conn = std::move(conn)]() mutable { - return conn->process().handle_exception([conn = std::move(conn)] (std::exception_ptr ex) { - wlogger.error("request error: {}", ex); - }); - }).handle_exception_type([] (const gate_closed_exception& e) {}); - }).handle_exception_type([] (const std::system_error &e) { - // We expect a ECONNABORTED when server::stop is called, - // no point in warning about that. - if (e.code().value() != ECONNABORTED) { - wlogger.error("accept failed: {}", e); - } - }).handle_exception([] (std::exception_ptr ex) { - wlogger.error("accept failed: {}", ex); - }); -} - -future<> server::stop() { - future<> tasks_done = _task_gate.close(); - for (auto&& l : _listeners) { - l.abort_accept(); - } - for (auto&& c : _connections) { - c.shutdown(); - } - return tasks_done; -} - -connection::~connection() { - _server._connections.erase(_server._connections.iterator_to(*this)); -} - -void connection::on_new_connection() { - _server._connections.push_back(*this); -} - -future<> connection::process() { - return when_all(read_loop(), response_loop()).then( - [] (std::tuple, future<>> joined) { - try { - std::get<0>(joined).get(); - } catch (...) { - wlogger.debug("Read exception encountered: {}", std::current_exception()); - } - try { - std::get<1>(joined).get(); - } catch (...) { - wlogger.debug("Response exception encountered: {}", std::current_exception()); - } - return make_ready_future<>(); - }); -} - -static std::string sha1_base64(std::string_view source) { - // CryptoPP insists on freeing the pointers by itself... - // It's leaky, but `read_http_upgrade_request` is a one-shot operation - // per handshake, so the real risk is not particularly great. - CryptoPP::SHA1 sha1; - std::string hash; - CryptoPP::StringSource(reinterpret_cast(source.data()), source.size(), - true, new CryptoPP::HashFilter(sha1, new CryptoPP::Base64Encoder(new CryptoPP::StringSink(hash), false))); - return hash; -} - -future<> connection::read_http_upgrade_request() { - _http_parser.init(); - return _read_buf.consume(_http_parser).then([this] () mutable { - if (_http_parser.eof()) { - _done = true; - return make_ready_future<>(); - } - std::unique_ptr req = _http_parser.get_parsed_request(); - if (_http_parser.failed()) { - throw websocket::exception("Incorrect upgrade request"); - } - - sstring upgrade_header = req->get_header("Upgrade"); - if (upgrade_header != "websocket") { - throw websocket::exception("Upgrade header missing"); - } - sstring sec_key = req->get_header("Sec-Websocket-Key"); - sstring sec_version = req->get_header("Sec-Websocket-Version"); - - sstring sha1_input = sec_key + magic_key_suffix; - - wlogger.debug("Sec-Websocket-Key: {}, Sec-Websocket-Version: {}", sec_key, sec_version); - - std::string sha1_output = sha1_base64(sha1_input); - wlogger.debug("SHA1 output: {} of size {}", sha1_output, sha1_output.size()); - - return _write_buf.write(http_upgrade_reply_template).then([this, sha1_output = std::move(sha1_output)] { - return _write_buf.write(sha1_output); - }).then([this] { - return _write_buf.write("\r\n\r\n", 4); - }).then([this] { - return _write_buf.flush(); - }); - }); -} - -future<> connection::read_one() { - return _read_buf.read().then([this] (temporary_buffer buf) { - if (buf.empty()) { - _done = true; - } - //FIXME: implement - wlogger.info("Received: {}", buf.get()); - }); -} - -future<> connection::read_loop() { - return read_http_upgrade_request().then([this] { - return do_until([this] {return _done;}, [this] { - return read_one(); - }); - }).then_wrapped([this] (future<> f) { - if (f.failed()) { - wlogger.error("Read failed: {}", f.get_exception()); - } - return _replies.push_eventually({}); - }).finally([this] { - return _read_buf.close(); - }); -} - -future<> connection::response_loop() { - // FIXME: implement - return make_ready_future<>(); -} - -void connection::shutdown() { - wlogger.debug("Shutting down"); - _fd.shutdown_input(); - _fd.shutdown_output(); -} - -} \ No newline at end of file diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index fade12ee5d..413ec17caa 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -323,9 +323,6 @@ seastar_add_test (httpd httpd_test.cc loopback_socket.hh) -seastar_add_test (websocket - SOURCES websocket_test.cc) - seastar_add_test (ipv6 SOURCES ipv6_test.cc) diff --git a/tests/unit/websocket_test.cc b/tests/unit/websocket_test.cc deleted file mode 100644 index 6757f8e5c1..0000000000 --- a/tests/unit/websocket_test.cc +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2021 ScyllaDB - */ - -#include -#include -#include -#include -#include -#include "loopback_socket.hh" - -using namespace seastar; -using namespace seastar::experimental; - -SEASTAR_TEST_CASE(test_websocket_handshake) { - return seastar::async([] { - const std::string request = - "GET / HTTP/1.1\r\n" - "Upgrade: websocket\r\n" - "Connection: Upgrade\r\n" - "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" - "Sec-WebSocket-Version: 13\r\n" - "\r\n"; - loopback_connection_factory factory; - loopback_socket_impl lsi(factory); - - auto acceptor = factory.get_server_socket().accept(); - auto connector = lsi.connect(socket_address(), socket_address()); - connected_socket sock = connector.get0(); - auto input = sock.input(); - auto output = sock.output(); - - websocket::server dummy; - websocket::connection conn(dummy, acceptor.get0().connection); - future<> serve = conn.process(); - - auto close = defer([&sock, &conn, &input, &output, &serve] () noexcept { - conn.shutdown(); - input.close().get(); - output.close().get(); - serve.get(); - }); - - // Send the handshake - output.write(request).get(); - output.flush().get(); - - // Check that the server correctly computed the response - // according to WebSocket handshake specification - http_response_parser parser; - parser.init(); - input.consume(parser).get(); - std::unique_ptr resp = parser.get_parsed_response(); - BOOST_ASSERT(resp); - sstring websocket_accept = resp->_headers["Sec-WebSocket-Accept"]; - // Trim possible whitespace prefix - auto it = std::find_if(websocket_accept.begin(), websocket_accept.end(), ::isalnum); - if (it != websocket_accept.end()) { - websocket_accept.erase(websocket_accept.begin(), it); - } - BOOST_REQUIRE_EQUAL(websocket_accept, "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="); - for (auto& header : resp->_headers) { - std::cout << header.first << ':' << header.second << std::endl; - } - }); -}