Skip to content

Commit

Permalink
Optimize code
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Sep 13, 2024
1 parent 51ead69 commit 93dbf3b
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 149 deletions.
15 changes: 9 additions & 6 deletions core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,8 +545,8 @@ TEST(server, task_worker) {
exit(2);
}

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(serv->get_task_count(), 1);
serv.onTask = [](Server *serv, EventData *task) -> int {
EXPECT_EQ(serv->get_tasking_num(), 1);
EXPECT_EQ(string(task->data, task->info.len), string(packet));
serv->gs->task_workers.running = 0;
return 0;
Expand All @@ -556,13 +556,14 @@ TEST(server, task_worker) {
ASSERT_EQ(serv.create_task_workers(), SW_OK);

thread t1([&serv]() {
SwooleWG.run_always = true;
serv.gs->task_workers.running = 1;
serv.gs->tasking_num++;
serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]);
serv.gs->tasking_num--;
EXPECT_EQ(serv.get_task_count(), 0);
EXPECT_EQ(serv.get_tasking_num(), 0);
serv.gs->tasking_num--;
EXPECT_EQ(serv.get_task_count(), 0);
EXPECT_EQ(serv.get_tasking_num(), 0);
EXPECT_EQ(serv.get_idle_task_worker_num(), serv.task_worker_num);
});

Expand All @@ -581,6 +582,8 @@ TEST(server, task_worker) {

t1.join();
serv.gs->task_workers.destroy();

ASSERT_EQ(serv.gs->)
}

// PHP_METHOD(swoole_server, task)
Expand Down Expand Up @@ -724,7 +727,7 @@ TEST(server, task_worker4) {
serv->gs->task_workers.dispatch(&buf, &_dst_worker_id);
sleep(1);

EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
EventData *task_result = serv->get_task_result();
sw_memset_zero(task_result, sizeof(*task_result));
memset(&buf.info, 0, sizeof(buf.info));
buf.info.len = strlen(packet);
Expand Down Expand Up @@ -779,7 +782,7 @@ TEST(server, task_worker5) {
if (worker->id == 1) {
int _dst_worker_id = 0;

EventData *task_result = &(serv->task_result[worker->id]);
EventData *task_result = &(serv->task_results[worker->id]);
sw_memset_zero(task_result, sizeof(*task_result));

File fp = make_tmpfile();
Expand Down
2 changes: 1 addition & 1 deletion ext-src/php_swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct ServerObject {

struct TaskCo {
Coroutine *co;
int *list;
TaskId *list;
uint32_t count;
zval *result;
};
Expand Down
128 changes: 37 additions & 91 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ static void php_swoole_server_onManagerStart(Server *serv);
static void php_swoole_server_onManagerStop(Server *serv);

static int php_swoole_server_task_finish(Server *serv, zval *zdata, EventData *current_task);
static TaskId php_swoole_server_task_pack(EventData *task, zval *data);
static TaskId php_swoole_server_task_pack(zval *data, EventData *task);
static bool php_swoole_server_task_unpack(zval *zresult, EventData *task_result);
static int php_swoole_server_dispatch_func(Server *serv, Connection *conn, SendData *data);
static zval *php_swoole_server_add_port(ServerObject *server_object, ListenPort *port);
Expand Down Expand Up @@ -655,7 +655,7 @@ int php_swoole_create_dir(const char *path, size_t length) {
return php_stream_mkdir(path, 0777, PHP_STREAM_MKDIR_RECURSIVE | REPORT_ERRORS, nullptr) ? 0 : -1;
}

static TaskId php_swoole_server_task_pack(EventData *task, zval *zdata) {
static TaskId php_swoole_server_task_pack(zval *zdata, EventData *task) {
smart_str serialized_data = {};
php_serialize_data_t var_hash;

Expand Down Expand Up @@ -2973,7 +2973,8 @@ static PHP_METHOD(swoole_server, stats) {

if (serv->task_worker_num > 0) {
add_assoc_long_ex(return_value, ZEND_STRL("task_idle_worker_num"), serv->get_idle_task_worker_num());
add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), serv->get_task_count());
add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), serv->get_tasking_num());
add_assoc_long_ex(return_value, ZEND_STRL("task_count"), serv->gs->task_count);
}

add_assoc_long_ex(return_value, ZEND_STRL("coroutine_num"), Coroutine::count());
Expand Down Expand Up @@ -3056,9 +3057,6 @@ static PHP_METHOD(swoole_server, taskwait) {
RETURN_FALSE;
}

EventData buf;
memset(&buf.info, 0, sizeof(buf.info));

zval *zdata;
double timeout = SW_TASKWAIT_TIMEOUT;
zend_long dst_worker_id = -1;
Expand All @@ -3074,11 +3072,11 @@ static PHP_METHOD(swoole_server, taskwait) {
RETURN_FALSE;
}

if (php_swoole_server_task_pack(&buf, zdata) < 0) {
EventData buf;
if (php_swoole_server_task_pack(zdata, &buf) < 0) {
RETURN_FALSE;
}

int _dst_worker_id = (int) dst_worker_id;
TaskId task_id = serv->get_task_id(&buf);

// coroutine
Expand All @@ -3091,9 +3089,7 @@ static PHP_METHOD(swoole_server, taskwait) {
task_co.count = 1;
task_co.result = return_value;

sw_atomic_fetch_add(&serv->gs->tasking_num, 1);
if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) < 0) {
sw_atomic_fetch_sub(&serv->gs->tasking_num, 1);
if (!serv->task(&buf, (int *) &dst_worker_id)) {
RETURN_FALSE;
}

Expand All @@ -3104,49 +3100,19 @@ static PHP_METHOD(swoole_server, taskwait) {
if (!retval) {
RETURN_FALSE;
}
return;
}

uint64_t notify;
EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
sw_memset_zero(task_result, sizeof(*task_result));
Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get();
network::Socket *task_notify_socket = pipe->get_socket(false);

// clear history task
while (task_notify_socket->wait_event(0, SW_EVENT_READ) == SW_OK) {
if (task_notify_socket->read(&notify, sizeof(notify)) <= 0) {
break;
} else {
auto retval = serv->task_sync(&buf, (int *) &dst_worker_id, timeout);
if (!retval) {
RETURN_FALSE;
}
}

sw_atomic_fetch_add(&serv->gs->tasking_num, 1);

if (serv->gs->task_workers.dispatch_blocking(&buf, &_dst_worker_id) == SW_OK) {
while (1) {
if (task_notify_socket->wait_event((int) (timeout * 1000), SW_EVENT_READ) != SW_OK) {
break;
}
if (pipe->read(&notify, sizeof(notify)) > 0) {
if (serv->get_task_id(task_result) != task_id) {
continue;
}
zval zresult;
if (!php_swoole_server_task_unpack(&zresult, task_result)) {
RETURN_FALSE;
} else {
RETURN_ZVAL(&zresult, 0, 0);
}
break;
} else {
php_swoole_sys_error(E_WARNING, "taskwait failed");
break;
}
zval zresult;
auto task_result = serv->get_task_result();
if (!php_swoole_server_task_unpack(&zresult, task_result)) {
RETURN_FALSE;
} else {
RETURN_ZVAL(&zresult, 0, 0);
}
} else {
sw_atomic_fetch_sub(&serv->gs->tasking_num, 1);
}
RETURN_FALSE;
}

static PHP_METHOD(swoole_server, taskWaitMulti) {
Expand All @@ -3164,9 +3130,6 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {
RETURN_FALSE;
}

EventData buf;
memset(&buf.info, 0, sizeof(buf.info));

zval *ztasks;
double timeout = SW_TASKWAIT_TIMEOUT;

Expand All @@ -3187,10 +3150,10 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {
RETURN_FALSE;
}

int list_of_id[SW_MAX_CONCURRENT_TASK] = {};
TaskId list_of_id[SW_MAX_CONCURRENT_TASK] = {};

uint64_t notify;
EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
EventData *task_result = serv->get_task_result();
sw_memset_zero(task_result, sizeof(*task_result));
Pipe *pipe = serv->task_notify_pipes.at(swoole_get_process_id()).get();
Worker *worker = serv->get_worker(swoole_get_process_id());
Expand Down Expand Up @@ -3219,7 +3182,9 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {

zval *ztask;
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask)
TaskId task_id = php_swoole_server_task_pack(&buf, ztask);

EventData buf;
TaskId task_id = php_swoole_server_task_pack(ztask, &buf);
if (task_id < 0) {
php_swoole_fatal_error(E_WARNING, "task pack failed");
goto _fail;
Expand Down Expand Up @@ -3280,7 +3245,7 @@ static PHP_METHOD(swoole_server, taskWaitMulti) {
}
(void) add_index_zval(return_value, j, &zresult);
_next:
content->offset += sizeof(DataHead) + result->info.len;
content->offset += result->size();
} while (content->offset < 0 || (size_t) content->offset < content->length);
// delete tmp file
unlink(file_path.c_str());
Expand Down Expand Up @@ -3313,9 +3278,6 @@ static PHP_METHOD(swoole_server, taskCo) {
int i = 0;
uint32_t n_task = php_swoole_array_length(ztasks);

EventData buf;
memset(&buf.info, 0, sizeof(buf.info));

if (n_task >= SW_MAX_CONCURRENT_TASK) {
php_swoole_fatal_error(E_WARNING, "too many concurrent tasks");
RETURN_FALSE;
Expand All @@ -3325,7 +3287,7 @@ static PHP_METHOD(swoole_server, taskCo) {
RETURN_FALSE;
}

int *list = (int *) ecalloc(n_task, sizeof(int));
TaskId *list = (TaskId *) ecalloc(n_task, sizeof(TaskId));
if (list == nullptr) {
RETURN_FALSE;
}
Expand All @@ -3337,20 +3299,19 @@ static PHP_METHOD(swoole_server, taskCo) {

zval *ztask;
SW_HASHTABLE_FOREACH_START(Z_ARRVAL_P(ztasks), ztask) {
task_id = php_swoole_server_task_pack(&buf, ztask);
EventData buf;
task_id = php_swoole_server_task_pack(ztask, &buf);
if (task_id < 0) {
php_swoole_fatal_error(E_WARNING, "failed to pack task");
goto _fail;
}
buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_COROUTINE);
dst_worker_id = -1;
sw_atomic_fetch_add(&serv->gs->tasking_num, 1);
if (serv->gs->task_workers.dispatch(&buf, &dst_worker_id) < 0) {
if (!serv->task(&buf, &dst_worker_id)) {
task_id = -1;
_fail:
add_index_bool(return_value, i, 0);
n_task--;
sw_atomic_fetch_sub(&serv->gs->tasking_num, 1);
} else {
server_object->property->task_coroutine_map[task_id] = &task_co;
}
Expand Down Expand Up @@ -3406,14 +3367,11 @@ static PHP_METHOD(swoole_server, task) {
}

EventData buf;
memset(&buf.info, 0, sizeof(buf.info));

if (php_swoole_server_task_pack(&buf, zdata) < 0) {
TaskId task_id = php_swoole_server_task_pack(zdata, &buf);
if (task_id < 0) {
RETURN_FALSE;
}

TaskId task_id = serv->get_task_id(&buf);

if (!serv->is_worker()) {
buf.info.ext_flags |= SW_TASK_NOREPLY;
} else if (fci.size) {
Expand All @@ -3424,15 +3382,11 @@ static PHP_METHOD(swoole_server, task) {

buf.info.ext_flags |= SW_TASK_NONBLOCK;

int _dst_worker_id = (int) dst_worker_id;
sw_atomic_fetch_add(&serv->gs->tasking_num, 1);

if (serv->gs->task_workers.dispatch(&buf, &_dst_worker_id) >= 0) {
if (serv->task(&buf, (int *) &dst_worker_id)) {
RETURN_LONG(task_id);
} else {
RETURN_FALSE;
}

sw_atomic_fetch_sub(&serv->gs->tasking_num, 1);
RETURN_FALSE;
}

static PHP_METHOD(swoole_server, command) {
Expand Down Expand Up @@ -3523,17 +3477,11 @@ static PHP_METHOD(swoole_server, sendMessage) {
}

EventData buf;
memset(&buf.info, 0, sizeof(buf.info));

if (php_swoole_server_task_pack(&buf, zmessage) < 0) {
if (php_swoole_server_task_pack(zmessage, &buf) < 0) {
RETURN_FALSE;
}

buf.info.type = SW_SERVER_EVENT_PIPE_MESSAGE;

Worker *to_worker = serv->get_worker(worker_id);
SW_CHECK_RETURN(serv->send_to_worker_from_worker(
to_worker, &buf, sizeof(buf.info) + buf.info.len, SW_PIPE_MASTER | SW_PIPE_NONBLOCK));
RETURN_BOOL(serv->send_pipe_message(worker_id, &buf));
}

static PHP_METHOD(swoole_server, finish) {
Expand Down Expand Up @@ -3576,21 +3524,19 @@ static PHP_METHOD(swoole_server_task, finish) {
}

static PHP_METHOD(swoole_server_task, pack) {
EventData buf;
memset(&buf.info, 0, sizeof(buf.info));

zval *zdata;

ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_ZVAL(zdata)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

if (php_swoole_server_task_pack(&buf, zdata) < 0) {
EventData buf;
if (php_swoole_server_task_pack(zdata, &buf) < 0) {
RETURN_FALSE;
}
buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_NOREPLY);

RETURN_STRINGL((char *) &buf, sizeof(buf.info) + buf.info.len);
RETURN_STRINGL((char *) &buf, buf.size());
}

static PHP_METHOD(swoole_server_task, unpack) {
Expand Down
8 changes: 8 additions & 0 deletions include/swoole.h
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,14 @@ struct DataHead {
struct EventData {
DataHead info;
char data[SW_IPC_BUFFER_SIZE];

uint32_t size() {
return sizeof(info) + len();
}

uint32_t len() {
return info.len;
}
};

struct SendData {
Expand Down
1 change: 0 additions & 1 deletion include/swoole_process_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ struct Worker {
uint8_t msgqueue_mode;
uint8_t child_process;

sw_atomic_t tasking_num;
uint32_t concurrency;
time_t start_time;

Expand Down
Loading

0 comments on commit 93dbf3b

Please sign in to comment.