From 93dbf3bbce10254c0db45e7bc126e9dffe403020 Mon Sep 17 00:00:00 2001 From: hantianfeng Date: Fri, 13 Sep 2024 17:43:40 +0800 Subject: [PATCH] Optimize code --- core-tests/src/server/server.cpp | 15 ++-- ext-src/php_swoole_server.h | 2 +- ext-src/swoole_server.cc | 128 +++++++++---------------------- include/swoole.h | 8 ++ include/swoole_process_pool.h | 1 - include/swoole_server.h | 15 +++- src/os/process_pool.cc | 35 +++------ src/server/master.cc | 46 ++++++----- src/server/task_worker.cc | 60 ++++++++++++++- 9 files changed, 161 insertions(+), 149 deletions(-) diff --git a/core-tests/src/server/server.cpp b/core-tests/src/server/server.cpp index 3d7e06de161..e67b63a90f6 100644 --- a/core-tests/src/server/server.cpp +++ b/core-tests/src/server/server.cpp @@ -545,8 +545,8 @@ TEST(server, task_worker) { exit(2); } - serv.onTask = [](Server *serv, swEventData *task) -> int { - EXPECT_EQ(serv->get_task_count(), 1); + serv.onTask = [](Server *serv, EventData *task) -> int { + EXPECT_EQ(serv->get_tasking_num(), 1); EXPECT_EQ(string(task->data, task->info.len), string(packet)); serv->gs->task_workers.running = 0; return 0; @@ -556,13 +556,14 @@ TEST(server, task_worker) { ASSERT_EQ(serv.create_task_workers(), SW_OK); thread t1([&serv]() { + SwooleWG.run_always = true; serv.gs->task_workers.running = 1; serv.gs->tasking_num++; serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]); serv.gs->tasking_num--; - EXPECT_EQ(serv.get_task_count(), 0); + EXPECT_EQ(serv.get_tasking_num(), 0); serv.gs->tasking_num--; - EXPECT_EQ(serv.get_task_count(), 0); + EXPECT_EQ(serv.get_tasking_num(), 0); EXPECT_EQ(serv.get_idle_task_worker_num(), serv.task_worker_num); }); @@ -581,6 +582,8 @@ TEST(server, task_worker) { t1.join(); serv.gs->task_workers.destroy(); + + ASSERT_EQ(serv.gs->) } // PHP_METHOD(swoole_server, task) @@ -724,7 +727,7 @@ TEST(server, task_worker4) { serv->gs->task_workers.dispatch(&buf, &_dst_worker_id); sleep(1); - EventData *task_result = &(serv->task_result[swoole_get_process_id()]); + EventData *task_result = serv->get_task_result(); sw_memset_zero(task_result, sizeof(*task_result)); memset(&buf.info, 0, sizeof(buf.info)); buf.info.len = strlen(packet); @@ -779,7 +782,7 @@ TEST(server, task_worker5) { if (worker->id == 1) { int _dst_worker_id = 0; - EventData *task_result = &(serv->task_result[worker->id]); + EventData *task_result = &(serv->task_results[worker->id]); sw_memset_zero(task_result, sizeof(*task_result)); File fp = make_tmpfile(); diff --git a/ext-src/php_swoole_server.h b/ext-src/php_swoole_server.h index 4fc2429069a..f96d75305f3 100644 --- a/ext-src/php_swoole_server.h +++ b/ext-src/php_swoole_server.h @@ -124,7 +124,7 @@ struct ServerObject { struct TaskCo { Coroutine *co; - int *list; + TaskId *list; uint32_t count; zval *result; }; diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index be52068f298..476cbe692cc 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -86,7 +86,7 @@ static void php_swoole_server_onManagerStart(Server *serv); static void php_swoole_server_onManagerStop(Server *serv); static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task); -static TaskId php_swoole_server_task_pack(EventData *task, zval *data); +static TaskId php_swoole_server_task_pack(zval *data, EventData *task); static bool php_swoole_server_task_unpack(zval *zresult, EventData *task_result); static int php_swoole_server_dispatch_func(Server *serv, Connection *conn, SendData *data); static zval *php_swoole_server_add_port(ServerObject *server_object, ListenPort *port); @@ -655,7 +655,7 @@ int php_swoole_create_dir(const char *path, size_t length) { return php_stream_mkdir(path, 0777, PHP_STREAM_MKDIR_RECURSIVE | REPORT_ERRORS, nullptr) ? 0 : -1; } -static TaskId php_swoole_server_task_pack(EventData *task, zval *zdata) { +static TaskId php_swoole_server_task_pack(zval *zdata, EventData *task) { smart_str serialized_data = {}; php_serialize_data_t var_hash; @@ -2973,7 +2973,8 @@ static PHP_METHOD(swoole_server, stats) { if (serv->task_worker_num > 0) { add_assoc_long_ex(return_value, ZEND_STRL("task_idle_worker_num"), serv->get_idle_task_worker_num()); - add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), serv->get_task_count()); + add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), serv->get_tasking_num()); + add_assoc_long_ex(return_value, ZEND_STRL("task_count"), serv->gs->task_count); } add_assoc_long_ex(return_value, ZEND_STRL("coroutine_num"), Coroutine::count()); @@ -3056,9 +3057,6 @@ static PHP_METHOD(swoole_server, taskwait) { RETURN_FALSE; } - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - zval *zdata; double timeout = SW_TASKWAIT_TIMEOUT; zend_long dst_worker_id = -1; @@ -3074,11 +3072,11 @@ static PHP_METHOD(swoole_server, taskwait) { RETURN_FALSE; } - if (php_swoole_server_task_pack(&buf, zdata) < 0) { + EventData buf; + if (php_swoole_server_task_pack(zdata, &buf) < 0) { RETURN_FALSE; } - int _dst_worker_id = (int) dst_worker_id; TaskId task_id = serv->get_task_id(&buf); // coroutine @@ -3091,9 +3089,7 @@ static PHP_METHOD(swoole_server, taskwait) { task_co.count = 1; task_co.result = return_value; - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) < 0) { - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); + if (!serv->task(&buf, (int *) &dst_worker_id)) { RETURN_FALSE; } @@ -3104,49 +3100,19 @@ static PHP_METHOD(swoole_server, taskwait) { if (!retval) { RETURN_FALSE; } - return; - } - - uint64_t notify; - EventData *task_result = &(serv->task_result[swoole_get_process_id()]); - sw_memset_zero(task_result, sizeof(*task_result)); - Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get(); - network::Socket *task_notify_socket = pipe->get_socket(false); - - // clear history task - while (task_notify_socket->wait_event(0, SW_EVENT_READ) == SW_OK) { - if (task_notify_socket->read(¬ify, sizeof(notify)) <= 0) { - break; + } else { + auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout); + if (!retval) { + RETURN_FALSE; } - } - - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - - if (serv->gs->task_workers.dispatch_blocking(&buf, &_dst_worker_id) == SW_OK) { - while (1) { - if (task_notify_socket->wait_event((int) (timeout * 1000), SW_EVENT_READ) != SW_OK) { - break; - } - if (pipe->read(¬ify, sizeof(notify)) > 0) { - if (serv->get_task_id(task_result) != task_id) { - continue; - } - zval zresult; - if (!php_swoole_server_task_unpack(&zresult, task_result)) { - RETURN_FALSE; - } else { - RETURN_ZVAL(&zresult, 0, 0); - } - break; - } else { - php_swoole_sys_error(E_WARNING, "taskwait failed"); - break; - } + zval zresult; + auto task_result = serv->get_task_result(); + if (!php_swoole_server_task_unpack(&zresult, task_result)) { + RETURN_FALSE; + } else { + RETURN_ZVAL(&zresult, 0, 0); } - } else { - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); } - RETURN_FALSE; } static PHP_METHOD(swoole_server, taskWaitMulti) { @@ -3164,9 +3130,6 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { RETURN_FALSE; } - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - zval *ztasks; double timeout = SW_TASKWAIT_TIMEOUT; @@ -3187,10 +3150,10 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { RETURN_FALSE; } - int list_of_id[SW_MAX_CONCURRENT_TASK] = {}; + TaskId list_of_id[SW_MAX_CONCURRENT_TASK] = {}; uint64_t notify; - EventData *task_result = &(serv->task_result[swoole_get_process_id()]); + EventData *task_result = serv->get_task_result(); sw_memset_zero(task_result, sizeof(*task_result)); Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get(); Worker *worker = serv->get_worker(swoole_get_process_id()); @@ -3219,7 +3182,9 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { zval *ztask; SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) - TaskId task_id = php_swoole_server_task_pack(&buf, ztask); + + EventData buf; + TaskId task_id = php_swoole_server_task_pack(ztask, &buf); if (task_id < 0) { php_swoole_fatal_error(E_WARNING, "task pack failed"); goto _fail; @@ -3280,7 +3245,7 @@ static PHP_METHOD(swoole_server, taskWaitMulti) { } (void) add_index_zval(return_value, j, &zresult); _next: - content->offset += sizeof(DataHead) + result->info.len; + content->offset += result->size(); } while (content->offset < 0 || (size_t) content->offset < content->length); // delete tmp file unlink(file_path.c_str()); @@ -3313,9 +3278,6 @@ static PHP_METHOD(swoole_server, taskCo) { int i = 0; uint32_t n_task = php_swoole_array_length(ztasks); - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - if (n_task >= SW_MAX_CONCURRENT_TASK) { php_swoole_fatal_error(E_WARNING, "too many concurrent tasks"); RETURN_FALSE; @@ -3325,7 +3287,7 @@ static PHP_METHOD(swoole_server, taskCo) { RETURN_FALSE; } - int *list = (int *) ecalloc(n_task, sizeof(int)); + TaskId *list = (TaskId *) ecalloc(n_task, sizeof(TaskId)); if (list == nullptr) { RETURN_FALSE; } @@ -3337,20 +3299,19 @@ static PHP_METHOD(swoole_server, taskCo) { zval *ztask; SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) { - task_id = php_swoole_server_task_pack(&buf, ztask); + EventData buf; + task_id = php_swoole_server_task_pack(ztask, &buf); if (task_id < 0) { php_swoole_fatal_error(E_WARNING, "failed to pack task"); goto _fail; } buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_COROUTINE); dst_worker_id = -1; - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - if (serv->gs->task_workers.dispatch(&buf, &dst_worker_id) < 0) { + if (!serv->task(&buf, &dst_worker_id)) { task_id = -1; _fail: add_index_bool(return_value, i, 0); n_task--; - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); } else { server_object->property->task_coroutine_map[task_id] = &task_co; } @@ -3406,14 +3367,11 @@ static PHP_METHOD(swoole_server, task) { } EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - - if (php_swoole_server_task_pack(&buf, zdata) < 0) { + TaskId task_id = php_swoole_server_task_pack(zdata, &buf); + if (task_id < 0) { RETURN_FALSE; } - TaskId task_id = serv->get_task_id(&buf); - if (!serv->is_worker()) { buf.info.ext_flags |= SW_TASK_NOREPLY; } else if (fci.size) { @@ -3424,15 +3382,11 @@ static PHP_METHOD(swoole_server, task) { buf.info.ext_flags |= SW_TASK_NONBLOCK; - int _dst_worker_id = (int) dst_worker_id; - sw_atomic_fetch_add(&serv->gs->tasking_num, 1); - - if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) >= 0) { + if (serv->task(&buf, (int *) &dst_worker_id)) { RETURN_LONG(task_id); + } else { + RETURN_FALSE; } - - sw_atomic_fetch_sub(&serv->gs->tasking_num, 1); - RETURN_FALSE; } static PHP_METHOD(swoole_server, command) { @@ -3523,17 +3477,11 @@ static PHP_METHOD(swoole_server, sendMessage) { } EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - - if (php_swoole_server_task_pack(&buf, zmessage) < 0) { + if (php_swoole_server_task_pack(zmessage, &buf) < 0) { RETURN_FALSE; } - buf.info.type = SW_SERVER_EVENT_PIPE_MESSAGE; - - Worker *to_worker = serv->get_worker(worker_id); - SW_CHECK_RETURN(serv->send_to_worker_from_worker( - to_worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER | SW_PIPE_NONBLOCK)); + RETURN_BOOL(serv->send_pipe_message(worker_id, &buf)); } static PHP_METHOD(swoole_server, finish) { @@ -3576,21 +3524,19 @@ static PHP_METHOD(swoole_server_task, finish) { } static PHP_METHOD(swoole_server_task, pack) { - EventData buf; - memset(&buf.info, 0, sizeof(buf.info)); - zval *zdata; ZEND_PARSE_PARAMETERS_START(1, 1) Z_PARAM_ZVAL(zdata) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - if (php_swoole_server_task_pack(&buf, zdata) < 0) { + EventData buf; + if (php_swoole_server_task_pack(zdata, &buf) < 0) { RETURN_FALSE; } buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_NOREPLY); - RETURN_STRINGL((char *) &buf, sizeof(buf.info) + buf.info.len); + RETURN_STRINGL((char *) &buf, buf.size()); } static PHP_METHOD(swoole_server_task, unpack) { diff --git a/include/swoole.h b/include/swoole.h index 3ae29b22107..01a3ec9c1c7 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -669,6 +669,14 @@ struct DataHead { struct EventData { DataHead info; char data[SW_IPC_BUFFER_SIZE]; + + uint32_t size() { + return sizeof(info) + len(); + } + + uint32_t len() { + return info.len; + } }; struct SendData { diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index 039b63b9a66..0891f27fd0e 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -141,7 +141,6 @@ struct Worker { uint8_t msgqueue_mode; uint8_t child_process; - sw_atomic_t tasking_num; uint32_t concurrency; time_t start_time; diff --git a/include/swoole_server.h b/include/swoole_server.h index f8904dba8e2..4585584b9b0 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -389,6 +389,7 @@ struct ServerGS { sw_atomic_long_t total_recv_bytes; sw_atomic_long_t total_send_bytes; sw_atomic_long_t pipe_packet_msg_id; + sw_atomic_long_t task_count; sw_atomic_t spinlock; @@ -837,7 +838,7 @@ class Server { uint32_t task_max_request = 0; uint32_t task_max_request_grace = 0; std::vector> task_notify_pipes; - EventData *task_result = nullptr; + EventData *task_results = nullptr; /** * Used for process management, saving the mapping relationship between PID and worker pointers @@ -995,12 +996,16 @@ class Server { int get_idle_worker_num(); int get_idle_task_worker_num(); - int get_task_count(); + int get_tasking_num(); TaskId get_task_id(EventData *task) { return gs->task_workers.get_task_id(task); } + EventData *get_task_result() { + return &(task_results[swoole_get_process_id()]); + } + WorkerId get_task_src_worker_id(EventData *task) { return gs->task_workers.get_task_src_worker_id(task); } @@ -1362,7 +1367,7 @@ class Server { ssize_t send_to_worker_from_worker(Worker *dst_worker, const void *buf, size_t len, int flags); ssize_t send_to_worker_from_worker(WorkerId id, EventData *data, int flags) { - return send_to_worker_from_worker(get_worker(id), data, sizeof(data->info) + data->info.len, flags); + return send_to_worker_from_worker(get_worker(id), data, data->size(), flags); } ssize_t send_to_reactor_thread(const EventData *ev_data, size_t sendn, SessionId session_id); @@ -1381,6 +1386,10 @@ class Server { const std::string &msg, const Command::Callback &fn); + bool task(EventData *task, int *dst_worker_id, bool blocking = false); + bool task_sync(EventData *task, int *dst_worker_id, double timeout); + bool send_pipe_message(WorkerId worker_id, EventData *msg); + void init_reactor(Reactor *reactor); void init_worker(Worker *worker); void init_task_workers(); diff --git a/src/os/process_pool.cc b/src/os/process_pool.cc index 7da0b53f872..f97a38a5708 100644 --- a/src/os/process_pool.cc +++ b/src/os/process_pool.cc @@ -299,7 +299,7 @@ int ProcessPool::response(const char *data, int length) { } int ProcessPool::push_message(EventData *msg) { - if (message_box->push(msg, sizeof(msg->info) + msg->info.len) < 0) { + if (message_box->push(msg, msg->size()) < 0) { return SW_ERR; } return swoole_kill(master_pid, SIGIO); @@ -332,7 +332,6 @@ int ProcessPool::pop_message(void *data, size_t size) { * dispatch data to worker */ int ProcessPool::dispatch(EventData *data, int *dst_worker_id) { - int ret = 0; Worker *worker; if (use_socket) { @@ -341,7 +340,7 @@ int ProcessPool::dispatch(EventData *data, int *dst_worker_id) { return SW_ERR; } stream->response = nullptr; - if (stream->send((char *) data, sizeof(data->info) + data->info.len) < 0) { + if (stream->send((char *) data, data->size()) < 0) { stream->cancel = 1; delete stream; return SW_ERR; @@ -356,16 +355,12 @@ int ProcessPool::dispatch(EventData *data, int *dst_worker_id) { *dst_worker_id += start_id; worker = get_worker(*dst_worker_id); - int sendn = sizeof(data->info) + data->info.len; - ret = worker->send_pipe_message(data, sendn, SW_PIPE_MASTER | SW_PIPE_NONBLOCK); - - if (ret >= 0) { - sw_atomic_fetch_add(&worker->tasking_num, 1); - } else { - swoole_warning("send %d bytes to worker#%d failed", sendn, *dst_worker_id); + if (worker->send_pipe_message(data, data->size(), SW_PIPE_MASTER | SW_PIPE_NONBLOCK) < 0) { + swoole_warning("send %d bytes to worker#%d failed", data->size(), *dst_worker_id); + return SW_ERR; } - return ret; + return SW_OK; } int ProcessPool::dispatch_blocking(const char *data, uint32_t len) { @@ -394,11 +389,8 @@ int ProcessPool::dispatch_blocking(const char *data, uint32_t len) { * @return SW_OK/SW_ERR */ int ProcessPool::dispatch_blocking(EventData *data, int *dst_worker_id) { - int ret = 0; - int sendn = sizeof(data->info) + data->info.len; - if (use_socket) { - return dispatch_blocking((char *) data, sendn); + return dispatch_blocking((char *) data, data->size()); } if (*dst_worker_id < 0) { @@ -408,14 +400,11 @@ int ProcessPool::dispatch_blocking(EventData *data, int *dst_worker_id) { *dst_worker_id += start_id; Worker *worker = get_worker(*dst_worker_id); - ret = worker->send_pipe_message(data, sendn, SW_PIPE_MASTER); - if (ret < 0) { - swoole_warning("send %d bytes to worker#%d failed", sendn, *dst_worker_id); - } else { - sw_atomic_fetch_add(&worker->tasking_num, 1); + if (worker->send_pipe_message(data, data->size(), SW_PIPE_MASTER) < 0) { + swoole_warning("send %d bytes to worker#%d failed", data->size(), *dst_worker_id); + return SW_ERR; } - - return ret > 0 ? SW_OK : SW_ERR; + return SW_OK; } bool ProcessPool::reload() { @@ -589,7 +578,7 @@ static int ProcessPool_worker_loop_with_task_protocol(ProcessPool *pool, Worker continue; } - if (n != (ssize_t) (out.buf.info.len + sizeof(out.buf.info))) { + if (n != (ssize_t) out.buf.size()) { swoole_warning("bad task packet, The received data-length[%ld] is inconsistent with the packet-length[%ld]", n, out.buf.info.len + sizeof(out.buf.info)); diff --git a/src/server/master.cc b/src/server/master.cc index 889ce35776f..06498be4238 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -527,6 +527,24 @@ int Server::create_task_workers() { } } + /* + * For Server::task_sync(), create notify pipe and result shared memory. + */ + task_results = (EventData *) sw_shm_calloc(worker_num, sizeof(EventData)); + if (!task_results) { + swoole_warning("malloc[task_result] failed"); + return SW_ERR; + } + SW_LOOP_N(worker_num) { + auto _pipe = new Pipe(true); + if (!_pipe->ready()) { + sw_shm_free(task_results); + delete _pipe; + return SW_ERR; + } + task_notify_pipes.emplace_back(_pipe); + } + init_task_workers(); return SW_OK; @@ -642,26 +660,6 @@ int Server::start() { gs->event_workers.workers[i].type = SW_PROCESS_WORKER; } - /* - * For swoole_server->taskwait, create notify pipe and result shared memory. - */ - if (task_worker_num > 0 && worker_num > 0) { - task_result = (EventData *) sw_shm_calloc(worker_num, sizeof(EventData)); - if (!task_result) { - swoole_warning("malloc[task_result] failed"); - return SW_ERR; - } - SW_LOOP_N(worker_num) { - auto _pipe = new Pipe(true); - if (!_pipe->ready()) { - sw_shm_free(task_result); - delete _pipe; - return SW_ERR; - } - task_notify_pipes.emplace_back(_pipe); - } - } - if (!user_worker_list.empty()) { uint32_t i = 0; for (auto worker : user_worker_list) { @@ -1607,6 +1605,12 @@ bool Server::close(SessionId session_id, bool reset) { return factory->end(session_id, reset ? (CLOSE_ACTIVELY | CLOSE_RESET) : CLOSE_ACTIVELY); } +bool Server::send_pipe_message(WorkerId worker_id, EventData *msg) { + msg->info.type = SW_SERVER_EVENT_PIPE_MESSAGE; + + return send_to_worker_from_worker(get_worker(worker_id), msg, msg->size(), SW_PIPE_MASTER | SW_PIPE_NONBLOCK) > 0; +} + void Server::init_signal_handler() { swoole_signal_set(SIGPIPE, nullptr); swoole_signal_set(SIGHUP, nullptr); @@ -2077,7 +2081,7 @@ int Server::get_idle_task_worker_num() { return idle_worker_num; } -int Server::get_task_count() { +int Server::get_tasking_num() { // TODO Why need to reset ? int tasking_num = gs->tasking_num; if (tasking_num < 0) { diff --git a/src/server/task_worker.cc b/src/server/task_worker.cc index 079b0695eb8..2e106542f1d 100644 --- a/src/server/task_worker.cc +++ b/src/server/task_worker.cc @@ -107,6 +107,7 @@ static int TaskWorker_onTask(ProcessPool *pool, Worker *worker, EventData *task) } bool Server::task_pack(EventData *task, const void *_data, size_t _length) { + task->info = {}; task->info.type = SW_SERVER_EVENT_TASK; task->info.fd = SwooleG.current_task_id++; task->info.reactor_id = swoole_get_process_id(); @@ -138,6 +139,59 @@ bool Server::task_pack(EventData *task, const void *_data, size_t _length) { return true; } +bool Server::task(EventData *_task, int *dst_worker_id, bool blocking) { + sw_atomic_fetch_add(&gs->tasking_num, 1); + + int retval; + if (blocking) { + retval = gs->task_workers.dispatch_blocking(_task, dst_worker_id); + } else { + retval = gs->task_workers.dispatch(_task, dst_worker_id); + } + + if (retval == SW_OK) { + sw_atomic_fetch_add(&gs->task_count, 1); + return true; + } + + sw_atomic_fetch_sub(&gs->tasking_num, 1); + return false; +} + +bool Server::task_sync(EventData *_task, int *dst_worker_id, double timeout) { + uint64_t notify; + EventData *task_result = get_task_result(); + sw_memset_zero(task_result, sizeof(*task_result)); + Pipe *pipe = task_notify_pipes.at(swoole_get_process_id()).get(); + network::Socket *task_notify_socket = pipe->get_socket(false); + TaskId task_id = get_task_id(_task); + + // clear history task + while (task_notify_socket->wait_event(0, SW_EVENT_READ) == SW_OK) { + if (task_notify_socket->read(¬ify, sizeof(notify)) <= 0) { + break; + } + } + + if (!task(_task, dst_worker_id, true)) { + return false; + } + + SW_LOOP { + if (task_notify_socket->wait_event((int) (timeout * 1000), SW_EVENT_READ) == SW_OK) { + if (pipe->read(¬ify, sizeof(notify)) > 0) { + if (get_task_id(task_result) != task_id) { + continue; + } + return true; + } + } + break; + } + + return false; +} + bool Server::task_unpack(EventData *task, String *buffer, PacketPtr *packet) { if (!(task->info.ext_flags & SW_TASK_TMPFILE)) { packet->data = task->data; @@ -331,7 +385,7 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even /** * Use worker shm store the result */ - EventData *result = &(task_result[source_worker_id]); + EventData *result = &(task_results[source_worker_id]); Pipe *pipe = task_notify_pipes.at(source_worker_id).get(); // lock worker @@ -348,8 +402,8 @@ int Server::reply_task_result(const char *data, size_t data_len, int flags, Even } buf.info.ext_flags |= flags; buf.info.type = SW_SERVER_EVENT_FINISH; - buf.info.fd = current_task->info.fd; - size_t bytes = sizeof(buf.info) + buf.info.len; + buf.info.fd = get_task_id(current_task); + size_t bytes = buf.size(); if (file.write_all(&buf, bytes) != bytes) { swoole_sys_warning("write(%s, %ld) failed", _tmpfile, bytes); }