Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[6.0]Server for thread mode #5281

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ext-src/swoole_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace WebSocket = swoole::websocket;
zend_class_entry *swoole_http_server_ce;
zend_object_handlers swoole_http_server_handlers;

static std::queue<HttpContext *> queued_http_contexts;
static std::unordered_map<SessionId, zend::Variable> client_ips;
static SW_THREAD_LOCAL std::queue<HttpContext *> queued_http_contexts;
static SW_THREAD_LOCAL std::unordered_map<SessionId, zend::Variable> client_ips;

static bool http_context_send_data(HttpContext *ctx, const char *data, size_t length);
static bool http_context_sendfile(HttpContext *ctx, const char *file, uint32_t l_file, off_t offset, size_t length);
Expand Down
9 changes: 8 additions & 1 deletion ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1868,11 +1868,14 @@ static PHP_METHOD(swoole_server, __construct) {
Z_PARAM_LONG(serv_mode)
Z_PARAM_LONG(sock_type)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

#ifndef SW_THREAD
if (serv_mode != Server::MODE_BASE && serv_mode != Server::MODE_PROCESS) {
zend_throw_error(NULL, "invalid $mode parameters %d", (int) serv_mode);
RETURN_FALSE;
}
#else
serv_mode = Server::MODE_THREAD;
#endif

serv = new Server((enum Server::Mode) serv_mode);
serv->private_data_2 = sw_zval_dup(zserv);
Expand Down Expand Up @@ -2577,6 +2580,10 @@ static PHP_METHOD(swoole_server, start) {
server_object->register_callback();
server_object->on_before_start();

#ifdef SW_THREAD
serv->worker_num = 1;
#endif

if (serv->start() < 0) {
php_swoole_fatal_error(E_ERROR, "failed to start server. Error: %s", sw_error);
}
Expand Down
8 changes: 8 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ typedef unsigned long ulong_t;
#endif
#define SW_START_SLEEP usleep(100000) // sleep 1s,wait fork and pthread_create

#ifdef SW_THREAD
#define SW_THREAD_LOCAL thread_local
#include "swoole_lock.h"
extern swoole::Mutex thread_lock;
#else
#define SW_THREAD_LOCAL
#endif

/*-----------------------------------Memory------------------------------------*/
void *sw_malloc(size_t size);
void sw_free(void *ptr);
Expand Down
6 changes: 0 additions & 6 deletions include/swoole_coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ typedef std::chrono::microseconds seconds_type;
#define CALC_EXECUTE_USEC(yield_coroutine, resume_coroutine)
#endif

#ifdef SW_THREAD
#define SW_THREAD_LOCAL thread_local
#else
#define SW_THREAD_LOCAL
#endif

namespace swoole {
class Coroutine {
public:
Expand Down
2 changes: 1 addition & 1 deletion include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,5 +325,5 @@ static sw_inline int swoole_kill(pid_t __pid, int __sig) {
return kill(__pid, __sig);
}

extern swoole::WorkerGlobal SwooleWG; // Worker Global Variable
extern SW_THREAD_LOCAL swoole::WorkerGlobal SwooleWG; // Worker Global Variable
typedef swoole::ProtocolType swProtocolType;
18 changes: 17 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include "swoole_pipe.h"
#include "swoole_channel.h"
#include "swoole_message_bus.h"
#ifdef SW_THREAD
#include "swoole_lock.h"
#endif

#ifdef SW_USE_OPENSSL
#include "swoole_dtls.h"
Expand Down Expand Up @@ -488,6 +491,9 @@ class Server {
enum Mode {
MODE_BASE = 1,
MODE_PROCESS = 2,
#ifdef SW_THREAD
MODE_THREAD = 3,
#endif
};

enum TaskIpcMode {
Expand Down Expand Up @@ -1000,9 +1006,19 @@ class Server {
}

bool is_base_mode() {
#ifndef SW_THREAD
return mode_ == MODE_BASE;
#else
return mode_ == MODE_BASE || mode_ == MODE_THREAD;
#endif
}

#ifdef SW_THREAD
bool is_thread_mode() {
return mode_ == MODE_THREAD;
}
#endif

bool is_enable_coroutine() {
if (is_task_worker()) {
return task_enable_coroutine;
Expand Down Expand Up @@ -1451,7 +1467,7 @@ typedef swoole::Server swServer;
typedef swoole::ListenPort swListenPort;
typedef swoole::RecvData swRecvData;

extern swoole::Server *g_server_instance;
extern SW_THREAD_LOCAL swoole::Server *g_server_instance;

static inline swoole::Server *sw_server() {
return g_server_instance;
Expand Down
19 changes: 18 additions & 1 deletion src/core/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ swoole::Global SwooleG = {};
__thread swoole::ThreadGlobal SwooleTG = {};

static std::unordered_map<std::string, void *> functions;
static swoole::Logger *g_logger_instance = nullptr;
static SW_THREAD_LOCAL swoole::Logger *g_logger_instance = nullptr;

#ifdef __MACH__
static __thread char _sw_error_buf[SW_ERROR_MSG_SIZE];
Expand Down Expand Up @@ -152,7 +152,20 @@ static void bug_report_message_init() {
}

void swoole_init(void) {
#ifdef SW_THREAD
thread_lock.lock();
#endif
if (SwooleG.init) {
#ifdef SW_THREAD
thread_lock.unlock();
SwooleTG.buffer_stack = new swoole::String(SW_STACK_BUFFER_SIZE);
g_logger_instance = new swoole::Logger;
#ifdef SW_DEBUG
sw_logger()->set_level(0);
#else
sw_logger()->set_level(SW_LOG_INFO);
#endif
#endif
return;
}

Expand Down Expand Up @@ -211,6 +224,10 @@ void swoole_init(void) {

// init bug report message
bug_report_message_init();

#ifdef SW_THREAD
thread_lock.unlock();
#endif
}

SW_EXTERN_C_BEGIN
Expand Down
85 changes: 60 additions & 25 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include "swoole_server.h"
#include "swoole_memory.h"
#ifndef SW_THREAD
#include "swoole_lock.h"
#endif
#include "swoole_util.h"

#include <assert.h>
Expand All @@ -25,7 +27,11 @@
using swoole::network::SendfileTask;
using swoole::network::Socket;

swoole::Server *g_server_instance = nullptr;
SW_THREAD_LOCAL swoole::Server *g_server_instance = nullptr;
#ifdef SW_THREAD
swoole::Mutex thread_lock(0);
static std::vector<swListenPort *> listen_ports;
#endif

namespace swoole {

Expand Down Expand Up @@ -696,10 +702,10 @@
file_put_contents(pid_file, sw_tg_buffer()->str, n);
}
int ret;
if (is_base_mode()) {
ret = start_reactor_processes();
} else {
if (is_process_mode()) {
ret = start_reactor_threads();
} else {
ret = start_reactor_processes();
}
// failed to start
if (ret < 0) {
Expand Down Expand Up @@ -851,12 +857,12 @@
}

int retval;
if (is_base_mode()) {
factory = new BaseFactory(this);
retval = create_reactor_processes();
} else {
if (is_process_mode()) {
factory = new ProcessFactory(this);
retval = create_reactor_threads();
} else {
factory = new BaseFactory(this);
retval = create_reactor_processes();
}

#ifdef HAVE_PTHREAD_BARRIER
Expand Down Expand Up @@ -978,7 +984,7 @@
join_reactor_thread();
}

release_pipe_buffers();
release_pipe_buffers();

for (auto port : ports) {
port->close();
Expand Down Expand Up @@ -1515,7 +1521,7 @@
"sendfile name[%.8s...] length %u is exceed the max name len %u",
file,
l_file,
(uint32_t)(SW_IPC_BUFFER_SIZE - sizeof(SendfileTask) - 1));
(uint32_t) (SW_IPC_BUFFER_SIZE - sizeof(SendfileTask) - 1));
return false;
}
// string must be zero termination (for `state` system call)
Expand Down Expand Up @@ -1769,7 +1775,7 @@

#ifdef SW_USE_OPENSSL
if (type & SW_SOCK_SSL) {
type = (SocketType)(type & (~SW_SOCK_SSL));
type = (SocketType) (type & (~SW_SOCK_SSL));
ls->type = type;
ls->ssl = 1;
ls->ssl_context = new SSLContext();
Expand All @@ -1793,24 +1799,53 @@
}
#endif

ls->socket = make_socket(
ls->type, ls->is_dgram() ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER, SW_SOCK_CLOEXEC | SW_SOCK_NONBLOCK);
if (ls->socket == nullptr) {
swoole_set_last_error(errno);
return nullptr;
#ifdef SW_THREAD
thread_lock.lock();
if (listen_ports.size() > 0) {
for (auto listen_port : listen_ports) {
if (listen_port->type == type && listen_port->port == port && listen_port->host == host) {
ls->socket = listen_port->socket;
thread_lock.unlock();
break;
}
}
}

if (ls->socket == nullptr) {
#endif
ls->socket = make_socket(
ls->type, ls->is_dgram() ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER, SW_SOCK_CLOEXEC | SW_SOCK_NONBLOCK);
if (ls->socket == nullptr) {
swoole_set_last_error(errno);

Check warning on line 1819 in src/server/master.cc

View check run for this annotation

Codecov / codecov/patch

src/server/master.cc#L1819

Added line #L1819 was not covered by tests
#ifdef SW_THREAD
thread_lock.unlock();
#endif
return nullptr;

Check warning on line 1823 in src/server/master.cc

View check run for this annotation

Codecov / codecov/patch

src/server/master.cc#L1823

Added line #L1823 was not covered by tests
}

#if defined(SW_SUPPORT_DTLS) && defined(HAVE_KQUEUE)
if (ls->is_dtls()) {
ls->socket->set_reuse_port();
}
if (ls->is_dtls()) {
ls->socket->set_reuse_port();
}
#endif

if (ls->socket->bind(ls->host, &ls->port) < 0) {
swoole_set_last_error(errno);
ls->socket->free();
return nullptr;
if (ls->socket->bind(ls->host, &ls->port) < 0) {
swoole_set_last_error(errno);
ls->socket->free();

Check warning on line 1834 in src/server/master.cc

View check run for this annotation

Codecov / codecov/patch

src/server/master.cc#L1833-L1834

Added lines #L1833 - L1834 were not covered by tests
#ifdef SW_THREAD
thread_lock.unlock();
#endif
return nullptr;

Check warning on line 1838 in src/server/master.cc

View check run for this annotation

Codecov / codecov/patch

src/server/master.cc#L1838

Added line #L1838 was not covered by tests
}

ls->socket->info.assign(ls->type, ls->host, ls->port);

#ifdef SW_THREAD
listen_ports.push_back(ls);
thread_lock.unlock();
}
ls->socket->info.assign(ls->type, ls->host, ls->port);
#endif

check_port_type(ls);
ptr.release();
ports.push_back(ls);
Expand Down Expand Up @@ -2016,7 +2051,7 @@
}

void Server::release_pipe_buffers() {
message_bus.free_buffer();
message_bus.free_buffer();
}

int Server::get_idle_worker_num() {
Expand Down
56 changes: 38 additions & 18 deletions src/server/reactor_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
#endif

static bool Server_is_single(Server *serv) {
return serv->worker_num == 1 && serv->task_worker_num == 0 && serv->max_request == 0 &&
serv->user_worker_list.empty();
return
#ifdef SW_THREAD
serv->is_thread_mode() ||
#endif
(serv->worker_num == 1 && serv->task_worker_num == 0 && serv->max_request == 0 &&
serv->user_worker_list.empty());
}

int Server::create_reactor_processes() {
Expand All @@ -48,32 +52,48 @@
sw_free(connection_list);
}

#ifdef SW_THREAD
static bool one_off_task = false;
#endif

int Server::start_reactor_processes() {
single_thread = 1;

// listen TCP
if (have_stream_sock == 1) {
for (auto ls : ports) {
if (ls->is_dgram()) {
continue;
}
#ifdef HAVE_REUSEPORT
if (enable_reuse_port) {
if (::close(ls->socket->fd) < 0) {
swoole_sys_warning("close(%d) failed", ls->socket->fd);
#ifdef SW_THREAD
thread_lock.lock();
if (!one_off_task) {
#endif
for (auto ls : ports) {
if (ls->is_dgram()) {
continue;

Check warning on line 70 in src/server/reactor_process.cc

View check run for this annotation

Codecov / codecov/patch

src/server/reactor_process.cc#L70

Added line #L70 was not covered by tests
}
delete ls->socket;
ls->socket = nullptr;
continue;
} else
#ifdef HAVE_REUSEPORT
if (enable_reuse_port) {
if (::close(ls->socket->fd) < 0) {
swoole_sys_warning("close(%d) failed", ls->socket->fd);

Check warning on line 75 in src/server/reactor_process.cc

View check run for this annotation

Codecov / codecov/patch

src/server/reactor_process.cc#L75

Added line #L75 was not covered by tests
}
delete ls->socket;
ls->socket = nullptr;
continue;
} else
#endif
{
// listen server socket
if (ls->listen() < 0) {
return SW_ERR;
{
// listen server socket
if (ls->listen() < 0) {
#ifdef SW_THREAD
thread_lock.unlock();
#endif
return SW_ERR;

Check warning on line 88 in src/server/reactor_process.cc

View check run for this annotation

Codecov / codecov/patch

src/server/reactor_process.cc#L88

Added line #L88 was not covered by tests
}
}
}
#ifdef SW_THREAD
one_off_task = true;
}
thread_lock.unlock();
#endif
}

ProcessPool *pool = &gs->event_workers;
Expand Down
Loading
Loading