Skip to content

Commit

Permalink
add request metric to rds; handle connection queue fullness (twitter#248
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Yao Yue authored and michalbiesek committed Sep 10, 2019
1 parent a013f53 commit 5f6af0c
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 4 deletions.
9 changes: 8 additions & 1 deletion src/core/data/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,19 @@ _tcp_accept(struct buf_sock *ss)
}

if (!ss->hdl->accept(sc, s->ch)) {
buf_sock_reset(s);
buf_sock_return(&s);
return false;
}

/* push buf_sock to queue */
ring_array_push(&s, conn_new);
if (ring_array_push(&s, conn_new) != CC_OK) { /* close if can't enqueue */
log_error("new connetion queue is full, closing connection");
buf_sock_reset(s);
buf_sock_return(&s);
return false;
}

/* notify worker, note this may fail and will be retried via write event */
_server_pipe_write();

Expand Down
14 changes: 12 additions & 2 deletions src/core/data/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ worker_add_stream(void)
i);
return;
}
INCR(worker_metrics, worker_add_stream);
log_verb("Adding new buf_sock %p to worker thread", s);
s->owner = ctx;
s->hdl = hdl;
Expand Down Expand Up @@ -157,7 +158,16 @@ worker_ret_stream(struct buf_sock *s)
event_del(ctx->evb, hdl->rid(s->ch));

/* push buf_sock to queue */
ring_array_push(&s, conn_term);
INCR(worker_metrics, worker_ret_stream);
if (ring_array_push(&s, conn_term) != CC_OK) {
/* here we have no choice but to clean up the stream to avoid leak */
log_error("term connetion queue is full");
hdl->term(s->ch);
buf_sock_reset(s);
buf_sock_return(&s);

return;
}
/* conn_term */
_worker_pipe_write();
}
Expand Down Expand Up @@ -248,7 +258,7 @@ core_worker_setup(worker_options_st *options, worker_metrics_st *metrics)
hdl->accept = NULL;
hdl->reject = NULL;
hdl->open = NULL;
hdl->term = NULL;
hdl->term = (channel_term_fn)tcp_close;
hdl->recv = (channel_recv_fn)tcp_recv;
hdl->send = (channel_send_fn)tcp_send;
hdl->rid = (channel_id_fn)tcp_read_id;
Expand Down
4 changes: 3 additions & 1 deletion src/core/data/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ typedef struct {
ACTION( worker_event_loop, METRIC_COUNTER, "# worker event loops returned" )\
ACTION( worker_event_read, METRIC_COUNTER, "# worker core_read events" )\
ACTION( worker_event_write, METRIC_COUNTER, "# worker core_write events" )\
ACTION( worker_event_error, METRIC_COUNTER, "# worker core_error events" )
ACTION( worker_event_error, METRIC_COUNTER, "# worker core_error events" )\
ACTION( worker_add_stream, METRIC_COUNTER, "# worker adding a stream" )\
ACTION( worker_ret_stream, METRIC_COUNTER, "# worker returning a stream" )

typedef struct {
CORE_WORKER_METRIC(METRIC_DECLARE)
Expand Down
3 changes: 3 additions & 0 deletions src/server/rds/data/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ process_request(struct response *rsp, struct request *req)
struct command cmd;
command_fn func = command_registry[req->type];

log_verb("processing req %p, write rsp to %p", req, rsp);
INCR(process_metrics, process_req);

if (func == NULL) {
struct element *reply = (struct element *)array_push(rsp->token);
log_warn("command is recognized but not implemented");
Expand Down

0 comments on commit 5f6af0c

Please sign in to comment.