Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/include/pcp/pmwebapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ extern int pmWebTimerSetup(void);
extern int pmWebTimerSetEventLoop(void *);
extern int pmWebTimerSetMetricRegistry(struct mmv_registry *);
extern void pmWebTimerClose(void);
extern void pmWebTimerLoopFinalize(void); /* uv_close timer; before uv_loop_close */

/*
* Asynchronous archive location and contents discovery services
Expand Down
4 changes: 4 additions & 0 deletions src/libpcp_web/src/exports.in
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,7 @@ PCP_WEB_1.23 {
dictSetVal;
keySlotsContextFree;
} PCP_WEB_1.22;

PCP_WEB_1.24 {
pmWebTimerLoopFinalize;
} PCP_WEB_1.23;
6 changes: 6 additions & 0 deletions src/libpcp_web/src/notimer.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ pmWebTimerClose(void)
/* noop */
}

void
pmWebTimerLoopFinalize(void)
{
/* noop */
}

int
pmWebTimerSetEventLoop(void *arg)
{
Expand Down
14 changes: 14 additions & 0 deletions src/libpcp_web/src/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ pmWebTimerClose(void)
uv_mutex_unlock(&server.callback_mutex);
}

/*
* uv_close(2) the global instrumentation timer after pmWebTimerClose().
* Call before uv_loop_close(); run the loop once to process the close cb.
*/
void
pmWebTimerLoopFinalize(void)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just in terms of naming, "pmWebTimerDestroy" might be more consistent with other parts of libpcp_web?

{
if (server.events == NULL)
return;
if (uv_is_closing((uv_handle_t *)&server.timer))
return;
uv_close((uv_handle_t *)&server.timer, NULL);
}

static void
timer_worker(uv_timer_t *arg)
{
Expand Down
52 changes: 46 additions & 6 deletions src/libpcp_web/src/webgroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include "pmapi.h"
#include "pmda.h"
#include "schema.h"
Expand Down Expand Up @@ -68,6 +69,9 @@ typedef struct webgroups {
unsigned int gc_timer_started;
} webgroups;

#define WG_FROM_TIMER(h) ((struct webgroups *)((char *)(h) - offsetof(struct webgroups, timer)))
#define WG_FROM_ASYNC(h) ((struct webgroups *)((char *)(h) - offsetof(struct webgroups, async)))

static struct webgroups *
webgroups_lookup(pmWebGroupModule *module)
{
Expand Down Expand Up @@ -331,6 +335,29 @@ webgroup_timers_stop(struct webgroups *groups)
groups->active = 0;
}

/*
* pmWebGroupClose: the GC timer and async handles live inside struct webgroups.
* Free the struct only after libuv has finished closing them (valgrind-safe).
*/
static void
webgroup_module_close_async_cb(uv_handle_t *handle)
{
struct webgroups *groups = WG_FROM_ASYNC(handle);

memset(groups, 0, sizeof(*groups));
free(groups);
}

static void
webgroup_module_close_timer_cb(uv_handle_t *handle)
{
struct webgroups *groups = WG_FROM_TIMER(handle);

groups->gc_timer_started = 0;
groups->active = 0;
uv_close((uv_handle_t *)&groups->async, webgroup_module_close_async_cb);
}

static void
webgroup_garbage_collect(struct webgroups *groups)
{
Expand Down Expand Up @@ -2639,6 +2666,12 @@ pmWebGroupClose(pmWebGroupModule *module)
struct webgroups *groups = (struct webgroups *)module->privdata;
dictEntry *entry;

/*
* Clear module->privdata before async closes finish so a nested close cannot
* run teardown twice; struct webgroups stays valid until close callbacks run.
*/
module->privdata = NULL;

if (groups) {
/* walk the contexts, stop timers and free resources */
{
Expand All @@ -2648,12 +2681,19 @@ pmWebGroupClose(pmWebGroupModule *module)
webgroup_drop_context((context_t *)dictGetVal(entry), NULL);
}
dictRelease(groups->contexts);
webgroup_timers_stop(groups);
if (groups->events)
uv_close((uv_handle_t *)&groups->async, NULL);
memset(groups, 0, sizeof(struct webgroups));
free(groups);
module->privdata = NULL;
groups->contexts = NULL;

if (groups->events) {
if (groups->active && groups->gc_timer_started) {
uv_timer_stop(&groups->timer);
uv_close((uv_handle_t *)&groups->timer, webgroup_module_close_timer_cb);
} else {
uv_close((uv_handle_t *)&groups->async, webgroup_module_close_async_cb);
}
} else {
memset(groups, 0, sizeof(struct webgroups));
free(groups);
}
}

sdsfree(PARAM_HOSTNAME);
Expand Down
112 changes: 101 additions & 11 deletions src/pmfind/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,23 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include <stdlib.h>
#include <uv.h>
#include "dict.h"
#include "pmapi.h"
#include "source.h"
#include "pmwebapi.h"

/*
* uv_handle_get_loop() is not in older libuv (e.g. Ubuntu 18.04 packages). All
* libuv 1.x keep the owning loop as handle->loop (UV_HANDLE_FIELDS).
*/
static inline uv_loop_t *
pmfind_handle_loop(const uv_handle_t *handle)
{
return handle->loop;
}

extern dictType sdsDictCallBacks;
extern dictType sdsOwnDictCallBacks;
extern dictType sdsKeyDictCallBacks;
Expand All @@ -31,13 +42,21 @@ typedef struct {
typedef struct {
int status; /* exit status */
int count; /* url count */
int defer_pending; /* deferred source_release timers */
char **urls; /* callers */
uv_mutex_t mutex;
dict *uniq;
dict *params;
dict *contexts;
} sources_t;

typedef struct source_defer {
uv_timer_t timer;
sources_t *sp;
context_t *cp;
sds ctx;
} source_defer_t;

static void
source_release(sources_t *sp, context_t *cp, sds ctx)
{
Expand All @@ -48,16 +67,51 @@ source_release(sources_t *sp, context_t *cp, sds ctx)
}

static void
sources_release(void *arg, const struct dictEntry *entry)
source_defer_close(uv_handle_t *handle)
{
sources_t *sp = (sources_t *)arg;
context_t *cp = (context_t *)dictGetVal(entry);
sds ctx = (sds)dictGetKey(entry);
source_defer_t *d = (source_defer_t *)handle->data;

if (pmDebugOptions.discovery)
fprintf(stderr, "releasing context %s\n", ctx);
free(d);
}

source_release(sp, cp, ctx);
/*
* pmWebGroupContext calls on_done, then webgroup_deref_context(cp). Calling
* pmWebGroupDestroy from on_done frees cp first — use-after-free. Defer destroy
* to the next event-loop tick so deref runs while cp is still valid.
*/
static void
source_defer_cb(uv_timer_t *handle)
{
source_defer_t *d = (source_defer_t *)handle->data;
uv_loop_t *loop = pmfind_handle_loop((uv_handle_t *)handle);

source_release(d->sp, d->cp, d->ctx);
if (--d->sp->defer_pending == 0)
uv_stop(loop);
uv_close((uv_handle_t *)handle, source_defer_close);
}

static int
source_release_deferred(uv_loop_t *loop, sources_t *sp, context_t *cp, sds ctx)
{
source_defer_t *d = malloc(sizeof(*d));

if (d == NULL) {
fprintf(stderr, "%s: out of memory deferring context release\n",
pmGetProgname());
return -ENOMEM;
}
d->sp = sp;
d->cp = cp;
d->ctx = ctx;
uv_timer_init(loop, &d->timer);
d->timer.data = d;
if (uv_timer_start(&d->timer, source_defer_cb, 0, 0) != 0) {
free(d);
return -EINVAL;
}
sp->defer_pending++;
return 0;
}

static void
Expand Down Expand Up @@ -166,8 +220,14 @@ on_source_done(sds context, int status, sds message, void *arg)
if (remove) {
if (pmDebugOptions.discovery)
fprintf(stderr, "remove context %s\n", context);
source_release(sp, cp, context);
/*
* Remove from the dict before pmWebGroupDestroy. The lookup key is the
* PMWEBAPI context id (libpcp_web frees it in pmWebGroupDestroy); calling
* dictDelete after that uses freed memory and corrupts the heap.
*/
dictDelete(sp->contexts, context);
if (source_release_deferred(uv_default_loop(), sp, cp, context) < 0)
sp->status = 1;
}

if (release) {
Expand All @@ -176,7 +236,9 @@ on_source_done(sds context, int status, sds message, void *arg)
dictEntry *entry;
dictInitIterator(&iter, sp->contexts);
while ((entry = dictNext(&iter)) != NULL) {
sources_release(sp, entry);
if (source_release_deferred(uv_default_loop(), sp,
(context_t *)dictGetVal(entry), (sds)dictGetKey(entry)) < 0)
sp->status = 1;
}
} else if (pmDebugOptions.discovery) {
fprintf(stderr, "not yet releasing (count=%d)\n", count);
Expand All @@ -198,14 +260,17 @@ on_source_info(pmLogLevel level, sds message, void *arg)
static void
sources_discovery_start(uv_timer_t *arg)
{
uv_loop_t *loop = pmfind_handle_loop((uv_handle_t *)arg);
uv_handle_t *handle = (uv_handle_t *)arg;
sources_t *sp = (sources_t *)handle->data;
dict *dp = dictCreate(&sdsOwnDictCallBacks);
sds name, value;
int i, fail = 0, total = sp->count;

if (dp == NULL)
if (dp == NULL) {
uv_stop(loop);
return;
}
name = sdsnew("hostspec");

for (i = 0; i < total; i++) {
Expand All @@ -231,6 +296,11 @@ sources_discovery_start(uv_timer_t *arg)

dictRelease(dp);
pmWebTimerClose();
/*
* Allow uv_run() in source_discovery() to return: libpcp_web leaves uv_async
* and timers referenced until pmWebGroupClose(), which runs after the loop.
*/
uv_stop(loop);
}

/*
Expand Down Expand Up @@ -281,12 +351,32 @@ source_discovery(int count, char **urls)
uv_timer_init(loop, &timing);
uv_timer_start(&timing, sources_discovery_start, 0, 0);
uv_run(loop, UV_RUN_DEFAULT);
/*
* on_source_done defers pmWebGroupDestroy. Without uv_stop, a second
* UV_RUN_DEFAULT would block forever (uv_async, GC timer, ...). The last
* deferred callback calls uv_stop so this run exits after work is drained.
*/
if (find.defer_pending > 0)
uv_run(loop, UV_RUN_DEFAULT);

/*
* Close libpcp_web loop handles before uv_loop_close(); drain each round of
* uv_close callbacks on this loop (Approach A — minimal shutdown).
*/
pmWebGroupClose(&settings.module);
uv_run(loop, UV_RUN_DEFAULT);

pmWebTimerLoopFinalize();
uv_run(loop, UV_RUN_DEFAULT);

uv_close((uv_handle_t *)&timing, NULL);
uv_run(loop, UV_RUN_DEFAULT);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wow, so many explicit calls to uv_run ... are they all really necessary?


uv_loop_close(loop);

/*
* Finished, release all resources acquired so far
*/
pmWebGroupClose(&settings.module);
uv_mutex_destroy(&find.mutex);
dictRelease(find.uniq);
dictRelease(find.params);
Expand Down
Loading