Skip to content

Commit

Permalink
Optimize Server::stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Aug 13, 2024
1 parent cb3a2dc commit f0cd53d
Show file tree
Hide file tree
Showing 25 changed files with 440 additions and 255 deletions.
35 changes: 25 additions & 10 deletions examples/server/echo.php
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
<?php
$serv = new Swoole\Server("0.0.0.0", 9501, SWOOLE_BASE);
//$serv = new Swoole\Server("0.0.0.0", 9501, SWOOLE_BASE);
// $serv = new Swoole\Server("0.0.0.0", 9501);
$serv = new Swoole\Server("0.0.0.0", 9501, SWOOLE_THREAD);

function getpid()
{
global $serv;
return $serv->mode === SWOOLE_THREAD ? \Swoole\Thread::getId() : posix_getpid();
}

$serv->set([
'worker_num' =>1,
'worker_num' => 2,
'task_worker_num' => 3,
]);

$serv->on('connect', function ($serv, $fd, $reactor_id){
echo "[#".posix_getpid()."]\tClient@[$fd:$reactor_id]: Connect.\n";
$serv->on('workerStart', function ($serv, $worker_id) {
echo "[#" . getpid() . "]\tWorker#{$worker_id} is started.\n";
});

$serv->on('workerStop', function ($serv, $worker_id) {
echo "[#" . getpid() . "]\tWorker#{$worker_id} is stopped.\n";
});

$serv->set(array(
'worker_num' => 1,
));
$serv->on('connect', function ($serv, $fd, $reactor_id) {
echo "[#" . getpid() . "]\tClient@[$fd:$reactor_id]: Connect.\n";
});

$serv->on('receive', function (Swoole\Server $serv, $fd, $reactor_id, $data) {
echo "[#".$serv->worker_id."]\tClient[$fd] receive data: $data\n";
echo "[#" . $serv->worker_id . "]\tClient[$fd] receive data: $data\n";
if ($serv->send($fd, "hello {$data}\n") == false) {
echo "error\n";
}

});

$serv->on('close', function ($serv, $fd, $reactor_id) {
echo "[#".posix_getpid()."]\tClient@[$fd:$reactor_id]: Close.\n";
echo "[#" . getpid() . "]\tClient@[$fd:$reactor_id]: Close.\n";
});

$serv->on('task', function ($serv, $src_worker_id, $task) {
var_dump($task);
});

$serv->start();
29 changes: 20 additions & 9 deletions examples/thread/thread_server.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?php
$http = new Swoole\Http\Server("0.0.0.0", 9503, SWOOLE_THREAD);

use Swoole\Http\Server;

$http = new Server("0.0.0.0", 9503, SWOOLE_THREAD);
$http->set([
'worker_num' => 2,
'task_worker_num' => 3,
Expand All @@ -17,6 +20,10 @@
// $resp->end("tid=" . \Swoole\Thread::getId() . ', fd=' . $req->fd);
if ($req->server['request_uri'] == '/task') {
$http->task(['code' => uniqid()]);
} elseif ($req->server['request_uri'] == '/stop') {
var_dump($http->getWorkerId());
var_dump($req->get['worker_id']);
$http->stop($req->get['worker_id'] ?? 0);
} elseif ($req->server['request_uri'] == '/msg') {
$dstWorkerId = random_int(0, 4);
if ($dstWorkerId != $http->getWorkerId()) {
Expand All @@ -31,10 +38,10 @@
echo "[worker#" . $http->getWorkerId() . "]\treceived pipe message[$msg] from " . $srcWorkerId . "\n";
});

$http->addProcess(new \Swoole\Process(function () {
echo "user process, id=" . \Swoole\Thread::getId() . "\n";
sleep(2);
}));
//$http->addProcess(new \Swoole\Process(function () {
// echo "user process, id=" . \Swoole\Thread::getId() . "\n";
// sleep(2);
//}));

$http->on('Task', function ($server, $taskId, $srcWorkerId, $data) {
var_dump($taskId, $srcWorkerId, $data);
Expand All @@ -45,12 +52,16 @@
var_dump($taskId, $data);
});

$http->on('WorkerStart', function ($serv, $wid) {
var_dump(\Swoole\Thread::getArguments(), $wid);
$http->on('workerStart', function ($serv, $worker_id) {
echo "[#" . Swoole\Thread::getId() . "]\tWorker#{$worker_id} is started.\n";
});

$http->on('workerStop', function ($serv, $worker_id) {
echo "[#" . Swoole\Thread::getId() . "]\tWorker#{$worker_id} is stopped.\n";
});

$http->on('WorkerStop', function ($serv, $wid) {
var_dump('stop: T' . \Swoole\Thread::getId());
$http->on('workerExit', function (Server $serv, $worker_id) {
echo "[#" . Swoole\Thread::getId() . "]\tWorker#{$worker_id} is exited, event_num=" . Swoole\Coroutine::stats()['event_num'] . ".\n";
});

$http->start();
4 changes: 2 additions & 2 deletions ext-src/php_swoole.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ PHP_MINIT_FUNCTION(swoole) {
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_PROXY_BAD_RESPONSE", SW_ERROR_HTTP_PROXY_BAD_RESPONSE);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_CONFLICT_HEADER", SW_ERROR_HTTP_CONFLICT_HEADER);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_CONTEXT_UNAVAILABLE", SW_ERROR_HTTP_CONTEXT_UNAVAILABLE);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_HTTP_COOKIE_UNAVAILABLE", SW_ERROR_HTTP_COOKIE_UNAVAILABLE);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_WEBSOCKET_BAD_CLIENT", SW_ERROR_WEBSOCKET_BAD_CLIENT);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_WEBSOCKET_BAD_OPCODE", SW_ERROR_WEBSOCKET_BAD_OPCODE);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_WEBSOCKET_UNCONNECTED", SW_ERROR_WEBSOCKET_UNCONNECTED);
Expand All @@ -610,6 +611,7 @@ PHP_MINIT_FUNCTION(swoole) {
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_INVALID_COMMAND", SW_ERROR_SERVER_INVALID_COMMAND);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_IS_NOT_REGULAR_FILE", SW_ERROR_SERVER_IS_NOT_REGULAR_FILE);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT", SW_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_INVALID_CALLBACK", SW_ERROR_SERVER_INVALID_CALLBACK);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_WORKER_EXIT_TIMEOUT", SW_ERROR_SERVER_WORKER_EXIT_TIMEOUT);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_WORKER_ABNORMAL_PIPE_DATA", SW_ERROR_SERVER_WORKER_ABNORMAL_PIPE_DATA);
SW_REGISTER_LONG_CONSTANT("SWOOLE_ERROR_SERVER_WORKER_UNPROCESSED_DATA", SW_ERROR_SERVER_WORKER_UNPROCESSED_DATA);
Expand Down Expand Up @@ -1009,7 +1011,6 @@ PHP_RINIT_FUNCTION(swoole) {
}

SWOOLE_G(req_status) = PHP_SWOOLE_RINIT_BEGIN;
SwooleG.running = 1;

php_swoole_register_shutdown_function("swoole_internal_call_user_shutdown_begin");

Expand Down Expand Up @@ -1082,7 +1083,6 @@ PHP_RSHUTDOWN_FUNCTION(swoole) {
php_swoole_thread_rshutdown();
#endif

SwooleG.running = 0;
SWOOLE_G(req_status) = PHP_SWOOLE_RSHUTDOWN_END;

#ifdef PHP_STREAM_FLAG_NO_CLOSE
Expand Down
12 changes: 12 additions & 0 deletions ext-src/php_swoole_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ extern PHPAPI int php_array_merge(zend_array *dest, zend_array *src);
RETURN_FALSE; \
}

#ifdef SW_THREAD
#define SW_MUST_BE_MAIN_THREAD_EX(op) \
if (!tsrm_is_main_thread()) { \
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT); \
op; \
}
#else
#define SW_MUST_BE_MAIN_THREAD()
#endif

#define SW_MUST_BE_MAIN_THREAD() SW_MUST_BE_MAIN_THREAD_EX(RETURN_TRUE)

#define php_swoole_fatal_error(level, fmt_str, ...) \
swoole_set_last_error(SW_ERROR_PHP_FATAL_ERROR); \
php_error_docref(NULL, level, (const char *) (fmt_str), ##__VA_ARGS__)
Expand Down
7 changes: 1 addition & 6 deletions ext-src/swoole_async_coro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@ void php_swoole_set_aio_option(HashTable *vht) {
}

PHP_FUNCTION(swoole_async_set) {
#ifdef SW_THREAD
if (!tsrm_is_main_thread()) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
RETURN_FALSE;
}
#endif
SW_MUST_BE_MAIN_THREAD();
if (sw_reactor()) {
php_swoole_fatal_error(E_ERROR, "eventLoop has already been created. unable to change settings");
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
Expand Down
1 change: 1 addition & 0 deletions ext-src/swoole_coroutine_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ PHP_METHOD(swoole_coroutine_system, waitPid) {
}

PHP_METHOD(swoole_coroutine_system, waitSignal) {
SW_MUST_BE_MAIN_THREAD();
zend_long signo;
double timeout = -1;

Expand Down
1 change: 1 addition & 0 deletions ext-src/swoole_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ static PHP_METHOD(swoole_process, kill) {
}

static PHP_METHOD(swoole_process, signal) {
SW_MUST_BE_MAIN_THREAD();
zend_long signo = 0;
zval *zcallback = nullptr;
zend_fcall_info_cache *fci_cache = nullptr;
Expand Down
7 changes: 1 addition & 6 deletions ext-src/swoole_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1176,12 +1176,7 @@ void PHPCoroutine::enable_unsafe_function() {
}

bool PHPCoroutine::enable_hook(uint32_t flags) {
#ifdef SW_THREAD
if (!tsrm_is_main_thread()) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
return false;
}
#endif
SW_MUST_BE_MAIN_THREAD_EX(return false);
if (swoole_isset_hook((enum swGlobalHookType) PHP_SWOOLE_HOOK_BEFORE_ENABLE_HOOK)) {
swoole_call_hook((enum swGlobalHookType) PHP_SWOOLE_HOOK_BEFORE_ENABLE_HOOK, &flags);
}
Expand Down
47 changes: 18 additions & 29 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void php_swoole_server_rshutdown() {
Server *serv = sw_server();
serv->drain_worker_pipe();

if (serv->is_started() && serv->is_running() && !serv->is_user_worker()) {
if (serv->is_started() && serv->worker_is_running() && !serv->is_user_worker()) {
if (php_swoole_is_fatal_error()) {
swoole_error_log(SW_LOG_ERROR,
SW_ERROR_PHP_FATAL_ERROR,
Expand Down Expand Up @@ -1492,12 +1492,14 @@ static void php_swoole_server_onWorkerStart(Server *serv, Worker *worker) {
zend_update_property_bool(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("taskworker"), serv->is_task_worker());
zend_update_property_long(swoole_server_ce, SW_Z8_OBJ_P(zserv), ZEND_STRL("worker_pid"), getpid());

if (serv->is_task_worker() && !serv->task_enable_coroutine) {
PHPCoroutine::disable_hook();
if (serv->is_task_worker()) {
if (!serv->task_enable_coroutine) {
PHPCoroutine::disable_hook();
}
} else {
serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator());
}

serv->get_worker_message_bus()->set_allocator(sw_zend_string_allocator());

zval args[2];
args[0] = *zserv;
ZVAL_LONG(&args[1], worker->id);
Expand Down Expand Up @@ -1540,10 +1542,9 @@ static void php_swoole_server_onAfterReload(Server *serv) {
}

static void php_swoole_server_onWorkerStop(Server *serv, Worker *worker) {
if (SwooleWG.shutdown) {
if (!SwooleWG.running) {
return;
}
SwooleWG.shutdown = true;

zval *zserv = php_swoole_server_zval_ptr(serv);
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
Expand Down Expand Up @@ -1926,6 +1927,7 @@ static PHP_METHOD(swoole_server, __construct) {
&& serv_mode != Server::MODE_THREAD
#endif
) {
swoole_set_last_error(SW_ERROR_INVALID_PARAMS);
zend_throw_error(NULL, "invalid $mode parameters %d", (int) serv_mode);
RETURN_FALSE;
}
Expand All @@ -1935,8 +1937,14 @@ static PHP_METHOD(swoole_server, __construct) {
server_ctor(ZEND_THIS, sw_server());
return;
}
if (!tsrm_is_main_thread()) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
zend_throw_exception_ex(swoole_exception_ce, -2, "This operation is only allowed in the main thread");
RETURN_FALSE;
}
#else
if (sw_server() != nullptr) {
swoole_set_last_error(SW_ERROR_OPERATION_NOT_SUPPORT);
zend_throw_exception_ex(
swoole_exception_ce, -3, "server is running. unable to create %s", SW_Z_OBJCE_NAME_VAL_P(zserv));
RETURN_FALSE;
Expand Down Expand Up @@ -2672,7 +2680,7 @@ static PHP_METHOD(swoole_server, start) {
server_object->on_before_start();

if (serv->start() < 0) {
php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", serv->get_startup_error_message());
}

#ifdef SW_THREAD
Expand Down Expand Up @@ -3869,7 +3877,7 @@ static PHP_METHOD(swoole_server, getWorkerPid) {

static PHP_METHOD(swoole_server, getManagerPid) {
Server *serv = php_swoole_server_get_and_check_server(ZEND_THIS);
RETURN_LONG(serv->gs->manager_pid);
RETURN_LONG(serv->get_manager_pid());
}

static PHP_METHOD(swoole_server, getMasterPid) {
Expand Down Expand Up @@ -3898,26 +3906,7 @@ static PHP_METHOD(swoole_server, stop) {
Z_PARAM_BOOL(wait_reactor)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

if (worker_id == sw_worker()->id && wait_reactor == 0) {
if (SwooleTG.reactor != nullptr) {
SwooleTG.reactor->defer(
[](void *data) {
Reactor *reactor = (Reactor *) data;
reactor->running = false;
},
SwooleTG.reactor);
}
serv->running = false;
} else {
Worker *worker = serv->get_worker(worker_id);
if (worker == nullptr) {
RETURN_FALSE;
} else if (swoole_kill(worker->pid, SIGTERM) < 0) {
php_swoole_sys_error(E_WARNING, "kill(%d, SIGTERM) failed", worker->pid);
RETURN_FALSE;
}
}
RETURN_TRUE;
RETURN_BOOL(serv->kill_worker(worker_id, wait_reactor));
}

// swoole_connection_iterator
Expand Down
7 changes: 7 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,13 @@ static inline int swoole_get_last_error(void) {
return SwooleTG.error;
}

static inline void swoole_clear_last_error(void) {
SwooleTG.error = 0;
}

void swoole_clear_last_error_msg(void);
const char *swoole_get_last_error_msg(void);

static inline int swoole_get_thread_id(void) {
return SwooleTG.id;
}
Expand Down
1 change: 1 addition & 0 deletions include/swoole_error.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ enum swErrorCode {
SW_ERROR_SERVER_INVALID_COMMAND,
SW_ERROR_SERVER_IS_NOT_REGULAR_FILE,
SW_ERROR_SERVER_SEND_TO_WOKER_TIMEOUT,
SW_ERROR_SERVER_INVALID_CALLBACK,

/**
* Process exit timeout, forced to end.
Expand Down
1 change: 1 addition & 0 deletions include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ struct Worker;
struct WorkerGlobal {
bool run_always;
bool shutdown;
bool running;
uint32_t max_request;
Worker *worker;
Worker *worker_copy;
Expand Down
9 changes: 9 additions & 0 deletions include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ class Server {
return nullptr;
}

bool kill_worker(WorkerId worker_id, bool wait_reactor);
void stop_async_worker(Worker *worker);
void stop_master_thread();
void join_heartbeat_thread();
Expand Down Expand Up @@ -1166,6 +1167,10 @@ class Server {
return swoole_get_process_type() == SW_PROCESS_EVENTWORKER;
}

bool is_event_worker() {
return is_worker();
}

bool is_task_worker() {
return swoole_get_process_type() == SW_PROCESS_TASKWORKER;
}
Expand Down Expand Up @@ -1313,6 +1318,7 @@ class Server {

void call_hook(enum HookType type, void *arg);
void call_worker_start_callback(Worker *worker);
void call_worker_stop_callback(Worker *worker);
void call_command_handler(MessageBus &mb, uint16_t worker_id, network::Socket *sock);
std::string call_command_handler_in_master(int command_id, const std::string &msg);
void call_command_callback(int64_t request_id, const std::string &result);
Expand Down Expand Up @@ -1431,6 +1437,7 @@ class Server {
void worker_stop_callback(Worker *worker);
void worker_accept_event(DataHead *info);
void worker_signal_init(void);
bool worker_is_running();
std::function<void(const WorkerFn &fn)> worker_thread_start;

/**
Expand All @@ -1451,6 +1458,7 @@ class Server {
int start_master_thread(Reactor *reactor);
int start_event_worker(Worker *worker);
void start_heartbeat_thread();
const char *get_startup_error_message();

private:
enum Mode mode_;
Expand All @@ -1477,6 +1485,7 @@ class Server {
int start_reactor_threads();
int start_reactor_processes();
int start_worker_threads();
void stop_worker_threads();
void join_reactor_thread();
TimerCallback get_timeout_callback(ListenPort *port, Reactor *reactor, Connection *conn);

Expand Down
Loading

0 comments on commit f0cd53d

Please sign in to comment.