Skip to content

Commit

Permalink
Add onDisconnect callback (#4230)
Browse files Browse the repository at this point in the history
* add onDisconnect

* fix tests

* fix tests[2]
  • Loading branch information
matyhtf authored May 21, 2021
1 parent af1722e commit 7ca38ff
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 78 deletions.
18 changes: 9 additions & 9 deletions ext-src/php_swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ enum php_swoole_server_callback_type {
};
//--------------------------------------------------------
enum php_swoole_server_port_callback_type {
SW_SERVER_CB_onConnect, // worker(event)
SW_SERVER_CB_onReceive, // worker(event)
SW_SERVER_CB_onClose, // worker(event)
SW_SERVER_CB_onPacket, // worker(event)
SW_SERVER_CB_onRequest, // http server
SW_SERVER_CB_onHandShake, // worker(event)
SW_SERVER_CB_onOpen, // worker(event)
SW_SERVER_CB_onMessage, // worker(event)
SW_SERVER_CB_onConnect, // stream, worker(event)
SW_SERVER_CB_onReceive, // stream, worker(event)
SW_SERVER_CB_onClose, // stream, worker(event)
SW_SERVER_CB_onPacket, // dgram, worker(event)
SW_SERVER_CB_onRequest, // http, worker(event)
SW_SERVER_CB_onHandShake, // websocket, worker(event)
SW_SERVER_CB_onOpen, // websocket, worker(event)
SW_SERVER_CB_onMessage, // websocket, worker(event)
SW_SERVER_CB_onDisconnect, // websocket (non websocket connection), worker(event)
SW_SERVER_CB_onBufferFull, // worker(event)
SW_SERVER_CB_onBufferEmpty, // worker(event)
};
Expand Down Expand Up @@ -137,7 +138,6 @@ void php_swoole_server_onConnect(swServer *, swDataHead *);
int php_swoole_server_onReceive(swServer *, swRecvData *);
int php_swoole_http_server_onReceive(swServer *, swRecvData *);
int php_swoole_redis_server_onReceive(swServer *serv, swRecvData *req);
void php_swoole_http_server_onClose(swServer *, swDataHead *);
int php_swoole_server_onPacket(swServer *, swRecvData *);
void php_swoole_server_onClose(swServer *, swDataHead *);
void php_swoole_server_onBufferFull(swServer *, swDataHead *);
Expand Down
13 changes: 0 additions & 13 deletions ext-src/swoole_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,6 @@ int php_swoole_http_server_onReceive(Server *serv, RecvData *req) {
return SW_OK;
}

void php_swoole_http_server_onClose(Server *serv, DataHead *ev) {
Connection *conn = serv->get_connection_by_session_id(ev->fd);
if (!conn) {
return;
}
php_swoole_server_onClose(serv, ev);
#ifdef SW_USE_HTTP2
if (conn->http2_stream) {
swoole_http2_server_session_free(conn);
}
#endif
}

void php_swoole_http_server_minit(int module_number) {
SW_INIT_CLASS_ENTRY_EX(
swoole_http_server, "Swoole\\Http\\Server", "swoole_http_server", nullptr, nullptr, swoole_server);
Expand Down
34 changes: 25 additions & 9 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "php_swoole_server.h"
#include "php_swoole_http_server.h"
#include "php_swoole_process.h"
#include "swoole_msg_queue.h"

Expand Down Expand Up @@ -1156,9 +1157,6 @@ void ServerObject::on_before_start() {

if (find_http_port) {
serv->onReceive = php_swoole_http_server_onReceive;
if (serv->is_support_unsafe_events()) {
serv->onClose = php_swoole_http_server_onClose;
}
php_swoole_http_server_init_global_variant();
}
}
Expand Down Expand Up @@ -1843,25 +1841,37 @@ void php_swoole_server_onConnect(Server *serv, DataHead *info) {
void php_swoole_server_onClose(Server *serv, DataHead *info) {
zval *zserv = (zval *) serv->private_data_2;
ServerObject *server_object = server_fetch_object(Z_OBJ_P(zserv));
SessionId session_id = info->fd;

if (serv->enable_coroutine && serv->send_yield) {
auto _i_coros_list = server_object->property->send_coroutine_map.find(info->fd);
auto _i_coros_list = server_object->property->send_coroutine_map.find(session_id);
if (_i_coros_list != server_object->property->send_coroutine_map.end()) {
auto coros_list = _i_coros_list->second;
server_object->property->send_coroutine_map.erase(info->fd);
server_object->property->send_coroutine_map.erase(session_id);
while (!coros_list->empty()) {
FutureTask *context = coros_list->front();
coros_list->pop_front();
swoole_set_last_error(ECONNRESET);
zval_ptr_dtor(&context->coro_params);
ZVAL_NULL(&context->coro_params);
php_swoole_server_send_resume(serv, context, info->fd);
php_swoole_server_send_resume(serv, context, session_id);
}
delete coros_list;
}
}

auto fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onClose);
auto *fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onClose);
Connection *conn = serv->get_connection_by_session_id(session_id);
if (!conn) {
return;
}
if (conn->websocket_status != WEBSOCKET_STATUS_ACTIVE) {
ListenPort *port = serv->get_port_by_server_fd(info->server_fd);
if (port && port->open_websocket_protocol
&& php_swoole_server_isset_callback(serv, port, SW_SERVER_CB_onDisconnect)) {
fci_cache = php_swoole_server_get_fci_cache(serv, info->server_fd, SW_SERVER_CB_onDisconnect);
}
}
if (fci_cache) {
zval *zserv = (zval *) serv->private_data_2;
zval args[3];
Expand All @@ -1872,14 +1882,14 @@ void php_swoole_server_onClose(Server *serv, DataHead *info) {
zval *object = &args[1];
object_init_ex(object, swoole_server_event_ce);
zend_update_property_long(
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("fd"), (zend_long) info->fd);
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("fd"), (zend_long) session_id);
zend_update_property_long(
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("reactor_id"), (zend_long) info->reactor_id);
zend_update_property_double(
swoole_server_event_ce, SW_Z8_OBJ_P(object), ZEND_STRL("dispatch_time"), info->time);
argc = 2;
} else {
ZVAL_LONG(&args[1], info->fd);
ZVAL_LONG(&args[1], session_id);
ZVAL_LONG(&args[2], info->reactor_id);
argc = 3;
}
Expand All @@ -1892,6 +1902,12 @@ void php_swoole_server_onClose(Server *serv, DataHead *info) {
zval_ptr_dtor(&args[1]);
}
}

#ifdef SW_USE_HTTP2
if (conn->http2_stream) {
swoole_http2_server_session_free(conn);
}
#endif
}

void php_swoole_server_onBufferFull(Server *serv, DataHead *info) {
Expand Down
58 changes: 22 additions & 36 deletions ext-src/swoole_server_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ static std::unordered_map<std::string, server_port_event> server_port_event_map(
{ "handshake", server_port_event(SW_SERVER_CB_onHandShake, "Handshake") },
{ "open", server_port_event(SW_SERVER_CB_onOpen, "Open") },
{ "message", server_port_event(SW_SERVER_CB_onMessage, "Message") },
{ "disconnect", server_port_event(SW_SERVER_CB_onDisconnect, "Disconnect") },
});
// clang-format on

Expand Down Expand Up @@ -184,6 +185,7 @@ void php_swoole_server_port_minit(int module_number) {
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onHandShake"), ZEND_ACC_PRIVATE);
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onOpen"), ZEND_ACC_PRIVATE);
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onMessage"), ZEND_ACC_PRIVATE);
zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("onDisconnect"), ZEND_ACC_PRIVATE);

zend_declare_property_null(swoole_server_port_ce, ZEND_STRL("host"), ZEND_ACC_PUBLIC);
zend_declare_property_long(swoole_server_port_ce, ZEND_STRL("port"), 0, ZEND_ACC_PUBLIC);
Expand Down Expand Up @@ -618,7 +620,7 @@ static PHP_METHOD(swoole_server_port, set) {

static PHP_METHOD(swoole_server_port, on) {
char *name = nullptr;
size_t len, i;
size_t len;
zval *cb;

ServerPortProperty *property = php_swoole_server_port_get_and_check_property(ZEND_THIS);
Expand All @@ -640,55 +642,39 @@ static PHP_METHOD(swoole_server_port, on) {
}
efree(func_name);

const char *callback_name[PHP_SWOOLE_SERVER_PORT_CALLBACK_NUM] = {
"Connect",
"Receive",
"Close",
"Packet",
"Request",
"HandShake",
"Open",
"Message",
"BufferFull",
"BufferEmpty",
};

char property_name[128];
int l_property_name = 0;
memcpy(property_name, "on", 2);

for (i = 0; i < PHP_SWOOLE_SERVER_PORT_CALLBACK_NUM; i++) {
if (!swoole_strcaseeq(name, len, callback_name[i], strlen(callback_name[i]))) {
bool found = false;
for (auto i = server_port_event_map.begin(); i!= server_port_event_map.end(); i++) {
if (!swoole_strcaseeq(name, len, i->first.c_str(), i->first.length())) {
continue;
}

memcpy(property_name + 2, callback_name[i], len);
l_property_name = len + 2;
property_name[l_property_name] = '\0';
zend_update_property(swoole_server_port_ce, SW_Z8_OBJ_P(ZEND_THIS), property_name, l_property_name, cb);
property->callbacks[i] =
sw_zend_read_property(swoole_server_port_ce, ZEND_THIS, property_name, l_property_name, 0);
sw_copy_to_stack(property->callbacks[i], property->_callbacks[i]);
if (property->caches[i]) {
efree(property->caches[i]);
found = true;
int index = i->second.type;
std::string property_name = std::string("on") + i->second.name;
zend_update_property(swoole_server_port_ce, SW_Z8_OBJ_P(ZEND_THIS), property_name.c_str(), property_name.length(), cb);
property->callbacks[index] =
sw_zend_read_property(swoole_server_port_ce, ZEND_THIS, property_name.c_str(), property_name.length(), 0);
sw_copy_to_stack(property->callbacks[index], property->_callbacks[index]);
if (property->caches[index]) {
efree(property->caches[index]);
}
property->caches[i] = fci_cache;
property->caches[index] = fci_cache;

if (i == SW_SERVER_CB_onConnect && !serv->onConnect) {
if (index == SW_SERVER_CB_onConnect && !serv->onConnect) {
serv->onConnect = php_swoole_server_onConnect;
} else if (i == SW_SERVER_CB_onPacket && !serv->onPacket) {
} else if (index == SW_SERVER_CB_onPacket && !serv->onPacket) {
serv->onPacket = php_swoole_server_onPacket;
} else if (i == SW_SERVER_CB_onClose && !serv->onClose) {
} else if (index == SW_SERVER_CB_onClose && !serv->onClose) {
serv->onClose = php_swoole_server_onClose;
} else if (i == SW_SERVER_CB_onBufferFull && !serv->onBufferFull) {
} else if (index == SW_SERVER_CB_onBufferFull && !serv->onBufferFull) {
serv->onBufferFull = php_swoole_server_onBufferFull;
} else if (i == SW_SERVER_CB_onBufferEmpty && !serv->onBufferEmpty) {
} else if (index == SW_SERVER_CB_onBufferEmpty && !serv->onBufferEmpty) {
serv->onBufferEmpty = php_swoole_server_onBufferEmpty;
}
break;
}

if (l_property_name == 0) {
if (!found) {
php_swoole_error(E_WARNING, "unknown event types[%s]", name);
efree(fci_cache);
RETURN_FALSE;
Expand Down
4 changes: 4 additions & 0 deletions tests/include/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ function clear_php()
`ps -A | grep php | grep -v phpstorm | grep -v 'run-tests' | awk '{print $1}' | xargs kill -9 > /dev/null 2>&1`;
}

function puts($msg) {
echo $msg."\n";
}

function top(int $pid)
{
static $available;
Expand Down
18 changes: 7 additions & 11 deletions tests/swoole_server_port/http.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ $pm->parentFunc = function ($pid) use ($pm)
if (!$cli->connect('127.0.0.1', $pm->getFreePort(0), 0.5))
{
fail:
echo "ERROR\n";
echo "ERROR 1\n";
return;
}
//no eof, should be timeout here
Expand All @@ -36,14 +36,13 @@ $pm->parentFunc = function ($pid) use ($pm)
echo "OK\n";
});

go(function () use ($pm)
{
go(function () use ($pm) {
$cli = new Swoole\Coroutine\Http\Client('127.0.0.1', $pm->getFreePort(1));
if ( $cli->get("/") ) {
if ($cli->get("/")) {
echo $cli->body;
Assert::same($cli->statusCode, 200);
} else {
echo "ERROR\n";
echo "ERROR 2\n";
}
});

Expand All @@ -68,21 +67,18 @@ $pm->childFunc = function () use ($pm)

$port2 = $server->listen('127.0.0.1', $pm->getFreePort(1), SWOOLE_SOCK_TCP);
$port2->set(['open_http_protocol' => true,]);
$port2->on("request", function ($req, $resp)
{
$port2->on("request", function ($req, $resp) {
$resp->end("hello swooler\n");
});

$server->on("WorkerStart", function (\swoole_server $serv)
{
$server->on("WorkerStart", function (\swoole_server $serv) {
/**
* @var $pm ProcessManager
*/
global $pm;
$pm->wakeup();
});
$server->on('request', function (swoole_http_request $request, swoole_http_response $response)
{
$server->on('request', function (swoole_http_request $request, swoole_http_response $response) {
$response->end("OK\n");
});
$server->start();
Expand Down
66 changes: 66 additions & 0 deletions tests/swoole_websocket_server/onDisconnct.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
--TEST--
swoole_websocket_server: onDisconnect
--SKIPIF--
<?php require __DIR__ . '/../include/skipif.inc'; ?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use function Swoole\Coroutine\run;
use Swoole\Coroutine\Http\Client;

$pm = new ProcessManager;

$pm->parentFunc = function (int $pid) use ($pm) {
run(function () use ($pm) {
$data = httpGetBody('http://127.0.0.1:' . $pm->getFreePort() . '/');
Assert::contains($data, 'HTTP 400 Bad Request');

$client = new Client('127.0.0.1', $pm->getFreePort());
Assert::assert($client->upgrade('/websocket'));
Assert::eq($client->getStatusCode(), 101);
$client->push('hello world');
$client->close();
});
puts('done!');
$pm->kill();
};
$pm->childFunc = function () use ($pm) {
$serv = new swoole_websocket_server('127.0.0.1', $pm->getFreePort(), SERVER_MODE_RANDOM);
$serv->set([
'worker_num' => 1,
'log_file' => '/dev/null'
]);
$serv->on('WorkerStart', function () use ($pm) {
$pm->wakeup();
});
$serv->on('Message', function (swoole_websocket_server $serv, swoole_websocket_frame $frame) {
if ($frame->data == 'shutdown') {
$serv->disconnect($frame->fd, 4000, 'shutdown received');
}
});
$serv->on('connect', function ($s, $id) use ($pm) {
puts("connect ".$id);
});
$serv->on('disconnect', function ($s, $id) use ($pm) {
puts("disconnect ".$id);
});
$serv->on('open', function ($s, $req) use ($pm) {
puts("open ".$req->fd);
});
$serv->on('close', function ($s, $id) use ($pm) {
puts("close ".$id);
});
$serv->start();
};
$pm->childFirst();
$pm->run();
?>
--EXPECT--
connect 1
disconnect 1
connect 2
open 2
close 2
done!

0 comments on commit 7ca38ff

Please sign in to comment.