Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions zan-extension/config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@ if test "$PHP_ZAN" != "no"; then
src/core/array.c \
src/core/list.c \
src/core/heap.c \
src/core/log.c \
src/core/rbtree.c \
src/core/log.c \
src/core/rbtree.c \
src/core/clock.c \
src/memory/ShareMemory.c \
src/memory/MemoryGlobal.c \
src/memory/RingBuffer.c \
Expand Down
28 changes: 28 additions & 0 deletions zan-extension/include/swClock.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
+----------------------------------------------------------------------+
| Zan |
+----------------------------------------------------------------------+
| Copyright (c) 2016-2017 Zan Group <https://github.com/youzan/zan> |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| zan@zanphp.io so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Zan Group <zan@zanphp.io> |
+----------------------------------------------------------------------+
*/

#ifndef _SW_CLOCK_H_
#define _SW_CLOCK_H_

#include <sys/time.h>

int swClock_init();
int swClock_get(struct timeval *tv);

#endif

5 changes: 5 additions & 0 deletions zan-extension/include/swGlobalDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ struct _swServer
*/
uint32_t max_request;

/**
* request terminate timeout
*/
uint32_t terminate_timeout;

int timeout_sec;
int timeout_usec;

Expand Down
8 changes: 6 additions & 2 deletions zan-extension/include/swStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ typedef struct {
sw_atomic_long_t total_request_count;
sw_atomic_long_t request_count;
sw_atomic_t start_count;
struct timeval accepted;
} swWorkerStats;

typedef struct
Expand All @@ -50,8 +51,11 @@ typedef struct
swLock lock;
} swServerStats;

#define sw_stats_incr(val) sw_atomic_fetch_add(val, 1)
#define sw_stats_decr(val) sw_atomic_fetch_sub(val, 1)
#define sw_stats_atom_incr(val) sw_atomic_fetch_add(val, 1)
#define sw_stats_atom_decr(val) sw_atomic_fetch_sub(val, 1)
#define sw_stats_incr(val) ((*val)++)
#define sw_stats_decr(val) ((*val)--)


void sw_stats_set_worker_status(swWorker *worker, int status);

Expand Down
126 changes: 126 additions & 0 deletions zan-extension/src/core/clock.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
+----------------------------------------------------------------------+
| Zan |
+----------------------------------------------------------------------+
| Copyright (c) 2016-2017 Zan Group <https://github.com/youzan/zan> |
+----------------------------------------------------------------------+
| This source file is subject to version 2.0 of the Apache license, |
| that is bundled with this package in the file LICENSE, and is |
| available through the world-wide-web at the following url: |
| http://www.apache.org/licenses/LICENSE-2.0.html |
| If you did not receive a copy of the Apache2.0 license and are unable|
| to obtain it through the world-wide-web, please send a note to |
| zan@zanphp.io so we can mail you a copy immediately. |
+----------------------------------------------------------------------+
| Author: Zan Group <zan@zanphp.io> |
+----------------------------------------------------------------------+
*/

#if defined(HAVE_CLOCK_GETTIME)
#include <time.h>
#endif

#include "swClock.h"
#include "swLog.h"


#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)

static int monotonic_works;

int swClock_init()
{
struct timespec ts;

monotonic_works = 0;

if (0 == clock_gettime(CLOCK_MONOTONIC, &ts)) {
monotonic_works = 1;
}

return 0;
}

int swClock_get(struct timeval *tv)
{
if (monotonic_works) {
struct timespec ts;

if (0 > clock_gettime(CLOCK_MONOTONIC, &ts)) {
swError("clock_gettime() failed");
return -1;
}

tv->tv_sec = ts.tv_sec;
tv->tv_usec = ts.tv_nsec / 1000;
return 0;
}

return gettimeofday(tv, 0);
}

/* macosx clock */
#elif defined(HAVE_CLOCK_GET_TIME)

#include <mach/mach.h>
#include <mach/clock.h>
#include <mach/mach_error.h>

static clock_serv_t mach_clock;

/* this code borrowed from here: http://lists.apple.com/archives/Darwin-development/2002/Mar/msg00746.html */
/* mach_clock also should be re-initialized in child process after fork */
int swClock_init()
{
kern_return_t ret;
mach_timespec_t aTime;

ret = host_get_clock_service(mach_host_self(), REALTIME_CLOCK, &mach_clock);

if (ret != KERN_SUCCESS) {
swError("host_get_clock_service() failed: %s", mach_error_string(ret));
return -1;
}

/* test if it works */
ret = clock_get_time(mach_clock, &aTime);

if (ret != KERN_SUCCESS) {
swError("clock_get_time() failed: %s", mach_error_string(ret));
return -1;
}

return 0;
}

int swClock_get(struct timeval *tv)
{
kern_return_t ret;
mach_timespec_t aTime;

ret = clock_get_time(mach_clock, &aTime);

if (ret != KERN_SUCCESS) {
swError("clock_get_time() failed: %s", mach_error_string(ret));
return -1;
}

tv->tv_sec = aTime.tv_sec;
tv->tv_usec = aTime.tv_nsec / 1000;

return 0;
}

#else /* no clock */

int swClock_init()
{
return 0;
}

int swClock_get(struct timeval *tv)
{
return gettimeofday(tv, 0);
}

#endif
4 changes: 2 additions & 2 deletions zan-extension/src/factory/ProcessPool.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ int swProcessPool_dispatch(swProcessPool *pool, swEventData *data, int *dst_work
}
else
{
sw_stats_incr(&worker->tasking_num);
sw_stats_atom_incr(&worker->tasking_num);
}

return ret;
Expand Down Expand Up @@ -212,7 +212,7 @@ int swProcessPool_dispatch_blocking(swProcessPool *pool, swEventData *data, int
}
else
{
sw_stats_incr(&worker->tasking_num);
sw_stats_atom_incr(&worker->tasking_num);
}

return ret;
Expand Down
46 changes: 44 additions & 2 deletions zan-extension/src/network/Manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "swSignal.h"
#include "swExecutor.h"
#include "swBaseOperator.h"
#include "swClock.h"

#include <sys/wait.h>

Expand All @@ -37,12 +38,16 @@ typedef struct

} swManagerProcess;

// manager process
static int swManager_loop_async(swFactory *factory);
static int swManager_loop_sync(swFactory *factory);
static void swManager_signal_handle(int sig);
static pid_t swManager_spawn_worker(swFactory *factory, int worker_id);
static void swManager_check_exit_status(swServer *serv, int worker_id, pid_t pid, int status);

static int swManager_init_process_check();
static int swManager_check_request_timeout();

static swManagerProcess ManagerProcess;

//create worker child proccess
Expand Down Expand Up @@ -472,6 +477,7 @@ static int swManager_loop_sync(swFactory *factory)
swSignal_add(SIGRTMIN, swManager_signal_handle);
#endif
//swSignal_add(SIGINT, swManager_signal_handle);
swManager_init_process_check();

SwooleG.main_reactor = NULL;
int pid = -1;
Expand Down Expand Up @@ -531,7 +537,7 @@ static int swManager_loop_sync(swFactory *factory)
}
else
{
sw_stats_incr(status == 0 ? &SwooleStats->worker_normal_exit
sw_stats_atom_incr(status == 0 ? &SwooleStats->worker_normal_exit
: &SwooleStats->worker_abnormal_exit);
swManager_check_exit_status(serv, index, pid, status);
pid = 0;
Expand Down Expand Up @@ -561,7 +567,7 @@ static int swManager_loop_sync(swFactory *factory)
exit_worker = swHashMap_find_int(SwooleGS->task_workers.map, pid);
if (exit_worker != NULL)
{
sw_stats_incr(status == 0 ? &SwooleStats->task_worker_normal_exit
sw_stats_atom_incr(status == 0 ? &SwooleStats->task_worker_normal_exit
: &SwooleStats->task_worker_abnormal_exit);
swManager_check_exit_status(serv, exit_worker->id, pid, status);
if (exit_worker->deleted == 1) //主动回收不重启
Expand Down Expand Up @@ -690,6 +696,38 @@ static pid_t swManager_spawn_worker(swFactory *factory, int worker_id)
}
}

static int swManager_init_process_check()
{
if (SwooleG.serv->terminate_timeout) {
swSignal_add(SIGALRM, swManager_signal_handle);
alarm(1);
}
return SW_OK;
}

static int swManager_check_request_timeout()
{
int i, consumed;
swServer *serv = SwooleG.serv;
struct timeval tv;
struct timeval now;

swClock_get(&now);

for (i = 0; i < serv->worker_num; i++) {
tv = SwooleStats->workers[i].accepted;
if (serv->workers[i].status == SW_WORKER_BUSY) {
consumed = (now.tv_sec - tv.tv_sec) * 1000000 + (now.tv_usec - tv.tv_usec);
if (consumed > (serv->terminate_timeout * 1000000)) {
kill(serv->workers[i].pid, SIGUSR2);
}
}
}

alarm(1);
return SW_OK;
}

static void swManager_signal_handle(int sig)
{
switch (sig)
Expand Down Expand Up @@ -717,6 +755,10 @@ static void swManager_signal_handle(int sig)
ManagerProcess.reload_task_worker = 1;
}
break;
case SIGALRM:
// check request terminate timeout
swManager_check_request_timeout();
break;
default:
#ifdef SIGRTMIN
if (sig == SIGRTMIN)
Expand Down
2 changes: 1 addition & 1 deletion zan-extension/src/network/Port.c
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ static int swPort_onRead_http(swReactor *reactor, swListenPort *port, swEvent *e
goto close_fd;
}

//support method:get post put delete patch head options
//support method:get post put delete patch head options
if ((request->method > 0 && request->method <= HTTP_PATCH) || request->method == HTTP_OPTIONS)
{
//receive data of http header
Expand Down
4 changes: 2 additions & 2 deletions zan-extension/src/network/ReactorAccept.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ static swConnection* swConnection_create(swServer *serv, swListenPort *ls, int f
{
swConnection* connection = NULL;

sw_stats_incr(&SwooleStats->accept_count);
sw_stats_incr(&SwooleStats->connection_num);
sw_stats_atom_incr(&SwooleStats->accept_count);
sw_stats_atom_incr(&SwooleStats->connection_num);

if (fd > swServer_get_maxfd(serv))
{
Expand Down
4 changes: 2 additions & 2 deletions zan-extension/src/network/ReactorThread.c
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,8 @@ int swReactorThread_close(swReactor *reactor, int fd)
assert(fd % serv->reactor_num == SwooleTG.id);
}

sw_stats_incr(&SwooleStats->close_count);
sw_stats_decr(&SwooleStats->connection_num);
sw_stats_atom_incr(&SwooleStats->close_count);
sw_stats_atom_decr(&SwooleStats->connection_num);

swTrace("Close Event.fd=%d|from=%d", fd, reactor->id);

Expand Down
9 changes: 5 additions & 4 deletions zan-extension/src/network/Server.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
#include "swConnection.h"
#include "swBaseOperator.h"
#include "swGlobalVars.h"

#include "swClock.h"

swServerG SwooleG; /// 超全局本地变量,此全局变量子进程中修改,其它进程不感知
swServerGS *SwooleGS = NULL; /// 超全局共享变量,此全局变量是基于共享内存的,修改字段,其它进程可感知
swWorkerG SwooleWG; /// 进程内全局变量,此全局变量在worker进程内初始化
swWorkerG SwooleWG; /// 进程内全局变量,此全局变量在worker进程内初始
swServerStats *SwooleStats = NULL;
__thread swThreadG SwooleTG; /// 线程独立变量

Expand Down Expand Up @@ -356,6 +356,7 @@ void swServer_init(swServer *serv)
bzero(serv, sizeof(swServer));

swoole_init();
swClock_init();
serv->factory_mode = SW_MODE_BASE;

serv->reactor_num = SW_REACTOR_NUM > SW_REACTOR_MAX_THREAD ? SW_REACTOR_MAX_THREAD : SW_REACTOR_NUM;
Expand Down Expand Up @@ -984,9 +985,9 @@ swListenPort* swServer_add_port(swServer *serv, int type, char *host, int port)
swFatalError("this port is listen now");
return NULL;
}

}

swListenPort *ls = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swListenPort));
if (ls == NULL)
{
Expand Down
4 changes: 2 additions & 2 deletions zan-extension/src/network/TaskWorker.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ int swTaskWorker_onTask(swProcessPool *pool, swEventData *task)
ret = serv->onTask(serv, task);
}

sw_stats_incr(&SwooleStats->workers[SwooleWG.id].request_count);
sw_stats_incr(&SwooleStats->workers[SwooleWG.id].total_request_count);
sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].request_count);
sw_stats_atom_incr(&SwooleStats->workers[SwooleWG.id].total_request_count);
return ret;
}

Expand Down
Loading