From b0b32c40a67dcd1637dabc44503341177dd39e8c Mon Sep 17 00:00:00 2001 From: matyhtf Date: Fri, 1 Nov 2024 10:58:51 +0800 Subject: [PATCH] Optimize the details of async tasks and io_uring. --- core-tests/src/reactor/base.cpp | 2 +- include/swoole_async.h | 3 --- include/swoole_coroutine.h | 17 ++++++++++++++++- include/swoole_coroutine_socket.h | 5 ++--- include/swoole_socket.h | 23 +++++++++++++++++++++++ src/coroutine/iouring.cc | 17 +++++++---------- src/coroutine/system.cc | 16 ++++------------ src/network/socket.cc | 3 --- 8 files changed, 53 insertions(+), 33 deletions(-) diff --git a/core-tests/src/reactor/base.cpp b/core-tests/src/reactor/base.cpp index 8ed247bde71..aa018aee9d7 100644 --- a/core-tests/src/reactor/base.cpp +++ b/core-tests/src/reactor/base.cpp @@ -195,7 +195,7 @@ TEST(reactor, bad_fd) { ASSERT_EQ(n, SW_ERR); ASSERT_EQ(swoole_get_last_error(), EBADF); swoole_event_free(); - sock->fd = -1; + sock->move_fd(); sock->free(); } diff --git a/include/swoole_async.h b/include/swoole_async.h index 38633407168..c419708e948 100644 --- a/include/swoole_async.h +++ b/include/swoole_async.h @@ -20,9 +20,6 @@ #include #include -#include -#include -#include #ifndef O_DIRECT #define O_DIRECT 040000 diff --git a/include/swoole_coroutine.h b/include/swoole_coroutine.h index b80df4eb131..424053ad58d 100644 --- a/include/swoole_coroutine.h +++ b/include/swoole_coroutine.h @@ -299,8 +299,23 @@ class Coroutine { }; //------------------------------------------------------------------------------- namespace coroutine { +/** + * Support for timeouts and cancellations requires the caller to store the memory pointers of + * the input and output parameter objects in the `data` pointer of the `AsyncEvent` object. + * This field is a `shared_ptr`, which increments the reference count when dispatched to the AIO thread, + * collectively managing the `data` pointer. + * When the async task is completed, the caller receives the results or cancels or timeouts, + * the reference count will reach zero, and the memory will be released. + */ bool async(async::Handler handler, AsyncEvent &event, double timeout = -1); -bool async(const std::function &fn, double timeout = -1); +/** + * This function should be used for asynchronous operations that do not support cancellation and timeouts. + * For example, in write/read operations, + * asynchronous tasks cannot transfer the memory ownership of wbuf/rbuf to the AIO thread. + * In the event of a timeout or cancellation, the memory of wbuf/rbuf will be released by the caller, + * which may lead the AIO thread to read from an erroneous memory pointer and consequently crash. + */ +bool async(const std::function &fn); bool run(const CoroutineFunc &fn, void *arg = nullptr); } // namespace coroutine //------------------------------------------------------------------------------- diff --git a/include/swoole_coroutine_socket.h b/include/swoole_coroutine_socket.h index d22937fb807..7956925fdc5 100644 --- a/include/swoole_coroutine_socket.h +++ b/include/swoole_coroutine_socket.h @@ -377,9 +377,8 @@ class Socket { } int move_fd() { - int sockfd = socket->fd; - sock_fd = socket->fd = SW_BAD_SOCKET; - return sockfd; + sock_fd = SW_BAD_SOCKET; + return socket->move_fd(); } network::Socket *move_socket() { diff --git a/include/swoole_socket.h b/include/swoole_socket.h index eb6aa38001e..35606ee3811 100644 --- a/include/swoole_socket.h +++ b/include/swoole_socket.h @@ -274,6 +274,12 @@ struct Socket { return fd; } + int move_fd() { + int sock_fd = fd; + fd = SW_BAD_SOCKET; + return sock_fd; + } + int get_name(Address *sa) { sa->len = sizeof(sa->addr); return getsockname(fd, &sa->addr.ss, &sa->len); @@ -641,7 +647,19 @@ int gethostbyname(int type, const char *name, char *addr); int getaddrinfo(GetaddrinfoRequest *req); } // namespace network + +/** + * This function will never return NULL; if memory allocation fails, a C++ exception will be thrown. + * Must use the `socket->free()` function to release the object pointer instead of the `delete` operator. + * When the socket is released, it will close the file descriptor (fd). + * If you do not want the fd to be closed, use `socket->move_fd()` to relinquish ownership of the fd. + */ network::Socket *make_socket(int fd, FdType fd_type); +/** + * The following three functions will return a null pointer if the socket creation fails. + * It is essential to check the return value; + * if it is nullptr, you should inspect errno to determine the cause of the error. + */ network::Socket *make_socket(SocketType socket_type, FdType fd_type, int flags); network::Socket *make_socket( SocketType type, FdType fd_type, int sock_domain, int sock_type, int socket_protocol, int flags); @@ -650,5 +668,10 @@ network::Socket *make_server_socket(SocketType socket_type, const char *address, int port = 0, int backlog = SW_BACKLOG); +/** + * Verify if the input string is an IP address, + * where AF_INET indicates an IPv4 address, such as 192.168.1.100, + * and AF_INET6 indicates an IPv6 address, for example, 2001:0000:130F:0000:0000:09C0:876A:130B. + */ bool verify_ip(int __af, const std::string &str); } // namespace swoole diff --git a/src/coroutine/iouring.cc b/src/coroutine/iouring.cc index e2dfcf81d11..5c7a8cde0d9 100644 --- a/src/coroutine/iouring.cc +++ b/src/coroutine/iouring.cc @@ -93,10 +93,6 @@ Iouring::Iouring(Reactor *_reactor) { } ring_socket = make_socket(ring.ring_fd, SW_FD_IOURING); - if (!ring_socket) { - swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "create io_uring socket failed"); - return; - } reactor->set_exit_condition(Reactor::EXIT_CONDITION_IOURING, [](Reactor *reactor, size_t &event_num) -> bool { if (SwooleTG.iouring && SwooleTG.iouring->get_task_num() == 0 && SwooleTG.iouring->is_empty_waiting_tasks()) { @@ -117,19 +113,20 @@ Iouring::Iouring(Reactor *_reactor) { } Iouring::~Iouring() { - if (!ring_socket->removed) { - reactor->del(ring_socket); - } - if (ring_socket) { - delete ring_socket; + if (!ring_socket->removed) { + reactor->del(ring_socket); + } + ring_socket->move_fd(); + ring_socket->free(); + ring_socket = nullptr; } io_uring_queue_exit(&ring); } bool Iouring::ready() { - return reactor->exists(ring_socket); + return ring_socket && reactor->exists(ring_socket); } bool Iouring::wakeup() { diff --git a/src/coroutine/system.cc b/src/coroutine/system.cc index 9b972902c13..e815f05b4bf 100644 --- a/src/coroutine/system.cc +++ b/src/coroutine/system.cc @@ -663,14 +663,11 @@ static void async_lambda_handler(AsyncEvent *event) { } static void async_lambda_callback(AsyncEvent *event) { - if (event->canceled) { - return; - } AsyncLambdaTask *task = reinterpret_cast(event->object); task->co->resume(); } -bool async(const std::function &fn, double timeout) { +bool async(const std::function &fn) { AsyncEvent event{}; AsyncLambdaTask task{Coroutine::get_current_safe(), fn}; @@ -683,14 +680,9 @@ bool async(const std::function &fn, double timeout) { return false; } - if (!task.co->yield_ex(timeout)) { - _ev->canceled = true; - errno = swoole_get_last_error(); - return false; - } else { - errno = _ev->error; - return true; - } + task.co->yield(); + errno = _ev->error; + return true; } AsyncLock::AsyncLock(void *resource) { diff --git a/src/network/socket.cc b/src/network/socket.cc index 854f4517156..791195441dc 100644 --- a/src/network/socket.cc +++ b/src/network/socket.cc @@ -1483,9 +1483,6 @@ int Socket::ssl_create(SSLContext *ssl_context, int _flags) { using network::Socket; -/** - * return nullptr when fail to create socket, read errno to get failure reason - */ Socket *make_socket(SocketType type, FdType fd_type, int flags) { int sock_domain; int sock_type;