Skip to content

Commit

Permalink
support coroutine lock (#5557)
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFreeman authored Nov 8, 2024
1 parent 0c6b552 commit a38920d
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 19 deletions.
28 changes: 21 additions & 7 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -930,13 +930,6 @@ EOF
AC_DEFINE(HAVE_CARES, 1, [have c-ares])
fi

if test "$PHP_IOURING" = "yes"; then
PKG_CHECK_MODULES([URING], [liburing])
PHP_EVAL_LIBLINE($URING_LIBS, SWOOLE_SHARED_LIBADD)
PHP_EVAL_INCLINE($URING_CFLAGS)
AC_DEFINE(SW_USE_IOURING, 1, [have io_uring])
fi

AC_SWOOLE_CPU_AFFINITY
AC_SWOOLE_HAVE_REUSEPORT
AC_SWOOLE_HAVE_FUTEX
Expand All @@ -959,6 +952,27 @@ EOF

dnl Check should we link to librt

if test "$PHP_IOURING" = "yes" && test "$SW_OS" = "LINUX"; then
PKG_CHECK_MODULES([URING], [liburing])
PHP_EVAL_LIBLINE($URING_LIBS, SWOOLE_SHARED_LIBADD)
PHP_EVAL_INCLINE($URING_CFLAGS)
AC_DEFINE(SW_USE_IOURING, 1, [have io_uring])

LINUX_VERSION=`uname -r | cut -d '-' -f 1`
LINUX_MAJOR_VERSION=`echo $LINUX_VERSION | cut -d '.' -f 1`
LINUX_MINIO_VERSION=`echo $LINUX_VERSION | cut -d '.' -f 2`

_PKG_CONFIG(URING_VERSION, [modversion], [liburing])
IOURING_MAJOR_VERSION=`echo $pkg_cv_URING_VERSION | cut -d '.' -f 1`
IOURING_MINOR_VERSION=`echo $pkg_cv_URING_VERSION | cut -d '.' -f 2`

if test $IOURING_MAJOR_VERSION > 2 || (test $IOURING_MAJOR_VERSION = 2 && test $IOURING_MINOR_VERSION >= 6); then
if test $LINUX_MAJOR_VERSION > 6 || (test $LINUX_MAJOR_VERSION = 6 && test $LINUX_MAJOR_VERSION >= 7); then
AC_DEFINE(HAVE_IOURING_FUTEX, 1, [have io_uring futex feature])
fi
fi
fi

if test "$SW_OS" = "LINUX"; then
GLIBC_VERSION=$(getconf GNU_LIBC_VERSION | awk '{print $2}')
AC_MSG_NOTICE([glibc version: $GLIBC_VERSION])
Expand Down
33 changes: 32 additions & 1 deletion core-tests/src/lock/lock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
+----------------------------------------------------------------------+
*/

#include "test_core.h"
#include "test_coroutine.h"
#include "swoole_lock.h"
#include "swoole_util.h"

Expand All @@ -30,6 +30,10 @@ using swoole::RWLock;
using swoole::SpinLock;
#endif
using swoole::Mutex;
using swoole::CoroutineLock;
using swoole::Coroutine;
using swoole::test::coroutine;
using swoole::coroutine::System;

static void test_func(swLock &lock) {
int count = 0;
Expand Down Expand Up @@ -135,6 +139,33 @@ TEST(lock, try_rd) {
test_lock_rd_func(lock);
}

TEST(lock, coroutine_lock) {
CoroutineLock *lock = new CoroutineLock();
ASSERT_EQ(lock->lock(), SW_ERROR_CO_OUT_OF_COROUTINE);
auto callback = [lock]() {
coroutine::run([lock](void *arg) {
Coroutine::create([lock](void *) {
ASSERT_EQ(lock->lock(), 0);
ASSERT_EQ(lock->lock(), 0);
System::sleep(1);
ASSERT_EQ(lock->unlock(), 0);
});

Coroutine::create([lock](void *) {
ASSERT_EQ(lock->lock(), 0);
System::sleep(1);
ASSERT_EQ(lock->unlock(), 0);
});

Coroutine::create([lock](void *) { ASSERT_EQ(lock->trylock(), EBUSY); });
});
};

std::thread t1(callback);
t1.join();
delete lock;
}

#ifdef HAVE_RWLOCK
TEST(lock, rwlock_shared) {
RWLock lock(Mutex::PROCESS_SHARED);
Expand Down
5 changes: 5 additions & 0 deletions ext-src/swoole_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ using swoole::SpinLock;
#ifdef HAVE_RWLOCK
using swoole::RWLock;
#endif
using swoole::CoroutineLock;

static zend_class_entry *swoole_lock_ce;
static zend_object_handlers swoole_lock_handlers;
Expand Down Expand Up @@ -127,6 +128,7 @@ void php_swoole_lock_minit(int module_number) {
#ifdef HAVE_SPINLOCK
SW_REGISTER_LONG_CONSTANT("SWOOLE_SPINLOCK", Lock::SPIN_LOCK);
#endif
SW_REGISTER_LONG_CONSTANT("SWOOLE_COROLOCK", Lock::COROUTINE_LOCK);
}

static PHP_METHOD(swoole_lock, __construct) {
Expand Down Expand Up @@ -158,6 +160,9 @@ static PHP_METHOD(swoole_lock, __construct) {
case Lock::MUTEX:
lock = new Mutex(Mutex::PROCESS_SHARED);
break;
case Lock::COROUTINE_LOCK:
lock = new CoroutineLock();
break;
default:
zend_throw_exception(swoole_exception_ce, "lock type[%d] is not support", type);
RETURN_FALSE;
Expand Down
7 changes: 5 additions & 2 deletions ext-src/swoole_thread_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ using swoole::SpinLock;
#ifdef HAVE_RWLOCK
using swoole::RWLock;
#endif
using swoole::CoroutineLock;

zend_class_entry *swoole_thread_lock_ce;
static zend_object_handlers swoole_thread_lock_handlers;
Expand All @@ -51,6 +52,9 @@ struct LockResource : public ThreadResource {
lock_ = new RWLock(0);
break;
#endif
case Lock::COROUTINE_LOCK:
lock_ = new CoroutineLock();
break;
case Lock::MUTEX:
default:
lock_ = new Mutex(0);
Expand Down Expand Up @@ -143,8 +147,7 @@ void php_swoole_thread_lock_minit(int module_number) {
swoole_thread_lock_ce->ce_flags |= ZEND_ACC_FINAL | ZEND_ACC_NOT_SERIALIZABLE;
SW_SET_CLASS_CLONEABLE(swoole_thread_lock, sw_zend_class_clone_deny);
SW_SET_CLASS_UNSET_PROPERTY_HANDLER(swoole_thread_lock, sw_zend_class_unset_property_deny);
SW_SET_CLASS_CUSTOM_OBJECT(
swoole_thread_lock, lock_create_object, lock_free_object, LockObject, std);
SW_SET_CLASS_CUSTOM_OBJECT(swoole_thread_lock, lock_create_object, lock_free_object, LockObject, std);

zend_declare_class_constant_long(swoole_thread_lock_ce, ZEND_STRL("MUTEX"), Lock::MUTEX);
#ifdef HAVE_RWLOCK
Expand Down
10 changes: 10 additions & 0 deletions include/swoole_iouring.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
#ifdef SW_USE_IOURING
#include <liburing.h>

#ifdef HAVE_IOURING_FUTEX
#ifndef FUTEX2_SIZE_U32
#define FUTEX2_SIZE_U32 0x02
#endif
#endif

using swoole::Coroutine;

enum swIouringFlag {
Expand Down Expand Up @@ -82,6 +88,10 @@ class Iouring {
static int rmdir(const char *pathname);
static int fsync(int fd);
static int fdatasync(int fd);
#ifdef HAVE_IOURING_FUTEX
static int futex_wait(uint32_t *futex);
static int futex_wakeup(uint32_t *futex);
#endif

static int callback(Reactor *reactor, Event *event);
};
Expand Down
20 changes: 19 additions & 1 deletion include/swoole_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Lock {
RW_LOCK = 1,
MUTEX = 3,
SPIN_LOCK = 5,
ATOMIC_LOCK = 6,
COROUTINE_LOCK = 6,
};
Type get_type() {
return type_;
Expand Down Expand Up @@ -106,6 +106,24 @@ class SpinLock : public Lock {
};
#endif

class CoroutineLock : public Lock {
private:
long cid = 0;
sw_atomic_t *value = nullptr;
void *coroutine = nullptr;

int lock_impl(bool blocking = true);

public:
CoroutineLock();
~CoroutineLock();
int lock_rd() override;
int lock() override;
int unlock() override;
int trylock_rd() override;
int trylock() override;
};

#if defined(HAVE_PTHREAD_BARRIER) && !(defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__))
#define SW_USE_PTHREAD_BARRIER
#endif
Expand Down
68 changes: 60 additions & 8 deletions src/coroutine/iouring.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ enum IouringOpcode {
SW_IORING_OP_WRITE = IORING_OP_WRITE,
SW_IORING_OP_RENAMEAT = IORING_OP_RENAMEAT,
SW_IORING_OP_MKDIRAT = IORING_OP_MKDIRAT,
#ifdef HAVE_IOURING_FUTEX
SW_IORING_OP_FUTEX_WAIT = IORING_OP_FUTEX_WAIT,
SW_IORING_OP_FUTEX_WAKE = IORING_OP_FUTEX_WAKE,
#endif

SW_IORING_OP_FSTAT = 1000,
SW_IORING_OP_LSTAT = 1001,
Expand All @@ -46,14 +50,21 @@ struct IouringEvent {
Coroutine *coroutine;
int fd;
int flags;
mode_t mode;
size_t size;
union {
mode_t mode;
size_t size;
};
ssize_t result;
void *rbuf;
const void *wbuf;
const char *pathname;
const char *pathname2;
struct statx *statxbuf;
union {
void *rbuf;
const void *wbuf;
struct statx *statxbuf;
const char *pathname2;
#ifdef HAVE_IOURING_FUTEX
uint32_t *futex;
#endif
};
};

Iouring::Iouring(Reactor *_reactor) {
Expand Down Expand Up @@ -206,6 +217,12 @@ static const char *get_opcode_name(IouringOpcode opcode) {
return "FSYNC";
case SW_IORING_OP_FDATASYNC:
return "FDATASYNC";
#ifdef HAVE_IOURING_FUTEX
case SW_IORING_OP_FUTEX_WAIT:
return "FUTEX_WAIT";
case SW_IORING_OP_FUTEX_WAKE:
return "FUTEX_WAKE";
#endif
default:
return "unknown";
}
Expand Down Expand Up @@ -290,7 +307,6 @@ bool Iouring::dispatch(IouringEvent *event) {
sqe->fd = AT_FDCWD;
sqe->statx_flags |= AT_SYMLINK_NOFOLLOW;
}
// sqe->len = 0xFFF;
sqe->opcode = SW_IORING_OP_STATX;
sqe->off = (uintptr_t) event->statxbuf;
break;
Expand Down Expand Up @@ -320,7 +336,7 @@ bool Iouring::dispatch(IouringEvent *event) {
case SW_IORING_OP_FSYNC:
case SW_IORING_OP_FDATASYNC:
sqe->fd = event->fd;
sqe->addr = (unsigned long) nullptr;
sqe->addr = (uintptr_t) nullptr;
sqe->opcode = IORING_OP_FSYNC;
sqe->len = 0;
sqe->off = 0;
Expand All @@ -329,6 +345,26 @@ bool Iouring::dispatch(IouringEvent *event) {
sqe->fsync_flags = IORING_FSYNC_DATASYNC;
}
break;
#ifdef HAVE_IOURING_FUTEX
case SW_IORING_OP_FUTEX_WAIT:
sqe->opcode = SW_IORING_OP_FUTEX_WAIT;
sqe->fd = FUTEX2_SIZE_U32;
sqe->off = 1;
sqe->addr = (uintptr_t) event->futex;
sqe->len = 0;
sqe->futex_flags = 0;
sqe->addr3 = FUTEX_BITSET_MATCH_ANY;
break;
case SW_IORING_OP_FUTEX_WAKE:
sqe->opcode = SW_IORING_OP_FUTEX_WAKE;
sqe->fd = FUTEX2_SIZE_U32;
sqe->off = 1;
sqe->addr = (uintptr_t) event->futex;
sqe->len = 0;
sqe->futex_flags = 0;
sqe->addr3 = FUTEX_BITSET_MATCH_ANY;
break;
#endif
default:
abort();
return false;
Expand Down Expand Up @@ -465,6 +501,22 @@ int Iouring::stat(const char *path, struct stat *statbuf) {
return retval;
}

#ifdef HAVE_IOURING_FUTEX
int Iouring::futex_wait(uint32_t *futex) {
INIT_EVENT(SW_IORING_OP_FUTEX_WAIT);
event.futex = futex;

return execute(&event);
}

int Iouring::futex_wakeup(uint32_t *futex) {
INIT_EVENT(SW_IORING_OP_FUTEX_WAKE);
event.futex = futex;

return execute(&event);
}
#endif

int Iouring::callback(Reactor *reactor, Event *event) {
Iouring *iouring = static_cast<Iouring *>(event->socket->object);
return iouring->wakeup() ? SW_OK : SW_ERR;
Expand Down
Loading

0 comments on commit a38920d

Please sign in to comment.