Skip to content

Commit

Permalink
Add Coroutine::join() (#4406)
Browse files Browse the repository at this point in the history
* Co::join()

* Optimize code

* Add tests, Fix bugs

* Fix core tests
  • Loading branch information
matyhtf committed Sep 15, 2021
1 parent e648f7c commit 6345fe8
Show file tree
Hide file tree
Showing 12 changed files with 296 additions and 97 deletions.
12 changes: 12 additions & 0 deletions core-tests/src/reactor/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,15 @@ TEST(reactor, add_or_update) {

swoole_event_free();
}

TEST(reactor, defer_task) {
swoole_event_init(SW_EVENTLOOP_WAIT_EXIT);
Reactor *reactor = sw_reactor();
ASSERT_EQ(reactor->max_event_num, SW_REACTOR_MAXEVENTS);

int count = 0;
reactor->defer([&count](void *) { count++; });
swoole_event_wait();
ASSERT_EQ(count, 1);
swoole_event_free();
}
67 changes: 0 additions & 67 deletions core-tests/src/reactor/defer_task.cpp

This file was deleted.

36 changes: 36 additions & 0 deletions examples/coroutine/join.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

use function Swoole\Coroutine\run;
use function Swoole\Coroutine\go;
use Swoole\Coroutine;
use Swoole\Coroutine\System;

//run(function () {
// $cid_list = [];
// for ($i = 0; $i < 10; $i++) {
// $cid_list[] = go(function () use ($i) {
// System::sleep(.3);
// echo "hello $i\n";
// });
// }
//
// Coroutine::join($cid_list);
//
// echo "all done\n";
//});


run(function () {
$result = [];
Coroutine::join([
go(function () use (&$result) {
$result['baidu'] = file_get_contents("https://www.baidu.com/");
}),
go(function () use (&$result) {
$result['taobao'] = file_get_contents("https://www.taobao.com/");
})
]);

echo "all done\n";
var_dump($result);
});
5 changes: 5 additions & 0 deletions ext-src/php_swoole_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct Function;
namespace swoole {

struct PHPContext {
typedef std::function<void(PHPContext *)> SwapCallback;

JMP_BUF *bailout;
zval *vm_stack_top;
zval *vm_stack_end;
Expand All @@ -69,6 +71,9 @@ struct PHPContext {
int tmp_error_reporting;
Coroutine *co;
std::stack<zend::Function *> *defer_tasks;
SwapCallback *on_yield;
SwapCallback *on_resume;
SwapCallback *on_close;
long pcid;
zend_object *context;
int64_t last_msec;
Expand Down
102 changes: 93 additions & 9 deletions ext-src/swoole_coroutine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ using swoole::PHPContext;
using swoole::PHPCoroutine;
using swoole::coroutine::Socket;
using swoole::coroutine::System;
using swoole::CallbackManager;

#define PHP_CORO_TASK_SLOT \
((int) ((ZEND_MM_ALIGNED_SIZE(sizeof(PHPContext)) + ZEND_MM_ALIGNED_SIZE(sizeof(zval)) - 1) / \
Expand Down Expand Up @@ -94,6 +95,7 @@ SW_EXTERN_C_BEGIN
static PHP_METHOD(swoole_coroutine, exists);
static PHP_METHOD(swoole_coroutine, yield);
static PHP_METHOD(swoole_coroutine, resume);
static PHP_METHOD(swoole_coroutine, join);
static PHP_METHOD(swoole_coroutine, cancel);
static PHP_METHOD(swoole_coroutine, isCanceled);
static PHP_METHOD(swoole_coroutine, stats);
Expand Down Expand Up @@ -130,6 +132,11 @@ ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_coroutine_resume, 0, 0, 1)
ZEND_ARG_INFO(0, cid)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_coroutine_join, 0, 0, 1)
ZEND_ARG_INFO(0, cid_array)
ZEND_ARG_INFO(0, timeout)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_swoole_coroutine_exists, 0, 0, 1)
ZEND_ARG_INFO(0, cid)
ZEND_END_ARG_INFO()
Expand Down Expand Up @@ -178,6 +185,7 @@ static const zend_function_entry swoole_coroutine_methods[] =
PHP_ME(swoole_coroutine, exists, arginfo_swoole_coroutine_exists, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_ME(swoole_coroutine, yield, arginfo_swoole_coroutine_void, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_ME(swoole_coroutine, cancel, arginfo_swoole_coroutine_cancel, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_ME(swoole_coroutine, join, arginfo_swoole_coroutine_join, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_ME(swoole_coroutine, isCanceled, arginfo_swoole_coroutine_void, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_MALIAS(swoole_coroutine, suspend, yield, arginfo_swoole_coroutine_void, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
PHP_ME(swoole_coroutine, resume, arginfo_swoole_coroutine_resume, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC)
Expand Down Expand Up @@ -590,10 +598,14 @@ void PHPCoroutine::restore_task(PHPContext *task) {
void PHPCoroutine::on_yield(void *arg) {
PHPContext *task = (PHPContext *) arg;
PHPContext *origin_task = get_origin_context(task);
swoole_trace_log(
SW_TRACE_COROUTINE, "php_coro_yield from cid=%ld to cid=%ld", task->co->get_cid(), task->co->get_origin_cid());
save_task(task);
restore_task(origin_task);

if (task->on_yield) {
(*task->on_yield)(task);
}

swoole_trace_log(SW_TRACE_COROUTINE, "from cid=%ld to cid=%ld", task->co->get_cid(), task->co->get_origin_cid());
}

void PHPCoroutine::on_resume(void *arg) {
Expand All @@ -602,10 +614,12 @@ void PHPCoroutine::on_resume(void *arg) {
save_task(current_task);
restore_task(task);
record_last_msec(task);
swoole_trace_log(SW_TRACE_COROUTINE,
"php_coro_resume from cid=%ld to cid=%ld",
Coroutine::get_current_cid(),
task->co->get_cid());

if (task->on_resume) {
(*task->on_resume)(task);
}

swoole_trace_log(SW_TRACE_COROUTINE, "from cid=%ld to cid=%ld", Coroutine::get_current_cid(), task->co->get_cid());
}

void PHPCoroutine::on_close(void *arg) {
Expand Down Expand Up @@ -638,9 +652,14 @@ void PHPCoroutine::on_close(void *arg) {
}
#endif

if (task->on_close) {
(*task->on_close)(task);
}

if (SwooleG.max_concurrency > 0 && task->pcid == -1) {
SwooleWG.worker_concurrency--;
}

vm_stack_destroy();
restore_task(origin_task);

Expand All @@ -657,7 +676,6 @@ void PHPCoroutine::main_func(void *arg) {
#ifdef SW_CORO_SUPPORT_BAILOUT
zend_first_try {
#endif
int i;
Args *php_arg = (Args *) arg;
zend_fcall_info_cache fci_cache = *php_arg->fci_cache;
zend_function *func = fci_cache.function_handler;
Expand Down Expand Up @@ -694,7 +712,7 @@ void PHPCoroutine::main_func(void *arg) {
} while (0);
#endif

for (i = 0; i < argc; ++i) {
SW_LOOP_N(argc) {
zval *param;
zval *arg = &argv[i];
if (Z_ISREF_P(arg) && !(func->common.fn_flags & ZEND_ACC_CALL_VIA_TRAMPOLINE)) {
Expand Down Expand Up @@ -736,6 +754,9 @@ void PHPCoroutine::main_func(void *arg) {
task->defer_tasks = nullptr;
task->pcid = task->co->get_origin_cid();
task->context = nullptr;
task->on_yield = nullptr;
task->on_resume = nullptr;
task->on_close = nullptr;
task->enable_scheduler = true;

save_vm_stack(task);
Expand Down Expand Up @@ -1108,7 +1129,7 @@ static PHP_METHOD(swoole_coroutine, resume) {

auto coroutine_iterator = user_yield_coros.find(cid);
if (coroutine_iterator == user_yield_coros.end()) {
php_swoole_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation or non-existent");
php_swoole_fatal_error(E_WARNING, "can not resume the coroutine which is in IO operation or non-existent");
RETURN_FALSE;
}

Expand Down Expand Up @@ -1137,6 +1158,69 @@ static PHP_METHOD(swoole_coroutine, yield) {
RETURN_TRUE;
}

static PHP_METHOD(swoole_coroutine, join) {
Coroutine *co = Coroutine::get_current_safe();
zval *cid_array;
double timeout = -1;

ZEND_PARSE_PARAMETERS_START(1, 2)
Z_PARAM_ARRAY(cid_array)
Z_PARAM_OPTIONAL
Z_PARAM_DOUBLE(timeout)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

auto count = php_swoole_array_length(cid_array);
if (count == 0) {
swoole_set_last_error(SW_ERROR_INVALID_PARAMS);
RETURN_FALSE;
}

bool *canceled = new bool(false);
PHPContext::SwapCallback join_fn = [&count, canceled, co](PHPContext *task) {
if (--count > 0) {
return;
}
swoole_event_defer([co, canceled](void*) {
if (*canceled == false) {
co->resume();
}
delete canceled;
}, nullptr);
};

zval *zcid;
ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(cid_array), zcid) {
long cid = zval_get_long(zcid);
if (co->get_cid() == cid) {
swoole_set_last_error(SW_ERROR_WRONG_OPERATION);
php_swoole_error(E_WARNING, "can not join self");
RETURN_FALSE;
}
auto ctx = PHPCoroutine::get_context_by_cid(cid);
if (ctx == nullptr) {
swoole_set_last_error(SW_ERROR_CO_NOT_EXISTS);
RETURN_FALSE;
}
ctx->on_close = &join_fn;
}
ZEND_HASH_FOREACH_END();

if (!co->yield_ex(timeout)) {
*canceled = true;
ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(cid_array), zcid) {
long cid = zval_get_long(zcid);
auto ctx = PHPCoroutine::get_context_by_cid(cid);
if (ctx) {
ctx->on_close = nullptr;
}
}
ZEND_HASH_FOREACH_END();
RETURN_FALSE;
}

RETURN_TRUE;
}

static PHP_METHOD(swoole_coroutine, cancel) {
long cid;
if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &cid) == FAILURE) {
Expand Down
8 changes: 0 additions & 8 deletions include/swoole_reactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ class CallbackManager {
list_.emplace_front(fn, private_data);
auto t = list_.back();
}
TaskList::iterator back_position() {
return std::prev(list_.end());
}
TaskList::iterator front_position() {
return list_.begin();
}
void remove(TaskList::iterator iter) {
list_.erase(iter);
}
Expand Down Expand Up @@ -194,8 +188,6 @@ class Reactor {
~Reactor();
bool if_exit();
void defer(Callback cb, void *data = nullptr);
CallbackManager::TaskList::iterator get_last_defer_task();
void remove_defer_task(CallbackManager::TaskList::iterator iter);
void set_end_callback(enum EndCallback id, const std::function<void(Reactor *)> &fn);
void set_exit_condition(enum ExitCondition id, const std::function<bool(Reactor *, size_t &)> &fn);
bool set_handler(int _fdtype, ReactorHandler handler);
Expand Down
8 changes: 3 additions & 5 deletions src/coroutine/system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ void System::clear_dns_cache() {
}

static void sleep_callback(Coroutine *co, bool *canceled) {
bool _canceled = *canceled;
delete canceled;
if (_canceled) {
return;
if (*canceled == false) {
co->resume();
}
co->resume();
delete canceled;
}

int System::sleep(double sec) {
Expand Down
8 changes: 0 additions & 8 deletions src/reactor/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,6 @@ void Reactor::defer(Callback cb, void *data) {
defer_tasks->append(cb, data);
}

CallbackManager::TaskList::iterator Reactor::get_last_defer_task() {
return defer_tasks->back_position();
}

void Reactor::remove_defer_task(CallbackManager::TaskList::iterator iter) {
defer_tasks->remove(iter);
}

void Reactor::execute_end_callbacks(bool timedout) {
for (auto &kv : end_callbacks) {
kv.second(this);
Expand Down
Loading

0 comments on commit 6345fe8

Please sign in to comment.