Skip to content

Commit

Permalink
Optimize the details of async tasks and io_uring.
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Nov 1, 2024
1 parent 106cb30 commit b0b32c4
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 33 deletions.
2 changes: 1 addition & 1 deletion core-tests/src/reactor/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ TEST(reactor, bad_fd) {
ASSERT_EQ(n, SW_ERR);
ASSERT_EQ(swoole_get_last_error(), EBADF);
swoole_event_free();
sock->fd = -1;
sock->move_fd();
sock->free();
}

Expand Down
3 changes: 0 additions & 3 deletions include/swoole_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@

#include <vector>
#include <string>
#include <mutex>
#include <atomic>
#include <queue>

#ifndef O_DIRECT
#define O_DIRECT 040000
Expand Down
17 changes: 16 additions & 1 deletion include/swoole_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,23 @@ class Coroutine {
};
//-------------------------------------------------------------------------------
namespace coroutine {
/**
* Support for timeouts and cancellations requires the caller to store the memory pointers of
* the input and output parameter objects in the `data` pointer of the `AsyncEvent` object.
* This field is a `shared_ptr`, which increments the reference count when dispatched to the AIO thread,
* collectively managing the `data` pointer.
* When the async task is completed, the caller receives the results or cancels or timeouts,
* the reference count will reach zero, and the memory will be released.
*/
bool async(async::Handler handler, AsyncEvent &event, double timeout = -1);
bool async(const std::function<void(void)> &fn, double timeout = -1);
/**
* This function should be used for asynchronous operations that do not support cancellation and timeouts.
* For example, in write/read operations,
* asynchronous tasks cannot transfer the memory ownership of wbuf/rbuf to the AIO thread.
* In the event of a timeout or cancellation, the memory of wbuf/rbuf will be released by the caller,
* which may lead the AIO thread to read from an erroneous memory pointer and consequently crash.
*/
bool async(const std::function<void(void)> &fn);
bool run(const CoroutineFunc &fn, void *arg = nullptr);
} // namespace coroutine
//-------------------------------------------------------------------------------
Expand Down
5 changes: 2 additions & 3 deletions include/swoole_coroutine_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,8 @@ class Socket {
}

int move_fd() {
int sockfd = socket->fd;
sock_fd = socket->fd = SW_BAD_SOCKET;
return sockfd;
sock_fd = SW_BAD_SOCKET;
return socket->move_fd();
}

network::Socket *move_socket() {
Expand Down
23 changes: 23 additions & 0 deletions include/swoole_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,12 @@ struct Socket {
return fd;
}

int move_fd() {
int sock_fd = fd;
fd = SW_BAD_SOCKET;
return sock_fd;
}

int get_name(Address *sa) {
sa->len = sizeof(sa->addr);
return getsockname(fd, &sa->addr.ss, &sa->len);
Expand Down Expand Up @@ -641,7 +647,19 @@ int gethostbyname(int type, const char *name, char *addr);
int getaddrinfo(GetaddrinfoRequest *req);

} // namespace network

/**
* This function will never return NULL; if memory allocation fails, a C++ exception will be thrown.
* Must use the `socket->free()` function to release the object pointer instead of the `delete` operator.
* When the socket is released, it will close the file descriptor (fd).
* If you do not want the fd to be closed, use `socket->move_fd()` to relinquish ownership of the fd.
*/
network::Socket *make_socket(int fd, FdType fd_type);
/**
* The following three functions will return a null pointer if the socket creation fails.
* It is essential to check the return value;
* if it is nullptr, you should inspect errno to determine the cause of the error.
*/
network::Socket *make_socket(SocketType socket_type, FdType fd_type, int flags);
network::Socket *make_socket(
SocketType type, FdType fd_type, int sock_domain, int sock_type, int socket_protocol, int flags);
Expand All @@ -650,5 +668,10 @@ network::Socket *make_server_socket(SocketType socket_type,
const char *address,
int port = 0,
int backlog = SW_BACKLOG);
/**
* Verify if the input string is an IP address,
* where AF_INET indicates an IPv4 address, such as 192.168.1.100,
* and AF_INET6 indicates an IPv6 address, for example, 2001:0000:130F:0000:0000:09C0:876A:130B.
*/
bool verify_ip(int __af, const std::string &str);
} // namespace swoole
17 changes: 7 additions & 10 deletions src/coroutine/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ Iouring::Iouring(Reactor *_reactor) {
}

ring_socket = make_socket(ring.ring_fd, SW_FD_IOURING);
if (!ring_socket) {
swoole_error_log(SW_LOG_WARNING, SW_ERROR_SYSTEM_CALL_FAIL, "create io_uring socket failed");
return;
}

reactor->set_exit_condition(Reactor::EXIT_CONDITION_IOURING, [](Reactor *reactor, size_t &event_num) -> bool {
if (SwooleTG.iouring && SwooleTG.iouring->get_task_num() == 0 && SwooleTG.iouring->is_empty_waiting_tasks()) {
Expand All @@ -117,19 +113,20 @@ Iouring::Iouring(Reactor *_reactor) {
}

Iouring::~Iouring() {
if (!ring_socket->removed) {
reactor->del(ring_socket);
}

if (ring_socket) {
delete ring_socket;
if (!ring_socket->removed) {
reactor->del(ring_socket);
}
ring_socket->move_fd();
ring_socket->free();
ring_socket = nullptr;
}

io_uring_queue_exit(&ring);
}

bool Iouring::ready() {
return reactor->exists(ring_socket);
return ring_socket && reactor->exists(ring_socket);
}

bool Iouring::wakeup() {
Expand Down
16 changes: 4 additions & 12 deletions src/coroutine/system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -663,14 +663,11 @@ static void async_lambda_handler(AsyncEvent *event) {
}

static void async_lambda_callback(AsyncEvent *event) {
if (event->canceled) {
return;
}
AsyncLambdaTask *task = reinterpret_cast<AsyncLambdaTask *>(event->object);
task->co->resume();
}

bool async(const std::function<void(void)> &fn, double timeout) {
bool async(const std::function<void(void)> &fn) {
AsyncEvent event{};
AsyncLambdaTask task{Coroutine::get_current_safe(), fn};

Expand All @@ -683,14 +680,9 @@ bool async(const std::function<void(void)> &fn, double timeout) {
return false;
}

if (!task.co->yield_ex(timeout)) {
_ev->canceled = true;
errno = swoole_get_last_error();
return false;
} else {
errno = _ev->error;
return true;
}
task.co->yield();
errno = _ev->error;
return true;
}

AsyncLock::AsyncLock(void *resource) {
Expand Down
3 changes: 0 additions & 3 deletions src/network/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1483,9 +1483,6 @@ int Socket::ssl_create(SSLContext *ssl_context, int _flags) {

using network::Socket;

/**
* return nullptr when fail to create socket, read errno to get failure reason
*/
Socket *make_socket(SocketType type, FdType fd_type, int flags) {
int sock_domain;
int sock_type;
Expand Down

0 comments on commit b0b32c4

Please sign in to comment.