From f0cd53d0c3cda281aebc4e7b9ac6f241712c7e1e Mon Sep 17 00:00:00 2001 From: hantianfeng Date: Tue, 13 Aug 2024 17:43:39 +0800 Subject: [PATCH] Optimize Server::stop() --- examples/server/echo.php | 35 +++++-- examples/thread/thread_server.php | 29 ++++-- ext-src/php_swoole.cc | 4 +- ext-src/php_swoole_private.h | 12 +++ ext-src/swoole_async_coro.cc | 7 +- ext-src/swoole_coroutine_system.cc | 1 + ext-src/swoole_process.cc | 1 + ext-src/swoole_runtime.cc | 7 +- ext-src/swoole_server.cc | 47 ++++------ include/swoole.h | 7 ++ include/swoole_error.h | 1 + include/swoole_process_pool.h | 1 + include/swoole_server.h | 9 ++ src/core/error.cc | 12 +++ src/os/process_pool.cc | 4 +- src/server/master.cc | 69 +++++++------- src/server/reactor_thread.cc | 13 ++- src/server/task_worker.cc | 2 + src/server/thread.cc | 19 +++- src/server/worker.cc | 76 +++++++++++++-- tests/include/functions.php | 5 + tests/swoole_thread/server/base.phpt | 87 ++++++++--------- tests/swoole_thread/server/heartbeat.phpt | 79 +++++++--------- .../server/send_large_packet.phpt | 93 +++++++++---------- tests/swoole_thread/server/stop_worker.phpt | 75 +++++++++++++++ 25 files changed, 440 insertions(+), 255 deletions(-) create mode 100644 tests/swoole_thread/server/stop_worker.phpt diff --git a/examples/server/echo.php b/examples/server/echo.php index 1ba7235a028..cdb1fa25729 100644 --- a/examples/server/echo.php +++ b/examples/server/echo.php @@ -1,29 +1,44 @@ mode === SWOOLE_THREAD ? \Swoole\Thread::getId() : posix_getpid(); +} $serv->set([ - 'worker_num' =>1, + 'worker_num' => 2, + 'task_worker_num' => 3, ]); -$serv->on('connect', function ($serv, $fd, $reactor_id){ - echo "[#".posix_getpid()."]\tClient@[$fd:$reactor_id]: Connect.\n"; +$serv->on('workerStart', function ($serv, $worker_id) { + echo "[#" . getpid() . "]\tWorker#{$worker_id} is started.\n"; +}); + +$serv->on('workerStop', function ($serv, $worker_id) { + echo "[#" . getpid() . "]\tWorker#{$worker_id} is stopped.\n"; }); -$serv->set(array( - 'worker_num' => 1, -)); +$serv->on('connect', function ($serv, $fd, $reactor_id) { + echo "[#" . getpid() . "]\tClient@[$fd:$reactor_id]: Connect.\n"; +}); $serv->on('receive', function (Swoole\Server $serv, $fd, $reactor_id, $data) { - echo "[#".$serv->worker_id."]\tClient[$fd] receive data: $data\n"; + echo "[#" . $serv->worker_id . "]\tClient[$fd] receive data: $data\n"; if ($serv->send($fd, "hello {$data}\n") == false) { echo "error\n"; } - }); $serv->on('close', function ($serv, $fd, $reactor_id) { - echo "[#".posix_getpid()."]\tClient@[$fd:$reactor_id]: Close.\n"; + echo "[#" . getpid() . "]\tClient@[$fd:$reactor_id]: Close.\n"; +}); + +$serv->on('task', function ($serv, $src_worker_id, $task) { + var_dump($task); }); $serv->start(); diff --git a/examples/thread/thread_server.php b/examples/thread/thread_server.php index f58e6a09dd2..77fac8b038d 100644 --- a/examples/thread/thread_server.php +++ b/examples/thread/thread_server.php @@ -1,5 +1,8 @@ set([ 'worker_num' => 2, 'task_worker_num' => 3, @@ -17,6 +20,10 @@ // $resp->end("tid=" . \Swoole\Thread::getId() . ', fd=' . $req->fd); if ($req->server['request_uri'] == '/task') { $http->task(['code' => uniqid()]); + } elseif ($req->server['request_uri'] == '/stop') { + var_dump($http->getWorkerId()); + var_dump($req->get['worker_id']); + $http->stop($req->get['worker_id'] ?? 0); } elseif ($req->server['request_uri'] == '/msg') { $dstWorkerId = random_int(0, 4); if ($dstWorkerId != $http->getWorkerId()) { @@ -31,10 +38,10 @@ echo "[worker#" . $http->getWorkerId() . "]\treceived pipe message[$msg] from " . $srcWorkerId . "\n"; }); -$http->addProcess(new \Swoole\Process(function () { - echo "user process, id=" . \Swoole\Thread::getId() . "\n"; - sleep(2); -})); +//$http->addProcess(new \Swoole\Process(function () { +// echo "user process, id=" . \Swoole\Thread::getId() . "\n"; +// sleep(2); +//})); $http->on('Task', function ($server, $taskId, $srcWorkerId, $data) { var_dump($taskId, $srcWorkerId, $data); @@ -45,12 +52,16 @@ var_dump($taskId, $data); }); -$http->on('WorkerStart', function ($serv, $wid) { - var_dump(\Swoole\Thread::getArguments(), $wid); +$http->on('workerStart', function ($serv, $worker_id) { + echo "[#" . Swoole\Thread::getId() . "]\tWorker#{$worker_id} is started.\n"; +}); + +$http->on('workerStop', function ($serv, $worker_id) { + echo "[#" . Swoole\Thread::getId() . "]\tWorker#{$worker_id} is stopped.\n"; }); -$http->on('WorkerStop', function ($serv, $wid) { - var_dump('stop: T' . \Swoole\Thread::getId()); +$http->on('workerExit', function (Server $serv, $worker_id) { + echo "[#" . Swoole\Thread::getId() . "]\tWorker#{$worker_id} is exited, event_num=" . Swoole\Coroutine::stats()['event_num'] . ".\n"; }); $http->start(); diff --git a/ext-src/php_swoole.cc b/ext-src/php_swoole.cc index bb369301d14..dcf5604c20e 100644 --- a/ext-src/php_swoole.cc +++ b/ext-src/php_swoole.cc @@ -589,6 +589,7 @@ PHP_MINIT_FUNCTION(swoole) { SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_PROXY_BAD_RESPONSE", SW_ERROR_HTTP_PROXY_BAD_RESPONSE); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_CONFLICT_HEADER", SW_ERROR_HTTP_CONFLICT_HEADER); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_CONTEXT_UNAVAILABLE", SW_ERROR_HTTP_CONTEXT_UNAVAILABLE); + SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_COOKIE_UNAVAILABLE", SW_ERROR_HTTP_COOKIE_UNAVAILABLE); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_WEBSOCKET_BAD_CLIENT", SW_ERROR_WEBSOCKET_BAD_CLIENT); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_WEBSOCKET_BAD_OPCODE", SW_ERROR_WEBSOCKET_BAD_OPCODE); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_WEBSOCKET_UNCONNECTED", SW_ERROR_WEBSOCKET_UNCONNECTED); @@ -610,6 +611,7 @@ PHP_MINIT_FUNCTION(swoole) { SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_INVALID_COMMAND", SW_ERROR_SERVER_INVALID_COMMAND); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_IS_NOT_REGULAR_FILE", SW_ERROR_SERVER_IS_NOT_REGULAR_FILE); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT", SW_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT); + SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_INVALID_CALLBACK", SW_ERROR_SERVER_INVALID_CALLBACK); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_WORKER_EXIT_TIMEOUT", SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_WORKER_ABNORMAL_PIPE_DATA", SW_ERROR_SERVER_WORKER_ABNORMAL_PIPE_DATA); SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_WORKER_UNPROCESSED_DATA", SW_ERROR_SERVER_WORKER_UNPROCESSED_DATA); @@ -1009,7 +1011,6 @@ PHP_RINIT_FUNCTION(swoole) { } SWOOLE_G(req_status) = PHP_SWOOLE_RINIT_BEGIN; - SwooleG.running = 1; php_swoole_register_shutdown_function("swoole_internal_call_user_shutdown_begin"); @@ -1082,7 +1083,6 @@ PHP_RSHUTDOWN_FUNCTION(swoole) { php_swoole_thread_rshutdown(); #endif - SwooleG.running = 0; SWOOLE_G(req_status) = PHP_SWOOLE_RSHUTDOWN_END; #ifdef PHP_STREAM_FLAG_NO_CLOSE diff --git a/ext-src/php_swoole_private.h b/ext-src/php_swoole_private.h index da866d09b3b..abbf43ffb73 100644 --- a/ext-src/php_swoole_private.h +++ b/ext-src/php_swoole_private.h @@ -72,6 +72,18 @@ extern PHPAPI int php_array_merge(zend_array *dest, zend_array *src); RETURN_FALSE; \ } +#ifdef SW_THREAD +#define SW_MUST_BE_MAIN_THREAD_EX(op) \ + if (!tsrm_is_main_thread()) { \ + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); \ + op; \ + } +#else +#define SW_MUST_BE_MAIN_THREAD() +#endif + +#define SW_MUST_BE_MAIN_THREAD() SW_MUST_BE_MAIN_THREAD_EX(RETURN_TRUE) + #define php_swoole_fatal_error(level, fmt_str, ...) \ swoole_set_last_error(SW_ERROR_PHP_FATAL_ERROR); \ php_error_docref(NULL, level, (const char *) (fmt_str), ##__VA_ARGS__) diff --git a/ext-src/swoole_async_coro.cc b/ext-src/swoole_async_coro.cc index b50f77d75b1..19c5c92dad1 100644 --- a/ext-src/swoole_async_coro.cc +++ b/ext-src/swoole_async_coro.cc @@ -69,12 +69,7 @@ void php_swoole_set_aio_option(HashTable *vht) { } PHP_FUNCTION(swoole_async_set) { -#ifdef SW_THREAD - if (!tsrm_is_main_thread()) { - swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); - RETURN_FALSE; - } -#endif + SW_MUST_BE_MAIN_THREAD(); if (sw_reactor()) { php_swoole_fatal_error(E_ERROR, "eventLoop has already been created. unable to change settings"); swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); diff --git a/ext-src/swoole_coroutine_system.cc b/ext-src/swoole_coroutine_system.cc index 628c9d65295..952f3b37335 100644 --- a/ext-src/swoole_coroutine_system.cc +++ b/ext-src/swoole_coroutine_system.cc @@ -334,6 +334,7 @@ PHP_METHOD(swoole_coroutine_system, waitPid) { } PHP_METHOD(swoole_coroutine_system, waitSignal) { + SW_MUST_BE_MAIN_THREAD(); zend_long signo; double timeout = -1; diff --git a/ext-src/swoole_process.cc b/ext-src/swoole_process.cc index 7bfac0af358..969dfee9945 100644 --- a/ext-src/swoole_process.cc +++ b/ext-src/swoole_process.cc @@ -424,6 +424,7 @@ static PHP_METHOD(swoole_process, kill) { } static PHP_METHOD(swoole_process, signal) { + SW_MUST_BE_MAIN_THREAD(); zend_long signo = 0; zval *zcallback = nullptr; zend_fcall_info_cache *fci_cache = nullptr; diff --git a/ext-src/swoole_runtime.cc b/ext-src/swoole_runtime.cc index 08150d55865..a70da7b1a41 100644 --- a/ext-src/swoole_runtime.cc +++ b/ext-src/swoole_runtime.cc @@ -1176,12 +1176,7 @@ void PHPCoroutine::enable_unsafe_function() { } bool PHPCoroutine::enable_hook(uint32_t flags) { -#ifdef SW_THREAD - if (!tsrm_is_main_thread()) { - swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); - return false; - } -#endif + SW_MUST_BE_MAIN_THREAD_EX(return false); if (swoole_isset_hook((enum swGlobalHookType) PHP_SWOOLE_HOOK_BEFORE_ENABLE_HOOK)) { swoole_call_hook((enum swGlobalHookType) PHP_SWOOLE_HOOK_BEFORE_ENABLE_HOOK, &flags); } diff --git a/ext-src/swoole_server.cc b/ext-src/swoole_server.cc index f74bf6e672b..df80f1ccb69 100644 --- a/ext-src/swoole_server.cc +++ b/ext-src/swoole_server.cc @@ -99,7 +99,7 @@ void php_swoole_server_rshutdown() { Server *serv = sw_server(); serv->drain_worker_pipe(); - if (serv->is_started() && serv->is_running() && !serv->is_user_worker()) { + if (serv->is_started() && serv->worker_is_running() && !serv->is_user_worker()) { if (php_swoole_is_fatal_error()) { swoole_error_log(SW_LOG_ERROR, SW_ERROR_PHP_FATAL_ERROR, @@ -1492,12 +1492,14 @@ static void php_swoole_server_onWorkerStart(Server *serv, Worker *worker) { zend_update_property_bool(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("taskworker"), serv->is_task_worker()); zend_update_property_long(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("worker_pid"), getpid()); - if (serv->is_task_worker() && !serv->task_enable_coroutine) { - PHPCoroutine::disable_hook(); + if (serv->is_task_worker()) { + if (!serv->task_enable_coroutine) { + PHPCoroutine::disable_hook(); + } + } else { + serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator()); } - serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator()); - zval args[2]; args[0] = *zserv; ZVAL_LONG(&args[1], worker->id); @@ -1540,10 +1542,9 @@ static void php_swoole_server_onAfterReload(Server *serv) { } static void php_swoole_server_onWorkerStop(Server *serv, Worker *worker) { - if (SwooleWG.shutdown) { + if (!SwooleWG.running) { return; } - SwooleWG.shutdown = true; zval *zserv = php_swoole_server_zval_ptr(serv); ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv)); @@ -1926,6 +1927,7 @@ static PHP_METHOD(swoole_server, __construct) { && serv_mode != Server::MODE_THREAD #endif ) { + swoole_set_last_error(SW_ERROR_INVALID_PARAMS); zend_throw_error(NULL, "invalid $mode parameters %d", (int) serv_mode); RETURN_FALSE; } @@ -1935,8 +1937,14 @@ static PHP_METHOD(swoole_server, __construct) { server_ctor(ZEND_THIS, sw_server()); return; } + if (!tsrm_is_main_thread()) { + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); + zend_throw_exception_ex(swoole_exception_ce, -2, "This operation is only allowed in the main thread"); + RETURN_FALSE; + } #else if (sw_server() != nullptr) { + swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); zend_throw_exception_ex( swoole_exception_ce, -3, "server is running. unable to create %s", SW_Z_OBJCE_NAME_VAL_P(zserv)); RETURN_FALSE; @@ -2672,7 +2680,7 @@ static PHP_METHOD(swoole_server, start) { server_object->on_before_start(); if (serv->start() < 0) { - php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error); + php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", serv->get_startup_error_message()); } #ifdef SW_THREAD @@ -3869,7 +3877,7 @@ static PHP_METHOD(swoole_server, getWorkerPid) { static PHP_METHOD(swoole_server, getManagerPid) { Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS); - RETURN_LONG(serv->gs->manager_pid); + RETURN_LONG(serv->get_manager_pid()); } static PHP_METHOD(swoole_server, getMasterPid) { @@ -3898,26 +3906,7 @@ static PHP_METHOD(swoole_server, stop) { Z_PARAM_BOOL(wait_reactor) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - if (worker_id == sw_worker()->id && wait_reactor == 0) { - if (SwooleTG.reactor != nullptr) { - SwooleTG.reactor->defer( - [](void *data) { - Reactor *reactor = (Reactor *) data; - reactor->running = false; - }, - SwooleTG.reactor); - } - serv->running = false; - } else { - Worker *worker = serv->get_worker(worker_id); - if (worker == nullptr) { - RETURN_FALSE; - } else if (swoole_kill(worker->pid, SIGTERM) < 0) { - php_swoole_sys_error(E_WARNING, "kill(%d, SIGTERM) failed", worker->pid); - RETURN_FALSE; - } - } - RETURN_TRUE; + RETURN_BOOL(serv->kill_worker(worker_id, wait_reactor)); } // swoole_connection_iterator diff --git a/include/swoole.h b/include/swoole.h index 3002e8e04cc..699414c9a9c 100644 --- a/include/swoole.h +++ b/include/swoole.h @@ -807,6 +807,13 @@ static inline int swoole_get_last_error(void) { return SwooleTG.error; } +static inline void swoole_clear_last_error(void) { + SwooleTG.error = 0; +} + +void swoole_clear_last_error_msg(void); +const char *swoole_get_last_error_msg(void); + static inline int swoole_get_thread_id(void) { return SwooleTG.id; } diff --git a/include/swoole_error.h b/include/swoole_error.h index d5746534f5a..7368b916450 100644 --- a/include/swoole_error.h +++ b/include/swoole_error.h @@ -159,6 +159,7 @@ enum swErrorCode { SW_ERROR_SERVER_INVALID_COMMAND, SW_ERROR_SERVER_IS_NOT_REGULAR_FILE, SW_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT, + SW_ERROR_SERVER_INVALID_CALLBACK, /** * Process exit timeout, forced to end. diff --git a/include/swoole_process_pool.h b/include/swoole_process_pool.h index 74f70754018..cda0d47900d 100644 --- a/include/swoole_process_pool.h +++ b/include/swoole_process_pool.h @@ -105,6 +105,7 @@ struct Worker; struct WorkerGlobal { bool run_always; bool shutdown; + bool running; uint32_t max_request; Worker *worker; Worker *worker_copy; diff --git a/include/swoole_server.h b/include/swoole_server.h index 4e8eba6b19e..1f52fd19c5f 100644 --- a/include/swoole_server.h +++ b/include/swoole_server.h @@ -1126,6 +1126,7 @@ class Server { return nullptr; } + bool kill_worker(WorkerId worker_id, bool wait_reactor); void stop_async_worker(Worker *worker); void stop_master_thread(); void join_heartbeat_thread(); @@ -1166,6 +1167,10 @@ class Server { return swoole_get_process_type() == SW_PROCESS_EVENTWORKER; } + bool is_event_worker() { + return is_worker(); + } + bool is_task_worker() { return swoole_get_process_type() == SW_PROCESS_TASKWORKER; } @@ -1313,6 +1318,7 @@ class Server { void call_hook(enum HookType type, void *arg); void call_worker_start_callback(Worker *worker); + void call_worker_stop_callback(Worker *worker); void call_command_handler(MessageBus &mb, uint16_t worker_id, network::Socket *sock); std::string call_command_handler_in_master(int command_id, const std::string &msg); void call_command_callback(int64_t request_id, const std::string &result); @@ -1431,6 +1437,7 @@ class Server { void worker_stop_callback(Worker *worker); void worker_accept_event(DataHead *info); void worker_signal_init(void); + bool worker_is_running(); std::function worker_thread_start; /** @@ -1451,6 +1458,7 @@ class Server { int start_master_thread(Reactor *reactor); int start_event_worker(Worker *worker); void start_heartbeat_thread(); + const char *get_startup_error_message(); private: enum Mode mode_; @@ -1477,6 +1485,7 @@ class Server { int start_reactor_threads(); int start_reactor_processes(); int start_worker_threads(); + void stop_worker_threads(); void join_reactor_thread(); TimerCallback get_timeout_callback(ListenPort *port, Reactor *reactor, Connection *conn); diff --git a/src/core/error.cc b/src/core/error.cc index ba8277986d9..276187d532a 100644 --- a/src/core/error.cc +++ b/src/core/error.cc @@ -174,6 +174,8 @@ const char *swoole_strerror(int code) { return "Http conflict header"; case SW_ERROR_HTTP_CONTEXT_UNAVAILABLE: return "Http context unavailable"; + case SW_ERROR_HTTP_COOKIE_UNAVAILABLE: + return "Http cookie unavailable"; case SW_ERROR_WEBSOCKET_BAD_CLIENT: return "Websocket bad client"; case SW_ERROR_WEBSOCKET_BAD_OPCODE: @@ -216,6 +218,8 @@ const char *swoole_strerror(int code) { return "Server is not regular file"; case SW_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT: return "Server send to woker timeout"; + case SW_ERROR_SERVER_INVALID_CALLBACK: + return "Server invalid callback"; case SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT: return "Server worker exit timeout"; case SW_ERROR_SERVER_WORKER_ABNORMAL_PIPE_DATA: @@ -283,3 +287,11 @@ void swoole_ignore_error(int code) { bool swoole_is_ignored_error(int code) { return ignored_errors.find(code) != ignored_errors.end(); } + +void swoole_clear_last_error_msg(void) { + sw_error[0] = '\0'; +} + +const char *swoole_get_last_error_msg(void) { + return sw_error; +} diff --git a/src/os/process_pool.cc b/src/os/process_pool.cc index b04db48cc49..75be46df47e 100644 --- a/src/os/process_pool.cc +++ b/src/os/process_pool.cc @@ -683,7 +683,7 @@ static int ProcessPool_worker_loop_with_stream_protocol(ProcessPool *pool, Worke QueueNode *outbuf = (QueueNode *) pool->packet_buffer; outbuf->mtype = 0; - while (pool->running) { + while (pool->running && !SwooleWG.shutdown) { /** * fetch task */ @@ -788,7 +788,7 @@ static int ProcessPool_worker_loop_with_message_protocol(ProcessPool *pool, Work worker->pipe_worker->dont_restart = 1; - while (pool->running) { + while (pool->running && !SwooleWG.shutdown) { switch (fn()) { case 0: if (SwooleG.signal_alarm && SwooleTG.timer) { diff --git a/src/server/master.cc b/src/server/master.cc index 0976bf99da7..f57d5a5b76e 100644 --- a/src/server/master.cc +++ b/src/server/master.cc @@ -331,24 +331,48 @@ void Server::set_max_connection(uint32_t _max_connection) { } } +const char *Server::get_startup_error_message() { + auto error_msg = swoole_get_last_error_msg(); + if (strlen(error_msg) == 0 && swoole_get_last_error() > 0) { + auto buf = sw_tg_buffer(); + buf->clear(); + buf->append(swoole_get_last_error()); + buf->str[buf->length] = '\0'; + error_msg = buf->str; + } + return error_msg; +} + int Server::start_check() { // disable notice when use SW_DISPATCH_ROUND and SW_DISPATCH_QUEUE if (is_process_mode()) { if (!is_support_unsafe_events()) { if (onConnect) { - swoole_warning("cannot set 'onConnect' event when using dispatch_mode=%d", dispatch_mode); + swoole_error_log(SW_LOG_WARNING, + SW_ERROR_SERVER_INVALID_CALLBACK, + "cannot set 'onConnect' event when using dispatch_mode=%d", + dispatch_mode); onConnect = nullptr; } if (onClose) { - swoole_warning("cannot set 'onClose' event when using dispatch_mode=%d", dispatch_mode); + swoole_error_log(SW_LOG_WARNING, + SW_ERROR_SERVER_INVALID_CALLBACK, + "cannot set 'onClose' event when using dispatch_mode=%d", + dispatch_mode); onClose = nullptr; } if (onBufferFull) { - swoole_warning("cannot set 'onBufferFull' event when using dispatch_mode=%d", dispatch_mode); + swoole_error_log(SW_LOG_WARNING, + SW_ERROR_SERVER_INVALID_CALLBACK, + "cannot set 'onBufferFull' event when using dispatch_mode=%d", + dispatch_mode); onBufferFull = nullptr; } if (onBufferEmpty) { - swoole_warning("cannot set 'onBufferEmpty' event when using dispatch_mode=%d", dispatch_mode); + swoole_error_log(SW_LOG_WARNING, + SW_ERROR_SERVER_INVALID_CALLBACK, + "cannot set 'onBufferEmpty' event when using dispatch_mode=%d", + dispatch_mode); onBufferEmpty = nullptr; } disable_notify = 1; @@ -361,7 +385,7 @@ int Server::start_check() { } if (task_worker_num > 0) { if (onTask == nullptr) { - swoole_warning("onTask event callback must be set"); + swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_INVALID_CALLBACK, "require 'onTask' callback"); return SW_ERR; } } @@ -380,11 +404,11 @@ int Server::start_check() { ls->protocol.package_max_length = SW_BUFFER_MIN_SIZE; } if (if_require_receive_callback(ls, onReceive != nullptr)) { - swoole_warning("require onReceive callback"); + swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_INVALID_CALLBACK, "require 'onReceive' callback"); return SW_ERR; } if (if_require_packet_callback(ls, onPacket != nullptr)) { - swoole_warning("require onPacket callback"); + swoole_error_log(SW_LOG_WARNING, SW_ERROR_SERVER_INVALID_CALLBACK, "require 'onPacket' callback"); return SW_ERR; } if (ls->heartbeat_idle_time > 0) { @@ -434,7 +458,7 @@ int Server::start_master_thread(Reactor *reactor) { return SW_ERR; } - if (!single_thread) { + if (!single_thread && !is_thread_mode()) { reactor_thread_barrier.wait(); } if (is_process_mode()) { @@ -565,23 +589,9 @@ void Server::init_worker(Worker *worker) { worker->request_count = 0; } -void Server::call_worker_start_callback(Worker *worker) { - void *hook_args[2]; - hook_args[0] = this; - hook_args[1] = (void *) (uintptr_t) worker->id; - - if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_START)) { - swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_START, hook_args); - } - if (isset_hook(HOOK_WORKER_START)) { - call_hook(Server::HOOK_WORKER_START, hook_args); - } - if (onWorkerStart) { - onWorkerStart(this, worker); - } -} - int Server::start() { + swoole_clear_last_error(); + swoole_clear_last_error_msg(); if (start_check() < 0) { return SW_ERR; } @@ -826,7 +836,7 @@ int Server::create() { return SW_ERR; } - if (is_process_mode() || is_thread_mode()) { + if (is_process_mode()) { reactor_thread_barrier.init(false, reactor_num + 1); gs->manager_barrier.init(true, 2); } @@ -937,12 +947,7 @@ void Server::stop_master_thread() { reactor->set_exit_condition(Reactor::EXIT_CONDITION_FORCED_TERMINATION, fn); } if (is_thread_mode()) { - SW_LOOP_N(reactor_num) { - auto thread = get_thread(i); - DataHead ev = {}; - ev.type = SW_SERVER_EVENT_SHUTDOWN; - thread->notify_pipe->send_blocking((void *) &ev, sizeof(ev)); - } + stop_worker_threads(); } } @@ -1836,7 +1841,7 @@ static void Server_signal_handler(int sig) { swoole_trace_log(SW_TRACE_SERVER, "signal[%d] %s triggered in %d", sig, swoole_signal_to_str(sig), getpid()); Server *serv = sw_server(); - if (!SwooleG.running or !serv) { + if (!SwooleG.running || !serv || !serv->is_running()) { return; } diff --git a/src/server/reactor_thread.cc b/src/server/reactor_thread.cc index 14fb7c62a0c..6beb5b7ec79 100644 --- a/src/server/reactor_thread.cc +++ b/src/server/reactor_thread.cc @@ -389,7 +389,11 @@ static int ReactorThread_onPipeRead(Reactor *reactor, Event *ev) { auto packet = thread->message_bus.get_packet(); serv->call_command_callback(resp->info.fd, std::string(packet.data, packet.length)); } else if (resp->info.type == SW_SERVER_EVENT_SHUTDOWN) { - thread->shutdown(reactor); + if (serv->is_thread_mode()) { + serv->stop_async_worker(serv->get_worker(reactor->id)); + } else { + thread->shutdown(reactor); + } } else if (resp->info.type == SW_SERVER_EVENT_FINISH) { serv->onFinish(serv, (EventData *) resp); } else if (resp->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) { @@ -802,6 +806,7 @@ void ReactorThread::clean() { pipe_command->fd = -1; delete pipe_command; } + pipe_num = 0; message_bus.free_buffer(); } @@ -826,11 +831,13 @@ void Server::reactor_thread_main_loop(Server *serv, int reactor_id) { } // wait other thread - serv->reactor_thread_barrier.wait(); + if (serv->is_process_mode()) { + serv->reactor_thread_barrier.wait(); + } // main loop swoole_event_wait(); if (serv->is_thread_mode()) { - serv->worker_stop_callback(serv->get_worker(reactor_id)); + serv->call_worker_stop_callback(serv->get_worker(reactor_id)); } thread->clean(); } diff --git a/src/server/task_worker.cc b/src/server/task_worker.cc index b7046907294..0c692c3d9ad 100644 --- a/src/server/task_worker.cc +++ b/src/server/task_worker.cc @@ -88,6 +88,8 @@ static int TaskWorker_onTask(ProcessPool *pool, EventData *task) { if (task->info.type == SW_SERVER_EVENT_PIPE_MESSAGE) { serv->onPipeMessage(serv, task); + } else if (task->info.type == SW_SERVER_EVENT_SHUTDOWN) { + SwooleWG.shutdown = true; } else if (task->info.type == SW_SERVER_EVENT_COMMAND_REQUEST) { ret = TaskWorker_call_command_handler(pool, task); } else { diff --git a/src/server/thread.cc b/src/server/thread.cc index faf63a42f62..434ef05d15b 100644 --- a/src/server/thread.cc +++ b/src/server/thread.cc @@ -67,9 +67,7 @@ bool ThreadFactory::shutdown() { return true; } -ThreadFactory::~ThreadFactory() { - -} +ThreadFactory::~ThreadFactory() {} void ThreadFactory::at_thread_exit(Worker *worker) { std::unique_lock _lock(lock_); @@ -235,4 +233,19 @@ int Server::start_worker_threads() { store_listen_socket(); return start_master_thread(reactor); } + +void Server::stop_worker_threads() { + DataHead event = {}; + event.type = SW_SERVER_EVENT_SHUTDOWN; + + SW_LOOP_N(worker_num) { + send_to_worker_from_worker(get_worker(i), &event, sizeof(event), SW_PIPE_MASTER); + } + + if (task_worker_num > 0) { + SW_LOOP_N(task_worker_num) { + send_to_worker_from_worker(get_worker(worker_num + i), &event, sizeof(event), SW_PIPE_MASTER); + } + } +} } // namespace swoole diff --git a/src/server/worker.cc b/src/server/worker.cc index 585350ae08d..a17e723e940 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -48,7 +48,7 @@ void Server::worker_signal_init(void) { } void Server::worker_signal_handler(int signo) { - if (!SwooleG.running || !sw_server() || !sw_worker()) { + if (!SwooleG.running || !sw_server() || !sw_worker() || !sw_server()->is_running()) { return; } switch (signo) { @@ -198,6 +198,9 @@ void Server::worker_accept_event(DataHead *info) { call_command_handler(message_bus, worker->id, pipe_command->get_socket(false)); break; } + case SW_SERVER_EVENT_SHUTDOWN: { + break; + } default: swoole_warning("[Worker] error event[type=%d]", (int) info->type); break; @@ -282,20 +285,46 @@ void Server::worker_start_callback(Worker *worker) { } void Server::worker_stop_callback(Worker *worker) { + call_worker_stop_callback(worker); +} + +void Server::call_worker_start_callback(Worker *worker) { void *hook_args[2]; hook_args[0] = this; hook_args[1] = (void *) (uintptr_t) worker->id; + + if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_START)) { + swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_START, hook_args); + } + if (isset_hook(HOOK_WORKER_START)) { + call_hook(Server::HOOK_WORKER_START, hook_args); + } + + SwooleWG.running = true; + if (onWorkerStart) { + onWorkerStart(this, worker); + } +} + +void Server::call_worker_stop_callback(Worker *worker) { + void *hook_args[2]; + hook_args[0] = this; + hook_args[1] = (void *) (uintptr_t) worker->id; + if (swoole_isset_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_STOP)) { swoole_call_hook(SW_GLOBAL_HOOK_BEFORE_WORKER_STOP, hook_args); } if (onWorkerStop) { onWorkerStop(this, worker); } - if (!get_worker_message_bus()->empty()) { + + if (is_event_worker() && !get_worker_message_bus()->empty()) { swoole_error_log( SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_UNPROCESSED_DATA, "unprocessed data in the worker process buffer"); get_worker_message_bus()->clear(); } + + SwooleWG.running = false; if (SwooleWG.worker_copy) { delete SwooleWG.worker_copy; SwooleWG.worker_copy = nullptr; @@ -303,6 +332,39 @@ void Server::worker_stop_callback(Worker *worker) { } } +bool Server::worker_is_running() { + return SwooleWG.running; +} + +bool Server::kill_worker(WorkerId worker_id, bool wait_reactor) { + if (is_thread_mode()) { + DataHead event = {}; + event.type = SW_SERVER_EVENT_SHUTDOWN; + return send_to_worker_from_worker(get_worker(worker_id), &event, sizeof(event), SW_PIPE_MASTER) != -1; + } + + if (worker_id == sw_worker()->id && !wait_reactor) { + if (swoole_event_is_available()) { + swoole_event_defer( + [](void *data) { + sw_reactor()->running = false; + }, nullptr); + } + running = false; + } else { + Worker *worker = get_worker(worker_id); + if (worker == nullptr) { + swoole_error_log(SW_LOG_WARNING, SW_ERROR_INVALID_PARAMS, "the worker_id[%d] is invalid", worker_id); + return false; + } + if (swoole_kill(worker->pid, SIGTERM) < 0) { + swoole_sys_warning("kill(%d, SIGTERM) failed", worker->pid); + return false; + } + } + return true; +} + void Server::stop_async_worker(Worker *worker) { worker->status = SW_WORKER_EXIT; Reactor *reactor = SwooleTG.reactor; @@ -311,7 +373,6 @@ void Server::stop_async_worker(Worker *worker) { * force to end. */ if (reload_async == 0) { - running = false; reactor->running = false; return; } @@ -353,13 +414,13 @@ void Server::stop_async_worker(Worker *worker) { }); clear_timer(); } - } else { + } else if (is_process_mode()) { WorkerStopMessage msg; msg.pid = SwooleG.pid; msg.worker_id = worker->id; if (gs->event_workers.push_message(SW_WORKER_MESSAGE_STOP, &msg, sizeof(msg)) < 0) { - running = 0; + swoole_sys_warning("failed to push WORKER_STOP message"); } } @@ -368,9 +429,6 @@ void Server::stop_async_worker(Worker *worker) { SwooleWG.exit_time = ::time(nullptr); Worker_reactor_try_to_exit(reactor); - if (!reactor->running) { - running = false; - } } static void Worker_reactor_try_to_exit(Reactor *reactor) { @@ -386,7 +444,6 @@ static void Worker_reactor_try_to_exit(Reactor *reactor) { while (1) { if (reactor->if_exit()) { reactor->running = false; - break; } else { if (serv->onWorkerExit && call_worker_exit_func == 0) { serv->onWorkerExit(serv, sw_worker()); @@ -398,7 +455,6 @@ static void Worker_reactor_try_to_exit(Reactor *reactor) { swoole_error_log( SW_LOG_WARNING, SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT, "worker exit timeout, forced termination"); reactor->running = false; - break; } else { int timeout_msec = remaining_time * 1000; if (reactor->timeout_msec < 0 || reactor->timeout_msec > timeout_msec) { diff --git a/tests/include/functions.php b/tests/include/functions.php index 4dd94504cad..891e638d8b0 100644 --- a/tests/include/functions.php +++ b/tests/include/functions.php @@ -88,6 +88,11 @@ function get_one_free_port(): int return $port; } +function get_constant_port(string $str, int $base = 9500): int +{ + return $base + crc32(__FILE__) % 1000; +} + function get_one_free_port_ipv6(): int { $hookFlags = Swoole\Runtime::getHookFlags(); diff --git a/tests/swoole_thread/server/base.phpt b/tests/swoole_thread/server/base.phpt index a89087432e4..76303fe6c98 100644 --- a/tests/swoole_thread/server/base.phpt +++ b/tests/swoole_thread/server/base.phpt @@ -10,68 +10,59 @@ skip_if_nts(); require __DIR__ . '/../../include/bootstrap.php'; use Swoole\Thread; -use Swoole\Thread\Lock; const SIZE = 2 * 1024 * 1024; +$port = get_constant_port(__FILE__); -$tm = new \SwooleTest\ThreadManager(); -$tm->initFreePorts(increment: crc32(__FILE__) % 1000); - -$tm->parentFunc = function () use ($tm) { - $queue = new Swoole\Thread\Queue(); - $atomic = new Swoole\Thread\Atomic(1); - $thread = new Thread(__FILE__, $queue, $atomic); +$serv = new Swoole\Server('127.0.0.1', $port, SWOOLE_THREAD); +$serv->set(array( + 'worker_num' => 2, + 'log_level' => SWOOLE_LOG_ERROR, + 'open_eof_check' => true, + 'package_eof' => "\r\n", + 'init_arguments' => function () { + global $queue, $atomic; + $queue = new Swoole\Thread\Queue(); + $atomic = new Swoole\Thread\Atomic(1); + return [$queue, $atomic]; + } +)); +$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) { + [$queue, $atomic] = Thread::getArguments(); + if ($workerId == 0) { + $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); + } +}); +$serv->on('receive', function (Swoole\Server $serv, $fd, $rid, $data) { + $json = json_decode(rtrim($data)); + if ($json->type == 'eof') { + $serv->send($fd, "EOF\r\n"); + } +}); +$serv->on('shutdown', function () { + global $queue, $atomic; + echo 'shutdown', PHP_EOL; + Assert::eq($atomic->get(), 0); +}); +$serv->addProcess(new Swoole\Process(function ($process) use ($serv) { + [$queue, $atomic] = Thread::getArguments(); + global $port; echo $queue->pop(-1); - Co\run(function () use ($tm) { + Co\run(function () use ($port) { $cli = new Co\Client(SWOOLE_SOCK_TCP); $cli->set([ 'open_eof_check' => true, 'package_eof' => "\r\n", ]); - Assert::assert($cli->connect('127.0.0.1', $tm->getFreePort(), 2)); + Assert::assert($cli->connect('127.0.0.1', $port, 2)); $cli->send(json_encode(['type' => 'eof']) . "\r\n"); Assert::eq($cli->recv(), "EOF\r\n"); }); $atomic->set(0); echo "done\n"; - echo $queue->pop(-1); -}; - -$tm->childFunc = function ($queue, $atomic) use ($tm) { - $serv = new Swoole\Server('127.0.0.1', $tm->getFreePort(), SWOOLE_THREAD); - $serv->set(array( - 'worker_num' => 2, - 'log_level' => SWOOLE_LOG_ERROR, - 'open_eof_check' => true, - 'package_eof' => "\r\n", - 'init_arguments' => function () use ($queue, $atomic) { - return [$queue, $atomic]; - } - )); - $serv->on("WorkerStart", function (Swoole\Server $serv, $workerId) use ($queue, $atomic) { - if ($workerId == 0) { - $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); - \Swoole\Timer::tick(200, function ($timerId) use ($atomic, $serv) { - if ($atomic->get() == 0) { - $serv->shutdown(); - \Swoole\Timer::clear($timerId); - } - }); - } - }); - $serv->on('receive', function (Swoole\Server $serv, $fd, $rid, $data) { - $json = json_decode(rtrim($data)); - if ($json->type == 'eof') { - $serv->send($fd, "EOF\r\n"); - } - }); - $serv->on('shutdown', function () use ($queue, $atomic) { - $queue->push("shutdown\n", Thread\Queue::NOTIFY_ALL); - }); - $serv->start(); -}; - -$tm->run(); + $serv->shutdown(); +})); +$serv->start(); ?> --EXPECT-- begin diff --git a/tests/swoole_thread/server/heartbeat.phpt b/tests/swoole_thread/server/heartbeat.phpt index 112cf6457d5..d53826cb934 100644 --- a/tests/swoole_thread/server/heartbeat.phpt +++ b/tests/swoole_thread/server/heartbeat.phpt @@ -10,22 +10,44 @@ skip_if_nts(); require __DIR__ . '/../../include/bootstrap.php'; use Swoole\Thread; -use Swoole\Thread\Lock; const SIZE = 2 * 1024 * 1024; +$port = get_constant_port(__FILE__); -$tm = new \SwooleTest\ThreadManager(); -$tm->initFreePorts(increment: crc32(__FILE__) % 1000); - -$tm->parentFunc = function () use ($tm) { - $queue = new Swoole\Thread\Queue(); - $atomic = new Swoole\Thread\Atomic(1); - $thread = new Thread(__FILE__, $queue, $atomic); +$serv = new Swoole\Server('127.0.0.1', $port, SWOOLE_THREAD); +$serv->set(array( + 'worker_num' => 1, + 'log_level' => SWOOLE_LOG_ERROR, + 'heartbeat_check_interval' => 1, + 'heartbeat_idle_time' => 2, + 'init_arguments' => function () { + global $queue, $atomic; + $queue = new Swoole\Thread\Queue(); + $atomic = new Swoole\Thread\Atomic(1); + return [$queue, $atomic]; + } +)); +$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) { + [$queue, $atomic] = Thread::getArguments(); + if ($workerId == 0) { + $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); + } +}); +$serv->on('receive', function (Swoole\Server $serv, $fd, $rid, $data) { +}); +$serv->on('shutdown', function () { + global $queue, $atomic; + echo 'shutdown', PHP_EOL; + Assert::eq($atomic->get(), 0); +}); +$serv->addProcess(new Swoole\Process(function ($process) use ($serv) { + [$queue, $atomic] = Thread::getArguments(); + global $port; echo $queue->pop(-1); $client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC); - if (!$client->connect('127.0.0.1', $tm->getFreePort(), 5, 0)) { - echo "Over flow. errno=" . $client->errCode; + if (!$client->connect('127.0.0.1', $port, 5, 0)) { + echo "Error: " . $client->errCode; die("\n"); } $s1 = time(); @@ -35,40 +57,9 @@ $tm->parentFunc = function () use ($tm) { $atomic->set(0); echo "done\n"; - echo $queue->pop(-1); -}; - -$tm->childFunc = function ($queue, $atomic) use ($tm) { - $serv = new Swoole\Server('127.0.0.1', $tm->getFreePort(), SWOOLE_THREAD); - $serv->set(array( - 'worker_num' => 1, - 'log_level' => SWOOLE_LOG_ERROR, - 'heartbeat_check_interval' => 1, - 'heartbeat_idle_time' => 2, - 'init_arguments' => function () use ($queue, $atomic) { - return [$queue, $atomic]; - } - )); - $serv->on("WorkerStart", function (Swoole\Server $serv, $workerId) use ($queue, $atomic) { - if ($workerId == 0) { - $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); - \Swoole\Timer::tick(200, function ($timerId) use ($atomic, $serv) { - if ($atomic->get() == 0) { - $serv->shutdown(); - \Swoole\Timer::clear($timerId); - } - }); - } - }); - $serv->on('receive', function (Swoole\Server $serv, $fd, $rid, $data) { - }); - $serv->on('shutdown', function () use ($queue, $atomic) { - $queue->push("shutdown\n", Thread\Queue::NOTIFY_ALL); - }); - $serv->start(); -}; - -$tm->run(); + $serv->shutdown(); +})); +$serv->start(); ?> --EXPECT-- begin diff --git a/tests/swoole_thread/server/send_large_packet.phpt b/tests/swoole_thread/server/send_large_packet.phpt index 63f57ed7166..2f277a7f5f0 100644 --- a/tests/swoole_thread/server/send_large_packet.phpt +++ b/tests/swoole_thread/server/send_large_packet.phpt @@ -10,24 +10,53 @@ skip_if_nts(); require __DIR__ . '/../../include/bootstrap.php'; use Swoole\Thread; -use Swoole\Thread\Lock; const SIZE = 2 * 1024 * 1024; +$port = get_constant_port(__FILE__); -$tm = new \SwooleTest\ThreadManager(); -$tm->initFreePorts(increment: crc32(__FILE__) % 1000); - -$tm->parentFunc = function () use ($tm) { - $queue = new Swoole\Thread\Queue(); - $atomic = new Swoole\Thread\Atomic(1); - $thread = new Thread(__FILE__, $queue, $atomic); +$serv = new Swoole\Server('127.0.0.1', $port, SWOOLE_THREAD); +$serv->set(array( + 'worker_num' => 2, + 'log_level' => SWOOLE_LOG_ERROR, + 'open_length_check' => true, + 'package_max_length' => 4 * 1024 * 1024, + 'package_length_type' => 'N', + 'package_length_offset' => 0, + 'package_body_offset' => 4, + 'init_arguments' => function () { + global $queue, $atomic; + $queue = new Swoole\Thread\Queue(); + $atomic = new Swoole\Thread\Atomic(1); + return [$queue, $atomic]; + } +)); +$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) { + [$queue, $atomic] = Thread::getArguments(); + if ($workerId == 0) { + $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); + } +}); +$serv->on("WorkerStop", function (Swoole\Server $serv, $workerId) { +}); +$serv->on('receive', function (Swoole\Server $serv, $fd, $rid, $data) { + $send_data = str_repeat('A', SIZE - 12) . substr($data, -8, 8); + $serv->send($fd, pack('N', strlen($send_data)) . $send_data); +}); +$serv->on('shutdown', function () { + global $queue, $atomic; + echo 'shutdown', PHP_EOL; + Assert::eq($atomic->get(), 0); +}); +$serv->addProcess(new Swoole\Process(function ($process) use ($serv) { + [$queue, $atomic] = Thread::getArguments(); + global $port; echo $queue->pop(-1); $c = MAX_CONCURRENCY_LOW; $n = MAX_REQUESTS_LOW; for ($i = 0; $i < $c; $i++) { - go(function () use ($tm, $i, $n, $atomic) { + go(function () use ($i, $n, $atomic, $port) { $cli = new Co\Client(SWOOLE_SOCK_TCP); $cli->set([ 'open_length_check' => true, @@ -36,7 +65,7 @@ $tm->parentFunc = function () use ($tm) { 'package_length_offset' => 0, 'package_body_offset' => 4, ]); - if ($cli->connect('127.0.0.1', $tm->getFreePort(), 2) == false) { + if ($cli->connect('127.0.0.1', $port, 2) == false) { echo "ERROR\n"; return; } @@ -53,47 +82,9 @@ $tm->parentFunc = function () use ($tm) { Swoole\Event::wait(); $atomic->set(0); echo "done\n"; - echo $queue->pop(-1); -}; - -$tm->childFunc = function ($queue, $atomic) use ($tm) { - $serv = new Swoole\Server('127.0.0.1', $tm->getFreePort(), SWOOLE_THREAD); - $serv->set(array( - 'worker_num' => 2, - 'log_level' => SWOOLE_LOG_ERROR, - 'open_length_check' => true, - 'package_max_length' => 4 * 1024 * 1024, - 'package_length_type' => 'N', - 'package_length_offset' => 0, - 'package_body_offset' => 4, - 'init_arguments' => function () use ($queue, $atomic) { - return [$queue, $atomic]; - } - )); - $serv->on("WorkerStart", function (Swoole\Server $serv, $workerId) use ($queue, $atomic) { - if ($workerId == 0) { - $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); - \Swoole\Timer::tick(200, function ($timerId) use ($atomic, $serv) { - if ($atomic->get() == 0) { - $serv->shutdown(); - \Swoole\Timer::clear($timerId); - } - }); - } - }); - $serv->on("WorkerStop", function (Swoole\Server $serv, $workerId) use ($queue, $atomic) { - }); - $serv->on('receive', function (Swoole\Server $serv, $fd, $rid, $data) use ($queue, $atomic) { - $send_data = str_repeat('A', SIZE - 12) . substr($data, -8, 8); - $serv->send($fd, pack('N', strlen($send_data)) . $send_data); - }); - $serv->on('shutdown', function () use ($queue, $atomic) { - $queue->push("shutdown\n", Thread\Queue::NOTIFY_ALL); - }); - $serv->start(); -}; - -$tm->run(); + $serv->shutdown(); +})); +$serv->start(); ?> --EXPECT-- begin diff --git a/tests/swoole_thread/server/stop_worker.phpt b/tests/swoole_thread/server/stop_worker.phpt new file mode 100644 index 00000000000..d4e648a8f19 --- /dev/null +++ b/tests/swoole_thread/server/stop_worker.phpt @@ -0,0 +1,75 @@ +--TEST-- +swoole_thread/server: base +--SKIPIF-- + +--FILE-- +set(array( + 'worker_num' => 2, + 'task_worker_num' => 3, + 'log_level' => SWOOLE_LOG_ERROR, + 'init_arguments' => function () { + global $queue, $atomic1, $atomic2; + $queue = new Swoole\Thread\Queue(); + $atomic1 = new Swoole\Thread\Atomic(0); + $atomic2 = new Swoole\Thread\Atomic(0); + return [$queue, $atomic1, $atomic2]; + } +)); +$serv->on('WorkerStart', function (Swoole\Server $serv, $workerId) use ($port) { + [$queue, $atomic1, $atomic2] = Thread::getArguments(); + if ($atomic1->add() == 5) { + $queue->push("begin\n", Thread\Queue::NOTIFY_ALL); + } +}); +$serv->on('WorkerStop', function (Swoole\Server $serv, $workerId) { + [$queue, $atomic1, $atomic2] = Thread::getArguments(); + $atomic2->add(); +}); +$serv->on('Request', function ($req, $resp) use ($serv) { + if ($req->server['request_uri'] == '/stop') { + $serv->stop($req->get['worker'] ?? 0); + $resp->end("OK\n"); + } +}); +$serv->on('Task', function ($serv, $task_id, $worker_id, $data) { + +}); +$serv->on('shutdown', function () { + global $queue, $atomic1, $atomic2; + echo 'shutdown', PHP_EOL; + Assert::eq($atomic1->get(), 7); + Assert::eq($atomic2->get(), 7); +}); +$serv->addProcess(new Swoole\Process(function ($process) use ($serv) { + [$queue, $atomic] = Thread::getArguments(); + global $port; + echo $queue->pop(-1); + + echo file_get_contents('http://127.0.0.1:' . $port . '/stop?worker=' . random_int(0, 1)); + echo file_get_contents('http://127.0.0.1:' . $port . '/stop?worker=' . random_int(2, 4)); + + sleep(1); + echo "done\n"; + $serv->shutdown(); +})); +$serv->start(); +?> +--EXPECTF-- +begin +OK +OK +done +shutdown +