Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add onDisconnect callback #4230

Merged
merged 4 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions ext-src/php_swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ enum php_swoole_server_callback_type {
};
//--------------------------------------------------------
enum php_swoole_server_port_callback_type {
SW_SERVER_CB_onConnect, // worker(event)
SW_SERVER_CB_onReceive, // worker(event)
SW_SERVER_CB_onClose, // worker(event)
SW_SERVER_CB_onPacket, // worker(event)
SW_SERVER_CB_onRequest, // http server
SW_SERVER_CB_onHandShake, // worker(event)
SW_SERVER_CB_onOpen, // worker(event)
SW_SERVER_CB_onMessage, // worker(event)
SW_SERVER_CB_onConnect, // stream, worker(event)
SW_SERVER_CB_onReceive, // stream, worker(event)
SW_SERVER_CB_onClose, // stream, worker(event)
SW_SERVER_CB_onPacket, // dgram, worker(event)
SW_SERVER_CB_onRequest, // http, worker(event)
SW_SERVER_CB_onHandShake, // websocket, worker(event)
SW_SERVER_CB_onOpen, // websocket, worker(event)
SW_SERVER_CB_onMessage, // websocket, worker(event)
SW_SERVER_CB_onDisconnect, // websocket (non websocket connection), worker(event)
SW_SERVER_CB_onBufferFull, // worker(event)
SW_SERVER_CB_onBufferEmpty, // worker(event)
};
Expand Down Expand Up @@ -137,7 +138,6 @@ void php_swoole_server_onConnect(swServer *, swDataHead *);
int php_swoole_server_onReceive(swServer *, swRecvData *);
int php_swoole_http_server_onReceive(swServer *, swRecvData *);
int php_swoole_redis_server_onReceive(swServer *serv, swRecvData *req);
void php_swoole_http_server_onClose(swServer *, swDataHead *);
int php_swoole_server_onPacket(swServer *, swRecvData *);
void php_swoole_server_onClose(swServer *, swDataHead *);
void php_swoole_server_onBufferFull(swServer *, swDataHead *);
Expand Down
13 changes: 0 additions & 13 deletions ext-src/swoole_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,6 @@ int php_swoole_http_server_onReceive(Server *serv, RecvData *req) {
return SW_OK;
}

void php_swoole_http_server_onClose(Server *serv, DataHead *ev) {
Connection *conn = serv->get_connection_by_session_id(ev->fd);
if (!conn) {
return;
}
php_swoole_server_onClose(serv, ev);
#ifdef SW_USE_HTTP2
if (conn->http2_stream) {
swoole_http2_server_session_free(conn);
}
#endif
}

void php_swoole_http_server_minit(int module_number) {
SW_INIT_CLASS_ENTRY_EX(
swoole_http_server, "Swoole\\Http\\Server", "swoole_http_server", nullptr, nullptr, swoole_server);
Expand Down
34 changes: 25 additions & 9 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "php_swoole_server.h"
#include "php_swoole_http_server.h"
#include "php_swoole_process.h"
#include "swoole_msg_queue.h"

Expand Down Expand Up @@ -1156,9 +1157,6 @@ void ServerObject::on_before_start() {

if (find_http_port) {
serv->onReceive = php_swoole_http_server_onReceive;
if (serv->is_support_unsafe_events()) {
serv->onClose = php_swoole_http_server_onClose;
}
php_swoole_http_server_init_global_variant();
}
}
Expand Down Expand Up @@ -1843,25 +1841,37 @@ void php_swoole_server_onConnect(Server *serv, DataHead *info) {
void php_swoole_server_onClose(Server *serv, DataHead *info) {
zval *zserv = (zval *) serv->private_data_2;
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
SessionId session_id = info->fd;

if (serv->enable_coroutine && serv->send_yield) {
auto _i_coros_list = server_object->property->send_coroutine_map.find(info->fd);
auto _i_coros_list = server_object->property->send_coroutine_map.find(session_id);
if (_i_coros_list != server_object->property->send_coroutine_map.end()) {
auto coros_list = _i_coros_list->second;
server_object->property->send_coroutine_map.erase(info->fd);
server_object->property->send_coroutine_map.erase(session_id);
while (!coros_list->empty()) {
FutureTask *context = coros_list->front();
coros_list->pop_front();
swoole_set_last_error(ECONNRESET);
zval_ptr_dtor(&context->coro_params);
ZVAL_NULL(&context->coro_params);
php_swoole_server_send_resume(serv, context, info->fd);
php_swoole_server_send_resume(serv, context, session_id);
}
delete coros_list;
}
}

auto fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onClose);
auto *fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onClose);
Connection *conn = serv->get_connection_by_session_id(session_id);
if (!conn) {
return;
}
if (conn->websocket_status != WEBSOCKET_STATUS_ACTIVE) {
ListenPort *port = serv->get_port_by_server_fd(info->server_fd);
if (port && port->open_websocket_protocol
&& php_swoole_server_isset_callback(serv, port, SW_SERVER_CB_onDisconnect)) {
fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onDisconnect);
}
}
if (fci_cache) {
zval *zserv = (zval *) serv->private_data_2;
zval args[3];
Expand All @@ -1872,14 +1882,14 @@ void php_swoole_server_onClose(Server *serv, DataHead *info) {
zval *object = &args[1];
object_init_ex(object, swoole_server_event_ce);
zend_update_property_long(
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("fd"), (zend_long) info->fd);
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("fd"), (zend_long) session_id);
zend_update_property_long(
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("reactor_id"), (zend_long) info->reactor_id);
zend_update_property_double(
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("dispatch_time"), info->time);
argc = 2;
} else {
ZVAL_LONG(&args[1], info->fd);
ZVAL_LONG(&args[1], session_id);
ZVAL_LONG(&args[2], info->reactor_id);
argc = 3;
}
Expand All @@ -1892,6 +1902,12 @@ void php_swoole_server_onClose(Server *serv, DataHead *info) {
zval_ptr_dtor(&args[1]);
}
}

#ifdef SW_USE_HTTP2
if (conn->http2_stream) {
swoole_http2_server_session_free(conn);
}
#endif
}

void php_swoole_server_onBufferFull(Server *serv, DataHead *info) {
Expand Down
58 changes: 22 additions & 36 deletions ext-src/swoole_server_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static std::unordered_map<std::string, server_port_event> server_port_event_map(
{ "handshake", server_port_event(SW_SERVER_CB_onHandShake, "Handshake") },
{ "open", server_port_event(SW_SERVER_CB_onOpen, "Open") },
{ "message", server_port_event(SW_SERVER_CB_onMessage, "Message") },
{ "disconnect", server_port_event(SW_SERVER_CB_onDisconnect, "Disconnect") },
});
// clang-format on

Expand Down Expand Up @@ -184,6 +185,7 @@ void php_swoole_server_port_minit(int module_number) {
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onHandShake"), ZEND_ACC_PRIVATE);
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onOpen"), ZEND_ACC_PRIVATE);
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onMessage"), ZEND_ACC_PRIVATE);
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onDisconnect"), ZEND_ACC_PRIVATE);

zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("host"), ZEND_ACC_PUBLIC);
zend_declare_property_long(swoole_server_port_ce, ZEND_STRL("port"), 0, ZEND_ACC_PUBLIC);
Expand Down Expand Up @@ -618,7 +620,7 @@ static PHP_METHOD(swoole_server_port, set) {

static PHP_METHOD(swoole_server_port, on) {
char *name = nullptr;
size_t len, i;
size_t len;
zval *cb;

ServerPortProperty *property = php_swoole_server_port_get_and_check_property(ZEND_THIS);
Expand All @@ -640,55 +642,39 @@ static PHP_METHOD(swoole_server_port, on) {
}
efree(func_name);

const char *callback_name[PHP_SWOOLE_SERVER_PORT_CALLBACK_NUM] = {
"Connect",
"Receive",
"Close",
"Packet",
"Request",
"HandShake",
"Open",
"Message",
"BufferFull",
"BufferEmpty",
};

char property_name[128];
int l_property_name = 0;
memcpy(property_name, "on", 2);

for (i = 0; i < PHP_SWOOLE_SERVER_PORT_CALLBACK_NUM; i++) {
if (!swoole_strcaseeq(name, len, callback_name[i], strlen(callback_name[i]))) {
bool found = false;
for (auto i = server_port_event_map.begin(); i!= server_port_event_map.end(); i++) {
if (!swoole_strcaseeq(name, len, i->first.c_str(), i->first.length())) {
continue;
}

memcpy(property_name + 2, callback_name[i], len);
l_property_name = len + 2;
property_name[l_property_name] = '\0';
zend_update_property(swoole_server_port_ce, SW_Z8_OBJ_P(ZEND_THIS), property_name, l_property_name, cb);
property->callbacks[i] =
sw_zend_read_property(swoole_server_port_ce, ZEND_THIS, property_name, l_property_name, 0);
sw_copy_to_stack(property->callbacks[i], property->_callbacks[i]);
if (property->caches[i]) {
efree(property->caches[i]);
found = true;
int index = i->second.type;
std::string property_name = std::string("on") + i->second.name;
zend_update_property(swoole_server_port_ce, SW_Z8_OBJ_P(ZEND_THIS), property_name.c_str(), property_name.length(), cb);
property->callbacks[index] =
sw_zend_read_property(swoole_server_port_ce, ZEND_THIS, property_name.c_str(), property_name.length(), 0);
sw_copy_to_stack(property->callbacks[index], property->_callbacks[index]);
if (property->caches[index]) {
efree(property->caches[index]);
}
property->caches[i] = fci_cache;
property->caches[index] = fci_cache;

if (i == SW_SERVER_CB_onConnect && !serv->onConnect) {
if (index == SW_SERVER_CB_onConnect && !serv->onConnect) {
serv->onConnect = php_swoole_server_onConnect;
} else if (i == SW_SERVER_CB_onPacket && !serv->onPacket) {
} else if (index == SW_SERVER_CB_onPacket && !serv->onPacket) {
serv->onPacket = php_swoole_server_onPacket;
} else if (i == SW_SERVER_CB_onClose && !serv->onClose) {
} else if (index == SW_SERVER_CB_onClose && !serv->onClose) {
serv->onClose = php_swoole_server_onClose;
} else if (i == SW_SERVER_CB_onBufferFull && !serv->onBufferFull) {
} else if (index == SW_SERVER_CB_onBufferFull && !serv->onBufferFull) {
serv->onBufferFull = php_swoole_server_onBufferFull;
} else if (i == SW_SERVER_CB_onBufferEmpty && !serv->onBufferEmpty) {
} else if (index == SW_SERVER_CB_onBufferEmpty && !serv->onBufferEmpty) {
serv->onBufferEmpty = php_swoole_server_onBufferEmpty;
}
break;
}

if (l_property_name == 0) {
if (!found) {
php_swoole_error(E_WARNING, "unknown event types[%s]", name);
efree(fci_cache);
RETURN_FALSE;
Expand Down
4 changes: 4 additions & 0 deletions tests/include/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ function clear_php()
`ps -A | grep php | grep -v phpstorm | grep -v 'run-tests' | awk '{print $1}' | xargs kill -9 > /dev/null 2>&1`;
}

function puts($msg) {
echo $msg."\n";
}

function top(int $pid)
{
static $available;
Expand Down
18 changes: 7 additions & 11 deletions tests/swoole_server_port/http.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ $pm->parentFunc = function ($pid) use ($pm)
if (!$cli->connect('127.0.0.1', $pm->getFreePort(0), 0.5))
{
fail:
echo "ERROR\n";
echo "ERROR 1\n";
return;
}
//no eof, should be timeout here
Expand All @@ -36,14 +36,13 @@ $pm->parentFunc = function ($pid) use ($pm)
echo "OK\n";
});

go(function () use ($pm)
{
go(function () use ($pm) {
$cli = new Swoole\Coroutine\Http\Client('127.0.0.1', $pm->getFreePort(1));
if ( $cli->get("/") ) {
if ($cli->get("/")) {
echo $cli->body;
Assert::same($cli->statusCode, 200);
} else {
echo "ERROR\n";
echo "ERROR 2\n";
}
});

Expand All @@ -68,21 +67,18 @@ $pm->childFunc = function () use ($pm)

$port2 = $server->listen('127.0.0.1', $pm->getFreePort(1), SWOOLE_SOCK_TCP);
$port2->set(['open_http_protocol' => true,]);
$port2->on("request", function ($req, $resp)
{
$port2->on("request", function ($req, $resp) {
$resp->end("hello swooler\n");
});

$server->on("WorkerStart", function (\swoole_server $serv)
{
$server->on("WorkerStart", function (\swoole_server $serv) {
/**
* @var $pm ProcessManager
*/
global $pm;
$pm->wakeup();
});
$server->on('request', function (swoole_http_request $request, swoole_http_response $response)
{
$server->on('request', function (swoole_http_request $request, swoole_http_response $response) {
$response->end("OK\n");
});
$server->start();
Expand Down
66 changes: 66 additions & 0 deletions tests/swoole_websocket_server/onDisconnct.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
--TEST--
swoole_websocket_server: onDisconnect
--SKIPIF--
<?php require __DIR__ . '/../include/skipif.inc'; ?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use function Swoole\Coroutine\run;
use Swoole\Coroutine\Http\Client;

$pm = new ProcessManager;

$pm->parentFunc = function (int $pid) use ($pm) {
run(function () use ($pm) {
$data = httpGetBody('http://127.0.0.1:' . $pm->getFreePort() . '/');
Assert::contains($data, 'HTTP 400 Bad Request');

$client = new Client('127.0.0.1', $pm->getFreePort());
Assert::assert($client->upgrade('/websocket'));
Assert::eq($client->getStatusCode(), 101);
$client->push('hello world');
$client->close();
});
puts('done!');
$pm->kill();
};
$pm->childFunc = function () use ($pm) {
$serv = new swoole_websocket_server('127.0.0.1', $pm->getFreePort(), SERVER_MODE_RANDOM);
$serv->set([
'worker_num' => 1,
'log_file' => '/dev/null'
]);
$serv->on('WorkerStart', function () use ($pm) {
$pm->wakeup();
});
$serv->on('Message', function (swoole_websocket_server $serv, swoole_websocket_frame $frame) {
if ($frame->data == 'shutdown') {
$serv->disconnect($frame->fd, 4000, 'shutdown received');
}
});
$serv->on('connect', function ($s, $id) use ($pm) {
puts("connect ".$id);
});
$serv->on('disconnect', function ($s, $id) use ($pm) {
puts("disconnect ".$id);
});
$serv->on('open', function ($s, $req) use ($pm) {
puts("open ".$req->fd);
});
$serv->on('close', function ($s, $id) use ($pm) {
puts("close ".$id);
});
$serv->start();
};
$pm->childFirst();
$pm->run();
?>
--EXPECT--
connect 1
disconnect 1
connect 2
open 2
close 2
done!