Skip to content

Commit

Permalink
Refactor Server::close() with reset (#4408)
Browse files Browse the repository at this point in the history
* Refactor Server::close() with reset

* remove debug code
  • Loading branch information
matyhtf authored Sep 17, 2021
1 parent 86708d6 commit 6d2b2e3
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 59 deletions.
67 changes: 33 additions & 34 deletions src/server/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,45 +122,44 @@ bool BaseFactory::end(SessionId session_id, int flags) {
if (flags & Server::CLOSE_ACTIVELY) {
conn->close_actively = 1;
}
if (conn->close_force) {
goto _do_close;
} else if (conn->closing) {
swoole_warning("session#%ld is closing", session_id);

if (conn->closing) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSING, "session#%ld is closing", session_id);
return false;
} else if (conn->closed) {
} else if (!(conn->close_force || conn->close_reset) && conn->closed) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSED, "session#%ld is closed", session_id);
return false;
} else {
_do_close:
conn->closing = 1;
if (server_->onClose != nullptr) {
DataHead info{};
info.fd = session_id;
if (conn->close_actively) {
info.reactor_id = -1;
} else {
info.reactor_id = conn->reactor_id;
}
info.server_fd = conn->server_fd;
server_->onClose(server_, &info);
}
conn->closing = 0;
conn->closed = 1;
conn->close_errno = 0;

if (conn->socket == nullptr) {
swoole_warning("session#%ld->socket is nullptr", session_id);
return false;
}
}

if (Buffer::empty(conn->socket->out_buffer) || conn->peer_closed || conn->close_force) {
Reactor *reactor = SwooleTG.reactor;
return Server::close_connection(reactor, conn->socket) == SW_OK;
conn->closing = 1;
if (server_->onClose != nullptr && !conn->closed) {
DataHead info{};
info.fd = session_id;
if (conn->close_actively) {
info.reactor_id = -1;
} else {
BufferChunk *chunk = conn->socket->out_buffer->alloc(BufferChunk::TYPE_CLOSE, 0);
chunk->value.data.val1 = _send.info.type;
conn->close_queued = 1;
return true;
info.reactor_id = conn->reactor_id;
}
info.server_fd = conn->server_fd;
server_->onClose(server_, &info);
}
conn->closing = 0;
conn->closed = 1;
conn->close_errno = 0;

if (conn->socket == nullptr) {
swoole_warning("session#%ld->socket is nullptr", session_id);
return false;
}

if (Buffer::empty(conn->socket->out_buffer) || (conn->close_reset || conn->peer_closed || conn->close_force)) {
Reactor *reactor = SwooleTG.reactor;
return Server::close_connection(reactor, conn->socket) == SW_OK;
} else {
BufferChunk *chunk = conn->socket->out_buffer->alloc(BufferChunk::TYPE_CLOSE, 0);
chunk->value.data.val1 = _send.info.type;
conn->close_queued = 1;
return true;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1515,7 +1515,7 @@ void Server::call_hook(HookType type, void *arg) {
* [Worker]
*/
bool Server::close(SessionId session_id, bool reset) {
return factory->end(session_id, reset ? CLOSE_ACTIVELY | CLOSE_RESET : CLOSE_ACTIVELY);
return factory->end(session_id, reset ? (CLOSE_ACTIVELY | CLOSE_RESET) : CLOSE_ACTIVELY);
}

void Server::init_signal_handler() {
Expand Down
43 changes: 19 additions & 24 deletions src/server/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,12 @@ bool ProcessFactory::end(SessionId session_id, int flags) {

Connection *conn = server_->get_connection_verify_no_ssl(session_id);
if (!conn) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "session[%ld] is closed", session_id);
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "session#%ld is closed", session_id);
return false;
}
// Reset send buffer, Immediately close the connection.
if (flags & Server::CLOSE_RESET) {
swoole_warning("close session=%ld, force", session_id);
conn->close_reset = 1;
}
// Server is initiative to close the connection
Expand Down Expand Up @@ -301,34 +302,28 @@ bool ProcessFactory::end(SessionId session_id, int flags) {
}

_close:
if (conn == nullptr || conn->active == 0) {
swoole_set_last_error(SW_ERROR_SESSION_NOT_EXIST);
return false;
} else if (conn->close_force) {
goto _do_close;
} else if (conn->closing) {
if (conn->closing) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSING, "session#%ld is closing", session_id);
return false;
} else if (conn->closed) {
} else if (!(conn->close_force || conn->close_reset) && conn->closed) {
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_CLOSED, "session#%ld is closed", session_id);
return false;
} else {
_do_close:
conn->closing = 1;
if (server_->onClose != nullptr) {
info.fd = session_id;
if (conn->close_actively) {
info.reactor_id = -1;
} else {
info.reactor_id = conn->reactor_id;
}
info.server_fd = conn->server_fd;
server_->onClose(server_, &info);
}

if (server_->onClose != nullptr && !conn->closed) {
info.fd = session_id;
if (conn->close_actively) {
info.reactor_id = -1;
} else {
info.reactor_id = conn->reactor_id;
}
info.server_fd = conn->server_fd;
conn->closing = 1;
server_->onClose(server_, &info);
conn->closing = 0;
conn->closed = 1;
conn->close_errno = 0;
return finish(&_send);
}
conn->closed = 1;
conn->close_errno = 0;
return finish(&_send);
}

} // namespace swoole
85 changes: 85 additions & 0 deletions tests/swoole_server/close_reset.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
--TEST--
swoole_server: close with reset
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use Swoole\Server;
use Swoole\Constant;
use Swoole\Timer;
use Swoole\Coroutine\Client;

$pm = new SwooleTest\ProcessManager;

const N = 4 * 1024 * 1024;

$pm->parentFunc = function ($pid) use ($pm) {
go(function () use ($pm) {
$client = new Client(SWOOLE_SOCK_TCP);
$client->set(['socket_buffer_size' => 128 * 1024]);
if (!$client->connect('127.0.0.1', $pm->getFreePort())) {
exit("connect failed\n");
}
$client->send("close");
Co::sleep(1);
$data = '';

while (true) {
$ret = $client->recv();
if (empty($ret)) {
break;
}
$data .= $ret;
if (substr($ret, -2, 2) == "\r\n") {
break;
}
}
Assert::lessThan(strlen($data), N);
echo "DONE\n";
});
Swoole\Event::wait();
$pm->kill();
};

$pm->childFunc = function () use ($pm) {
$serv = new Server('127.0.0.1', $pm->getFreePort(), SERVER_MODE_RANDOM);
$serv->set([
'worker_num' => 1,
'log_file' => TEST_LOG_FILE,
'kernel_socket_send_buffer_size' => 128 * 1024,
'socket_buffer_size' => 8 * 1024 * 1024,
]);
$serv->on("workerStart", function ($serv) use ($pm) {
$pm->wakeup();
});
$serv->on('receive', function (Server $serv, $fd, $reactor_id, $data) {
$serv->send($fd, str_repeat('A', N) . "\r\n");
Assert::eq($serv->stats()['connection_num'], 1);
phpt_var_dump("close[0]");
Assert::true($serv->close($fd));
usleep(50000);
phpt_var_dump("close[1]");
Assert::false($serv->close($fd));
Assert::eq(swoole_last_error(), SWOOLE_ERROR_SESSION_CLOSED);
Assert::eq($serv->stats()['connection_num'], 1);
Timer::after(100, function () use ($fd, $serv) {
phpt_var_dump("close[2]");
$serv->close($fd, true);
usleep(50000);
Assert::eq($serv->stats()['connection_num'], 0);
});
});
$serv->on(Constant::EVENT_CLOSE, function (Server $serv, $fd, $reactor_id) {
});
$serv->start();
};

$pm->childFirst();
$pm->run();
?>
--EXPECT--
DONE

0 comments on commit 6d2b2e3

Please sign in to comment.