From 6345fe88ef29546403bc6eb70505fb573f9a7d71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9F=A9=E5=A4=A9=E5=B3=B0-Rango?= Date: Wed, 15 Sep 2021 09:56:50 +0800 Subject: [PATCH] Add Coroutine::join() (#4406) * Co::join() * Optimize code * Add tests, Fix bugs * Fix core tests --- core-tests/src/reactor/base.cpp | 12 +++ core-tests/src/reactor/defer_task.cpp | 67 ----------------- examples/coroutine/join.php | 36 +++++++++ ext-src/php_swoole_coroutine.h | 5 ++ ext-src/swoole_coroutine.cc | 102 +++++++++++++++++++++++--- include/swoole_reactor.h | 8 -- src/coroutine/system.cc | 8 +- src/reactor/base.cc | 8 -- tests/swoole_coroutine/join/1.phpt | 30 ++++++++ tests/swoole_coroutine/join/2.phpt | 35 +++++++++ tests/swoole_coroutine/join/3.phpt | 49 +++++++++++++ tests/swoole_coroutine/join/4.phpt | 33 +++++++++ 12 files changed, 296 insertions(+), 97 deletions(-) delete mode 100644 core-tests/src/reactor/defer_task.cpp create mode 100644 examples/coroutine/join.php create mode 100644 tests/swoole_coroutine/join/1.phpt create mode 100644 tests/swoole_coroutine/join/2.phpt create mode 100644 tests/swoole_coroutine/join/3.phpt create mode 100644 tests/swoole_coroutine/join/4.phpt diff --git a/core-tests/src/reactor/base.cpp b/core-tests/src/reactor/base.cpp index 8395cbd25ef..d753c544253 100644 --- a/core-tests/src/reactor/base.cpp +++ b/core-tests/src/reactor/base.cpp @@ -257,3 +257,15 @@ TEST(reactor, add_or_update) { swoole_event_free(); } + +TEST(reactor, defer_task) { + swoole_event_init(SW_EVENTLOOP_WAIT_EXIT); + Reactor *reactor = sw_reactor(); + ASSERT_EQ(reactor->max_event_num, SW_REACTOR_MAXEVENTS); + + int count = 0; + reactor->defer([&count](void *) { count++; }); + swoole_event_wait(); + ASSERT_EQ(count, 1); + swoole_event_free(); +} diff --git a/core-tests/src/reactor/defer_task.cpp b/core-tests/src/reactor/defer_task.cpp deleted file mode 100644 index 7fc823daa32..00000000000 --- a/core-tests/src/reactor/defer_task.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - +----------------------------------------------------------------------+ - | Swoole | - +----------------------------------------------------------------------+ - | This source file is subject to version 2.0 of the Apache license, | - | that is bundled with this package in the file LICENSE, and is | - | available through the world-wide-web at the following url: | - | http://www.apache.org/licenses/LICENSE-2.0.html | - | If you did not receive a copy of the Apache2.0 license and are unable| - | to obtain it through the world-wide-web, please send a note to | - | license@swoole.com so we can mail you a copy immediately. | - +----------------------------------------------------------------------+ - | @link https://www.swoole.com/ | - | @contact team@swoole.com | - | @license https://github.com/swoole/swoole-src/blob/master/LICENSE | - | @author Tianfeng Han | - +----------------------------------------------------------------------+ -*/ - -#include "test_core.h" -#include "swoole_pipe.h" - -using namespace swoole; - -TEST(defer_task, defer) { - swoole_event_init(SW_EVENTLOOP_WAIT_EXIT); - Reactor *reactor = sw_reactor(); - ASSERT_EQ(reactor->max_event_num, SW_REACTOR_MAXEVENTS); - - int count = 0; - reactor->defer([&count](void *) { count++; }); - swoole_event_wait(); - ASSERT_EQ(count, 1); - swoole_event_free(); -} - -TEST(defer_task, cancel_1) { - swoole_event_init(SW_EVENTLOOP_WAIT_EXIT); - Reactor *reactor = sw_reactor(); - ASSERT_EQ(reactor->max_event_num, SW_REACTOR_MAXEVENTS); - - int count = 0; - reactor->defer([&count](void *) { count += 2; }); - auto iter = reactor->get_last_defer_task(); - reactor->remove_defer_task(iter); - - reactor->defer([&count](void *) { count += 5; }); - - swoole_event_wait(); - ASSERT_EQ(count, 5); - swoole_event_free(); -} - -TEST(defer_task, cancel_2) { - swoole_event_init(SW_EVENTLOOP_WAIT_EXIT); - Reactor *reactor = sw_reactor(); - ASSERT_EQ(reactor->max_event_num, SW_REACTOR_MAXEVENTS); - - int count = 0; - reactor->defer([&count](void *) { count += 2; }); - auto iter = reactor->get_last_defer_task(); - reactor->remove_defer_task(iter); - - swoole_event_wait(); - ASSERT_EQ(count, 0); - swoole_event_free(); -} diff --git a/examples/coroutine/join.php b/examples/coroutine/join.php new file mode 100644 index 00000000000..4431e7e6af0 --- /dev/null +++ b/examples/coroutine/join.php @@ -0,0 +1,36 @@ + SwapCallback; + JMP_BUF *bailout; zval *vm_stack_top; zval *vm_stack_end; @@ -69,6 +71,9 @@ struct PHPContext { int tmp_error_reporting; Coroutine *co; std::stack *defer_tasks; + SwapCallback *on_yield; + SwapCallback *on_resume; + SwapCallback *on_close; long pcid; zend_object *context; int64_t last_msec; diff --git a/ext-src/swoole_coroutine.cc b/ext-src/swoole_coroutine.cc index e6b2ff380ad..378dec25a71 100644 --- a/ext-src/swoole_coroutine.cc +++ b/ext-src/swoole_coroutine.cc @@ -35,6 +35,7 @@ using swoole::PHPContext; using swoole::PHPCoroutine; using swoole::coroutine::Socket; using swoole::coroutine::System; +using swoole::CallbackManager; #define PHP_CORO_TASK_SLOT \ ((int) ((ZEND_MM_ALIGNED_SIZE(sizeof(PHPContext)) + ZEND_MM_ALIGNED_SIZE(sizeof(zval)) - 1) / \ @@ -94,6 +95,7 @@ SW_EXTERN_C_BEGIN static PHP_METHOD(swoole_coroutine, exists); static PHP_METHOD(swoole_coroutine, yield); static PHP_METHOD(swoole_coroutine, resume); +static PHP_METHOD(swoole_coroutine, join); static PHP_METHOD(swoole_coroutine, cancel); static PHP_METHOD(swoole_coroutine, isCanceled); static PHP_METHOD(swoole_coroutine, stats); @@ -130,6 +132,11 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_coroutine_resume, 0, 0, 1) ZEND_ARG_INFO(0, cid) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_coroutine_join, 0, 0, 1) + ZEND_ARG_INFO(0, cid_array) + ZEND_ARG_INFO(0, timeout) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_coroutine_exists, 0, 0, 1) ZEND_ARG_INFO(0, cid) ZEND_END_ARG_INFO() @@ -178,6 +185,7 @@ static const zend_function_entry swoole_coroutine_methods[] = PHP_ME(swoole_coroutine, exists, arginfo_swoole_coroutine_exists, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(swoole_coroutine, yield, arginfo_swoole_coroutine_void, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(swoole_coroutine, cancel, arginfo_swoole_coroutine_cancel, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(swoole_coroutine, join, arginfo_swoole_coroutine_join, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(swoole_coroutine, isCanceled, arginfo_swoole_coroutine_void, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_MALIAS(swoole_coroutine, suspend, yield, arginfo_swoole_coroutine_void, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(swoole_coroutine, resume, arginfo_swoole_coroutine_resume, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) @@ -590,10 +598,14 @@ void PHPCoroutine::restore_task(PHPContext *task) { void PHPCoroutine::on_yield(void *arg) { PHPContext *task = (PHPContext *) arg; PHPContext *origin_task = get_origin_context(task); - swoole_trace_log( - SW_TRACE_COROUTINE, "php_coro_yield from cid=%ld to cid=%ld", task->co->get_cid(), task->co->get_origin_cid()); save_task(task); restore_task(origin_task); + + if (task->on_yield) { + (*task->on_yield)(task); + } + + swoole_trace_log(SW_TRACE_COROUTINE, "from cid=%ld to cid=%ld", task->co->get_cid(), task->co->get_origin_cid()); } void PHPCoroutine::on_resume(void *arg) { @@ -602,10 +614,12 @@ void PHPCoroutine::on_resume(void *arg) { save_task(current_task); restore_task(task); record_last_msec(task); - swoole_trace_log(SW_TRACE_COROUTINE, - "php_coro_resume from cid=%ld to cid=%ld", - Coroutine::get_current_cid(), - task->co->get_cid()); + + if (task->on_resume) { + (*task->on_resume)(task); + } + + swoole_trace_log(SW_TRACE_COROUTINE, "from cid=%ld to cid=%ld", Coroutine::get_current_cid(), task->co->get_cid()); } void PHPCoroutine::on_close(void *arg) { @@ -638,9 +652,14 @@ void PHPCoroutine::on_close(void *arg) { } #endif + if (task->on_close) { + (*task->on_close)(task); + } + if (SwooleG.max_concurrency > 0 && task->pcid == -1) { SwooleWG.worker_concurrency--; } + vm_stack_destroy(); restore_task(origin_task); @@ -657,7 +676,6 @@ void PHPCoroutine::main_func(void *arg) { #ifdef SW_CORO_SUPPORT_BAILOUT zend_first_try { #endif - int i; Args *php_arg = (Args *) arg; zend_fcall_info_cache fci_cache = *php_arg->fci_cache; zend_function *func = fci_cache.function_handler; @@ -694,7 +712,7 @@ void PHPCoroutine::main_func(void *arg) { } while (0); #endif - for (i = 0; i < argc; ++i) { + SW_LOOP_N(argc) { zval *param; zval *arg = &argv[i]; if (Z_ISREF_P(arg) && !(func->common.fn_flags & ZEND_ACC_CALL_VIA_TRAMPOLINE)) { @@ -736,6 +754,9 @@ void PHPCoroutine::main_func(void *arg) { task->defer_tasks = nullptr; task->pcid = task->co->get_origin_cid(); task->context = nullptr; + task->on_yield = nullptr; + task->on_resume = nullptr; + task->on_close = nullptr; task->enable_scheduler = true; save_vm_stack(task); @@ -1108,7 +1129,7 @@ static PHP_METHOD(swoole_coroutine, resume) { auto coroutine_iterator = user_yield_coros.find(cid); if (coroutine_iterator == user_yield_coros.end()) { - php_swoole_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation or non-existent"); + php_swoole_fatal_error(E_WARNING, "can not resume the coroutine which is in IO operation or non-existent"); RETURN_FALSE; } @@ -1137,6 +1158,69 @@ static PHP_METHOD(swoole_coroutine, yield) { RETURN_TRUE; } +static PHP_METHOD(swoole_coroutine, join) { + Coroutine *co = Coroutine::get_current_safe(); + zval *cid_array; + double timeout = -1; + + ZEND_PARSE_PARAMETERS_START(1, 2) + Z_PARAM_ARRAY(cid_array) + Z_PARAM_OPTIONAL + Z_PARAM_DOUBLE(timeout) + ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); + + auto count = php_swoole_array_length(cid_array); + if (count == 0) { + swoole_set_last_error(SW_ERROR_INVALID_PARAMS); + RETURN_FALSE; + } + + bool *canceled = new bool(false); + PHPContext::SwapCallback join_fn = [&count, canceled, co](PHPContext *task) { + if (--count > 0) { + return; + } + swoole_event_defer([co, canceled](void*) { + if (*canceled == false) { + co->resume(); + } + delete canceled; + }, nullptr); + }; + + zval *zcid; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(cid_array), zcid) { + long cid = zval_get_long(zcid); + if (co->get_cid() == cid) { + swoole_set_last_error(SW_ERROR_WRONG_OPERATION); + php_swoole_error(E_WARNING, "can not join self"); + RETURN_FALSE; + } + auto ctx = PHPCoroutine::get_context_by_cid(cid); + if (ctx == nullptr) { + swoole_set_last_error(SW_ERROR_CO_NOT_EXISTS); + RETURN_FALSE; + } + ctx->on_close = &join_fn; + } + ZEND_HASH_FOREACH_END(); + + if (!co->yield_ex(timeout)) { + *canceled = true; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(cid_array), zcid) { + long cid = zval_get_long(zcid); + auto ctx = PHPCoroutine::get_context_by_cid(cid); + if (ctx) { + ctx->on_close = nullptr; + } + } + ZEND_HASH_FOREACH_END(); + RETURN_FALSE; + } + + RETURN_TRUE; +} + static PHP_METHOD(swoole_coroutine, cancel) { long cid; if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &cid) == FAILURE) { diff --git a/include/swoole_reactor.h b/include/swoole_reactor.h index 131aa1bc70e..17340a2a2ac 100644 --- a/include/swoole_reactor.h +++ b/include/swoole_reactor.h @@ -60,12 +60,6 @@ class CallbackManager { list_.emplace_front(fn, private_data); auto t = list_.back(); } - TaskList::iterator back_position() { - return std::prev(list_.end()); - } - TaskList::iterator front_position() { - return list_.begin(); - } void remove(TaskList::iterator iter) { list_.erase(iter); } @@ -194,8 +188,6 @@ class Reactor { ~Reactor(); bool if_exit(); void defer(Callback cb, void *data = nullptr); - CallbackManager::TaskList::iterator get_last_defer_task(); - void remove_defer_task(CallbackManager::TaskList::iterator iter); void set_end_callback(enum EndCallback id, const std::function &fn); void set_exit_condition(enum ExitCondition id, const std::function &fn); bool set_handler(int _fdtype, ReactorHandler handler); diff --git a/src/coroutine/system.cc b/src/coroutine/system.cc index 04a4b562226..a2336a1713e 100644 --- a/src/coroutine/system.cc +++ b/src/coroutine/system.cc @@ -43,12 +43,10 @@ void System::clear_dns_cache() { } static void sleep_callback(Coroutine *co, bool *canceled) { - bool _canceled = *canceled; - delete canceled; - if (_canceled) { - return; + if (*canceled == false) { + co->resume(); } - co->resume(); + delete canceled; } int System::sleep(double sec) { diff --git a/src/reactor/base.cc b/src/reactor/base.cc index 9d6fdaddc06..d4be6cba97e 100644 --- a/src/reactor/base.cc +++ b/src/reactor/base.cc @@ -394,14 +394,6 @@ void Reactor::defer(Callback cb, void *data) { defer_tasks->append(cb, data); } -CallbackManager::TaskList::iterator Reactor::get_last_defer_task() { - return defer_tasks->back_position(); -} - -void Reactor::remove_defer_task(CallbackManager::TaskList::iterator iter) { - defer_tasks->remove(iter); -} - void Reactor::execute_end_callbacks(bool timedout) { for (auto &kv : end_callbacks) { kv.second(this); diff --git a/tests/swoole_coroutine/join/1.phpt b/tests/swoole_coroutine/join/1.phpt new file mode 100644 index 00000000000..3461ff26421 --- /dev/null +++ b/tests/swoole_coroutine/join/1.phpt @@ -0,0 +1,30 @@ +--TEST-- +swoole_coroutine/join: 1 +--SKIPIF-- + +--FILE-- + +--EXPECT-- +ALL DONE diff --git a/tests/swoole_coroutine/join/2.phpt b/tests/swoole_coroutine/join/2.phpt new file mode 100644 index 00000000000..c3dcc76bd04 --- /dev/null +++ b/tests/swoole_coroutine/join/2.phpt @@ -0,0 +1,35 @@ +--TEST-- +swoole_coroutine/join: 2 +--SKIPIF-- + +--FILE-- + +--EXPECT-- +ALL DONE diff --git a/tests/swoole_coroutine/join/3.phpt b/tests/swoole_coroutine/join/3.phpt new file mode 100644 index 00000000000..5e99872f4f4 --- /dev/null +++ b/tests/swoole_coroutine/join/3.phpt @@ -0,0 +1,49 @@ +--TEST-- +swoole_coroutine/join: 3 +--SKIPIF-- + +--FILE-- + +--EXPECT-- diff --git a/tests/swoole_coroutine/join/4.phpt b/tests/swoole_coroutine/join/4.phpt new file mode 100644 index 00000000000..5ab8d152639 --- /dev/null +++ b/tests/swoole_coroutine/join/4.phpt @@ -0,0 +1,33 @@ +--TEST-- +swoole_coroutine/join: 4 +--SKIPIF-- + +--FILE-- + +--EXPECT-- +DEFER CALLBACK +DONE