Skip to content

Commit

Permalink
Improve stats (#4410)
Browse files Browse the repository at this point in the history
* Improve stats

* Fix tests

* add tests

* fix tests
  • Loading branch information
matyhtf authored Sep 22, 2021
1 parent 6d2b2e3 commit 808f0bd
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 23 deletions.
45 changes: 44 additions & 1 deletion core-tests/src/os/async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

#include <atomic>

using swoole::AsyncEvent;
using namespace swoole;

static int callback_count;

Expand All @@ -48,3 +48,46 @@ TEST(async, dispatch) {
ASSERT_EQ(handle_count, 1000);
ASSERT_EQ(callback_count, 1000);
}

TEST(async, schedule) {
callback_count = 0;
std::atomic<int> handle_count(0);

int N = 1000;

swoole_event_init(SW_EVENTLOOP_WAIT_EXIT);

AsyncEvent event{};
event.object = &handle_count;
event.callback = [](AsyncEvent *event) { callback_count++; };
event.handler = [](AsyncEvent *event) {
usleep(swoole_rand(50000, 100000));
(*(std::atomic<int> *) event->object)++;
};

SwooleG.aio_core_worker_num = 4;
SwooleG.aio_worker_num = 128;
SwooleG.aio_max_wait_time = 0.05;
SwooleG.aio_max_idle_time = 0.5;

int count = N;
swoole_timer_tick(2, [&count, &event, N](Timer *, TimerNode *timer) {
SW_LOOP_N(swoole_rand(5, 15)) {
auto ret = swoole::async::dispatch(&event);
EXPECT_EQ(ret->object, event.object);
count--;
if (count == 0) {
swoole_timer_del(timer);
ASSERT_EQ(SwooleTG.async_threads->get_worker_num(), 128);
ASSERT_GT(SwooleTG.async_threads->get_queue_size(), 100);
ASSERT_GT(SwooleTG.async_threads->get_task_num(), 100);
break;
}
}
});

swoole_event_wait();

ASSERT_EQ(handle_count, N);
ASSERT_EQ(callback_count, N);
}
6 changes: 4 additions & 2 deletions ext-src/swoole_coroutine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1011,11 +1011,13 @@ static PHP_METHOD(swoole_coroutine, stats) {
return_value, ZEND_STRL("signal_listener_num"), SwooleTG.signal_listener_num + SwooleTG.co_signal_listener_num);

if (SwooleTG.async_threads) {
add_assoc_long_ex(return_value, ZEND_STRL("aio_task_num"), SwooleTG.async_threads->task_num);
add_assoc_long_ex(return_value, ZEND_STRL("aio_worker_num"), SwooleTG.async_threads->thread_count());
add_assoc_long_ex(return_value, ZEND_STRL("aio_task_num"), SwooleTG.async_threads->get_task_num());
add_assoc_long_ex(return_value, ZEND_STRL("aio_worker_num"), SwooleTG.async_threads->get_worker_num());
add_assoc_long_ex(return_value, ZEND_STRL("aio_queue_size"), SwooleTG.async_threads->get_queue_size());
} else {
add_assoc_long_ex(return_value, ZEND_STRL("aio_task_num"), 0);
add_assoc_long_ex(return_value, ZEND_STRL("aio_worker_num"), 0);
add_assoc_long_ex(return_value, ZEND_STRL("aio_queue_size"), 0);
}
add_assoc_long_ex(return_value, ZEND_STRL("c_stack_size"), Coroutine::get_stack_size());
add_assoc_long_ex(return_value, ZEND_STRL("coroutine_num"), Coroutine::count());
Expand Down
5 changes: 5 additions & 0 deletions ext-src/swoole_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2961,6 +2961,10 @@ static PHP_METHOD(swoole_server, stats) {
add_assoc_long_ex(return_value, ZEND_STRL("tasking_num"), tasking_num);
add_assoc_long_ex(return_value, ZEND_STRL("request_count"), serv->gs->request_count);
add_assoc_long_ex(return_value, ZEND_STRL("dispatch_count"), serv->gs->dispatch_count);
add_assoc_long_ex(return_value, ZEND_STRL("pipe_packet_msg_id"), serv->gs->pipe_packet_msg_id);
add_assoc_long_ex(return_value, ZEND_STRL("session_round"), serv->gs->session_round);
add_assoc_long_ex(return_value, ZEND_STRL("min_fd"), serv->gs->min_fd);
add_assoc_long_ex(return_value, ZEND_STRL("max_fd"), serv->gs->max_fd);

if (SwooleWG.worker) {
add_assoc_long_ex(return_value, ZEND_STRL("worker_request_count"), SwooleWG.worker->request_count);
Expand All @@ -2982,6 +2986,7 @@ static PHP_METHOD(swoole_server, stats) {
}

add_assoc_long_ex(return_value, ZEND_STRL("coroutine_num"), Coroutine::count());
add_assoc_long_ex(return_value, ZEND_STRL("coroutine_peek_num"), Coroutine::get_peak_num());
}

static PHP_METHOD(swoole_server, reload) {
Expand Down
9 changes: 7 additions & 2 deletions include/swoole_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ struct AsyncEvent {
class AsyncThreads {
public:
bool schedule = false;
uint32_t task_num = 0;
size_t task_num = 0;
Pipe *pipe = nullptr;
async::ThreadPool *pool = nullptr;
network::Socket *read_socket = nullptr;
Expand All @@ -78,7 +78,12 @@ class AsyncThreads {
AsyncThreads();
~AsyncThreads();

size_t thread_count();
size_t get_task_num() {
return task_num;
}

size_t get_queue_size();
size_t get_worker_num();
void notify_one();

static int callback(Reactor *reactor, Event *event);
Expand Down
1 change: 1 addition & 0 deletions include/swoole_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class GlobalMemory : public MemoryPool {
void free(void *ptr);
void destroy();
size_t capacity();
size_t get_memory_size();
};
} // namespace swoole

Expand Down
5 changes: 4 additions & 1 deletion include/swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ class MessageBus {
return buffer_size_;
}

size_t get_memory_size();

bool alloc_buffer() {
void *_ptr = allocator_->malloc(sizeof(*buffer_) + buffer_size_);
if (_ptr) {
Expand Down Expand Up @@ -467,6 +469,8 @@ struct ListenPort {
int listen();
void close();
bool import(int sock);
const char *get_protocols();

#ifdef SW_USE_OPENSSL
bool ssl_create_context(SSLContext *context);
bool ssl_create(Connection *conn, network::Socket *sock);
Expand All @@ -476,7 +480,6 @@ struct ListenPort {
void ssl_set_key_file(const std::string &file) {
ssl_context->key_file = file;
}

void ssl_set_cert_file(const std::string &file) {
ssl_context->cert_file = file;
}
Expand Down
4 changes: 4 additions & 0 deletions src/memory/global_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ size_t GlobalMemory::capacity() {
return impl->pagesize - impl->alloc_offset;
}

size_t GlobalMemory::get_memory_size() {
return impl->pagesize * impl->pages.size();
}

GlobalMemory::~GlobalMemory() {
delete impl;
}
Expand Down
12 changes: 8 additions & 4 deletions src/os/async_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ class ThreadPool {
return _event_copy;
}

inline size_t worker_count() {
inline size_t get_worker_num() {
return threads.size();
}

inline size_t queue_count() {
inline size_t get_queue_size() {
std::unique_lock<std::mutex> lock(event_mutex);
return _queue.count();
}
Expand Down Expand Up @@ -361,8 +361,12 @@ int AsyncThreads::callback(Reactor *reactor, Event *event) {
return SW_OK;
}

size_t AsyncThreads::thread_count() {
return pool ? pool->worker_count() : 0;
size_t AsyncThreads::get_worker_num() {
return pool ? pool->get_worker_num() : 0;
}

size_t AsyncThreads::get_queue_size() {
return pool ? pool->get_queue_size() : 0;
}

void AsyncThreads::notify_one() {
Expand Down
8 changes: 8 additions & 0 deletions src/server/message_bus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,12 @@ bool MessageBus::write(Socket *sock, SendData *resp) {
return true;
}

size_t MessageBus::get_memory_size() {
size_t size = buffer_size_;
for (auto p : packet_pool_) {
size += p.second->size;
}
return size;
}

} // namespace swoole
46 changes: 37 additions & 9 deletions src/server/port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,10 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) {
if (!request->header_parsed) {
request->parse_header_info();
swoole_trace_log(SW_TRACE_SERVER,
"content-length=%u, keep-alive=%u, chunked=%u",
request->content_length_,
request->keep_alive,
request->chunked);
"content-length=%u, keep-alive=%u, chunked=%u",
request->content_length_,
request->keep_alive,
request->chunked);
}

// content length (equal to 0) or (field not found but not chunked)
Expand Down Expand Up @@ -614,11 +614,12 @@ static int Port_onRead_http(Reactor *reactor, ListenPort *port, Event *event) {
if (request->has_expect_header()) {
_socket->send(SW_STRL(SW_HTTP_100_CONTINUE_PACKET), 0);
} else {
swoole_trace_log(SW_TRACE_SERVER,
"PostWait: request->content_length=%d, buffer->length=%zu, request->header_length=%d\n",
request->content_length,
buffer_->length,
request->header_length);
swoole_trace_log(
SW_TRACE_SERVER,
"PostWait: request->content_length=%d, buffer->length=%zu, request->header_length=%d\n",
request->content_length,
buffer_->length,
request->header_length);
}
#endif
goto _recv_data;
Expand Down Expand Up @@ -723,4 +724,31 @@ void ListenPort::close() {
}
}

const char *ListenPort::get_protocols() {
if (open_eof_check) {
return "eof";
} else if (open_length_check) {
return "length";
} else if (open_http_protocol) {
#ifdef SW_USE_HTTP2
if (open_http2_protocol && open_websocket_protocol) {
return "http|http2|websocket";
} else if (open_http2_protocol) {
return "http|http2";
} else
#endif
if (open_websocket_protocol) {
return "http|websocket";
} else {
return "http";
}
} else if (open_mqtt_protocol) {
return "mqtt";
} else if (open_redis_protocol) {
return "redis";
} else {
return "raw";
}
}

} // namespace swoole
5 changes: 2 additions & 3 deletions src/server/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ bool ProcessFactory::shutdown() {
int status;

if (swoole_kill(server_->gs->manager_pid, SIGTERM) < 0) {
swoole_sys_warning("swKill(%d) failed", server_->gs->manager_pid);
swoole_sys_warning("kill(%d) failed", server_->gs->manager_pid);
}

if (swoole_waitpid(server_->gs->manager_pid, &status, 0) < 0) {
Expand Down Expand Up @@ -97,7 +97,7 @@ bool ProcessFactory::start() {
* The manager process must be started first, otherwise it will have a thread fork
*/
if (server_->start_manager_process() < 0) {
swoole_warning("FactoryProcess_manager_start failed");
swoole_warning("failed to start");
return false;
}
return true;
Expand Down Expand Up @@ -262,7 +262,6 @@ bool ProcessFactory::end(SessionId session_id, int flags) {
}
// Reset send buffer, Immediately close the connection.
if (flags & Server::CLOSE_RESET) {
swoole_warning("close session=%ld, force", session_id);
conn->close_reset = 1;
}
// Server is initiative to close the connection
Expand Down
12 changes: 11 additions & 1 deletion tests/swoole_server/bug_11000_01.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ $pm->childFirst();
$pm->run();
?>
--EXPECTF--
array(13) {
array(%d) {
["start_time"]=>
int(%d)
["connection_num"]=>
Expand All @@ -47,10 +47,20 @@ array(13) {
int(0)
["dispatch_count"]=>
int(0)
["pipe_packet_msg_id"]=>
int(%d)
["session_round"]=>
int(0)
["min_fd"]=>
int(%d)
["max_fd"]=>
int(%d)
["worker_request_count"]=>
int(0)
["worker_dispatch_count"]=>
int(0)
["coroutine_num"]=>
int(0)
["coroutine_peek_num"]=>
int(0)
}
5 changes: 5 additions & 0 deletions tests/swoole_server/stats_file.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ task_worker_num: %d
tasking_num: %d
request_count: %d
dispatch_count: %d
pipe_packet_msg_id: %d
session_round: %d
min_fd: %d
max_fd: %d
worker_request_count: %d
worker_dispatch_count: %d
coroutine_num: %d
coroutine_peek_num: %d

0 comments on commit 808f0bd

Please sign in to comment.