Skip to content

Commit

Permalink
Add a generic way for any bufferevent to make its callback deferred
Browse files Browse the repository at this point in the history
svn:r1197
  • Loading branch information
nmathewson committed Apr 17, 2009
1 parent 99de186 commit a98a512
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 39 deletions.
5 changes: 4 additions & 1 deletion ChangeLog
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
Changes in current version:
Changes in 2.0.2-alpha:
o Add a new flag to bufferevents to make all callbacks automatically deferred.

Changes in 2.0.1-alpha:
o free minheap on event_base_free(); from Christopher Layne
o debug cleanups in signal.c; from Christopher Layne
o provide event_base_new() that does not set the current_base global
Expand Down
11 changes: 11 additions & 0 deletions bufferevent-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,15 @@ struct bufferevent_private {

/** If set, read is suspended until evbuffer some. */
unsigned read_suspended : 1;
/** If set, we should free the lock when we free the bufferevent. */
unsigned own_lock : 1;

unsigned readcb_pending : 1;
unsigned writecb_pending : 1;
short errorcb_pending;
int errno_pending;
struct deferred_cb deferred;

enum bufferevent_options options;

int refcnt;
Expand Down Expand Up @@ -113,6 +120,10 @@ int bufferevent_enable_locking(struct bufferevent *bufev, void *lock);
void bufferevent_incref(struct bufferevent *bufev);
void _bufferevent_decref_and_unlock(struct bufferevent *bufev);

void _bufferevent_run_readcb(struct bufferevent *bufev);
void _bufferevent_run_writecb(struct bufferevent *bufev);
void _bufferevent_run_errorcb(struct bufferevent *bufev, short what);

#define BEV_UPCAST(b) EVUTIL_UPCAST((b), struct bufferevent_private, bev)

#define BEV_LOCK(b) do { \
Expand Down
87 changes: 87 additions & 0 deletions bufferevent.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#ifdef WIN32
#include <winsock2.h>
#endif
#include <errno.h>

#include "event2/util.h"
#include "event2/bufferevent.h"
Expand Down Expand Up @@ -112,6 +113,87 @@ bufferevent_inbuf_wm_cb(struct evbuffer *buf,
}
}

static void
bufferevent_run_deferred_callbacks(struct deferred_cb *_, void *arg)
{
struct bufferevent_private *bufev_private = arg;
struct bufferevent *bufev = &bufev_private->bev;

BEV_LOCK(bufev);
if (bufev_private->readcb_pending && bufev->readcb) {
bufev_private->readcb_pending = 0;
bufev->readcb(bufev, bufev->cbarg);
}
if (bufev_private->writecb_pending && bufev->writecb) {
bufev_private->writecb_pending = 0;
bufev->writecb(bufev, bufev->cbarg);
}
if (bufev_private->errorcb_pending && bufev->errorcb) {
short what = bufev_private->errorcb_pending;
int err = bufev_private->errno_pending;
bufev_private->errorcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
bufev->errorcb(bufev, what, bufev->cbarg);
}
_bufferevent_decref_and_unlock(bufev);
}

void
_bufferevent_run_readcb(struct bufferevent *bufev)
{
/* Requires lock. */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->readcb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
event_deferred_cb_schedule(
bufev->ev_base, &p->deferred);
}
} else {
bufev->readcb(bufev, bufev->cbarg);
}
}

void
_bufferevent_run_writecb(struct bufferevent *bufev)
{
/* Requires lock. */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->writecb_pending = 1;
if (!p->deferred.queued) {
bufferevent_incref(bufev);
event_deferred_cb_schedule(
bufev->ev_base, &p->deferred);
}
} else {
bufev->writecb(bufev, bufev->cbarg);
}
}

void
_bufferevent_run_errorcb(struct bufferevent *bufev, short what)
{
/* Requires lock. */
struct bufferevent_private *p =
EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
if (p->options & BEV_OPT_DEFER_CALLBACKS) {
p->errorcb_pending |= what;
p->errno_pending = EVUTIL_SOCKET_ERROR();
if (!p->deferred.queued) {
bufferevent_incref(bufev);
event_deferred_cb_schedule(
bufev->ev_base, &p->deferred);
}
} else {
bufev->errorcb(bufev, what, bufev->cbarg);
}
}

int
bufferevent_init_common(struct bufferevent_private *bufev_private,
struct event_base *base,
Expand Down Expand Up @@ -152,6 +234,11 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
}
}
#endif
if (options & BEV_OPT_DEFER_CALLBACKS) {
event_deferred_cb_init(&bufev_private->deferred,
bufferevent_run_deferred_callbacks,
bufev_private);
}

bufev_private->options = options;

Expand Down
6 changes: 3 additions & 3 deletions bufferevent_filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ be_filter_process_output(struct bufferevent_filtered *bevf,
if (processed && bufev->writecb &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low) {
/* call the write callback.*/
(*bufev->writecb)(bufev, bufev->cbarg);
_bufferevent_run_writecb(bufev);

if (res == BEV_OK &&
(bufev->enabled & EV_WRITE) &&
Expand Down Expand Up @@ -396,7 +396,7 @@ be_filter_readcb(struct bufferevent *underlying, void *_me)
if (processed_any &&
evbuffer_get_length(bufev->input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
_bufferevent_run_readcb(bufev);
}

/* Called when the underlying socket has drained enough that we can write to
Expand All @@ -419,7 +419,7 @@ be_filter_errorcb(struct bufferevent *underlying, short what, void *_me)

/* All we can really to is tell our own errorcb. */
if (bev->errorcb)
bev->errorcb(bev, what, bev->cbarg);
_bufferevent_run_errorcb(bev, what);
}

static int
Expand Down
36 changes: 6 additions & 30 deletions bufferevent_pair.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
struct bufferevent_pair {
struct bufferevent_private bev;
struct bufferevent_pair *partner;
struct deferred_cb deferred_write_cb;
struct deferred_cb deferred_read_cb;
};


Expand All @@ -69,25 +67,6 @@ upcast(struct bufferevent *bev)
static void be_pair_outbuf_cb(struct evbuffer *,
const struct evbuffer_cb_info *, void *);

static void
run_callback(struct deferred_cb *cb, void *arg)
{
struct bufferevent_pair *bufev = arg;
struct bufferevent *bev = downcast(bufev);

BEV_LOCK(bev);
if (cb == &bufev->deferred_read_cb) {
if (bev->readcb) {
bev->readcb(bev, bev->cbarg);
}
} else {
if (bev->writecb) {
bev->writecb(bev, bev->cbarg);
}
}
BEV_UNLOCK(bev);
}

static struct bufferevent_pair *
bufferevent_pair_elt_new(struct event_base *base,
enum bufferevent_options options)
Expand All @@ -106,8 +85,6 @@ bufferevent_pair_elt_new(struct event_base *base,
bufferevent_free(downcast(bufev));
return NULL;
}
event_deferred_cb_init(&bufev->deferred_read_cb, run_callback, bufev);
event_deferred_cb_init(&bufev->deferred_write_cb, run_callback, bufev);

return bufev;
}
Expand All @@ -117,7 +94,10 @@ bufferevent_pair_new(struct event_base *base, enum bufferevent_options options,
struct bufferevent *pair[2])
{
struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
enum bufferevent_options tmp_options = options & ~BEV_OPT_THREADSAFE;
enum bufferevent_options tmp_options;

options |= BEV_OPT_DEFER_CALLBACKS;
tmp_options = options & ~BEV_OPT_THREADSAFE;

bufev1 = bufferevent_pair_elt_new(base, options);
if (!bufev1)
Expand Down Expand Up @@ -175,12 +155,10 @@ be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
dst_size = evbuffer_get_length(dst->input);

if (dst_size >= dst->wm_read.low && dst->readcb) {
event_deferred_cb_schedule(dst->ev_base,
&(upcast(dst)->deferred_read_cb));
_bufferevent_run_readcb(dst);
}
if (src_size <= src->wm_write.low && src->writecb) {
event_deferred_cb_schedule(src->ev_base,
&(upcast(src)->deferred_write_cb));
_bufferevent_run_writecb(src);
}
done:
evbuffer_freeze(src->output, 1);
Expand Down Expand Up @@ -247,8 +225,6 @@ be_pair_destruct(struct bufferevent *bev)
bev_p->partner->partner = NULL;
bev_p->partner = NULL;
}
event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_write_cb);
event_deferred_cb_cancel(bev->ev_base, &bev_p->deferred_read_cb);
}

static void
Expand Down
9 changes: 4 additions & 5 deletions bufferevent_sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)
/* Invoke the user callback - must always be called last */
if (evbuffer_get_length(input) >= bufev->wm_read.low &&
bufev->readcb != NULL)
(*bufev->readcb)(bufev, bufev->cbarg);
_bufferevent_run_readcb(bufev);

return;

Expand All @@ -165,8 +165,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg)

error:
event_del(&bufev->ev_read);
(*bufev->errorcb)(bufev, what, bufev->cbarg);

_bufferevent_run_errorcb(bufev, what);
}

static void
Expand Down Expand Up @@ -207,7 +206,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
*/
if (bufev->writecb != NULL &&
evbuffer_get_length(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
_bufferevent_run_writecb(bufev);

return;

Expand All @@ -218,7 +217,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)

error:
event_del(&bufev->ev_write);
(*bufev->errorcb)(bufev, what, bufev->cbarg);
_bufferevent_run_errorcb(bufev, what);
}

struct bufferevent *
Expand Down
3 changes: 3 additions & 0 deletions include/event2/bufferevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ enum bufferevent_options {
/** If set, and threading is enabled, operations on this bufferevent
* are protected by a lock */
BEV_OPT_THREADSAFE = (1<<1),

/** If set, callbacks are run deferred in the event loop. */
BEV_OPT_DEFER_CALLBACKS = (1<<2),
};

/**
Expand Down

0 comments on commit a98a512

Please sign in to comment.