Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 92 additions & 7 deletions src/libpcp_web/src/webgroup.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ typedef struct webgroups {
struct context *pending_timer_init;
unsigned int active;
unsigned int gc_timer_started;

pmWebGroupModule *module; /* owner; used when freeing after uv_close */
unsigned int close_pending; /* remaining embedded handle closes */
} webgroups;

static struct webgroups *
Expand All @@ -76,6 +79,7 @@ webgroups_lookup(pmWebGroupModule *module)
if (module->privdata == NULL) {
module->privdata = calloc(1, sizeof(struct webgroups));
groups = (struct webgroups *)module->privdata;
groups->module = module;
uv_mutex_init(&groups->mutex);
}
return groups;
Expand All @@ -102,6 +106,34 @@ webgroup_release_context(uv_handle_t *handle)
pmwebapi_free_context(context);
}

/*
* Drop a context from the deferred timer-init queue (see webgroup_new_context
* and webgroup_async_cb). If the context is freed while still queued, the
* async callback would otherwise follow dangling next_pending pointers.
*/
static void
webgroup_pending_timer_init_remove(struct context *context, struct webgroups *groups)
{
struct context **pp, *p;

if (groups == NULL)
groups = (struct webgroups *)context->privdata;
if (groups == NULL)
return;

uv_mutex_lock(&groups->mutex);
pp = &groups->pending_timer_init;
while ((p = *pp) != NULL) {
if (p == context) {
*pp = p->next_pending;
context->next_pending = NULL;
break;
}
pp = &p->next_pending;
}
uv_mutex_unlock(&groups->mutex);
}

static void
webgroup_drop_context(struct context *context, struct webgroups *groups)
{
Expand All @@ -110,6 +142,7 @@ webgroup_drop_context(struct context *context, struct webgroups *groups)
context, context->refcount);

if (webgroup_deref_context(context) == 0) {
webgroup_pending_timer_init_remove(context, groups);
if (context->garbage == 0) {
context->garbage = 1;
if (context->timer_init)
Expand Down Expand Up @@ -546,9 +579,15 @@ pmWebGroupContext(pmWebGroupSettings *sp, sds id, dict *params, void *arg)
pmWebSource context;
sds msg = NULL;
int sts = 0;
unsigned int context_key = 0;
int have_context_key = 0;
char *endptr = NULL;

if ((cp = webgroup_lookup_context(sp, &id, params, &sts, &msg, arg))) {
id = cp->origin;
context_key = (unsigned int)strtoul(id, &endptr, 10);
if (*endptr == '\0')
have_context_key = 1;
pmwebapi_context_hash(cp);
context.source = pmwebapi_hash_sds(NULL, cp->name.hash);
context.hostspec = cp->host;
Expand All @@ -564,7 +603,16 @@ pmWebGroupContext(pmWebGroupSettings *sp, sds id, dict *params, void *arg)
}

sp->callbacks.on_done(id, sts, msg, arg);
webgroup_deref_context(cp);

if (have_context_key) {
struct webgroups *groups = webgroups_lookup(&sp->module);
dictEntry *entry;

entry = dictFind(groups->contexts, &context_key);
if (entry != NULL)
webgroup_deref_context((struct context *)dictGetVal(entry));
}

sdsfree(msg);
return sts;
}
Expand Down Expand Up @@ -2633,11 +2681,30 @@ pmWebGroupSetMetricRegistry(pmWebGroupModule *module, mmv_registry_t *registry)
return -ENOMEM;
}

static void
webgroups_handle_closed(uv_handle_t *handle)
{
struct webgroups *groups = (struct webgroups *)handle->data;
pmWebGroupModule *module;

if (groups == NULL)
return;
if (--groups->close_pending != 0)
return;

module = groups->module;
uv_mutex_destroy(&groups->mutex);
free(groups);
if (module != NULL)
module->privdata = NULL;
}

void
pmWebGroupClose(pmWebGroupModule *module)
{
struct webgroups *groups = (struct webgroups *)module->privdata;
dictEntry *entry;
unsigned int close_pending;

if (groups) {
/* walk the contexts, stop timers and free resources */
Expand All @@ -2648,12 +2715,30 @@ pmWebGroupClose(pmWebGroupModule *module)
webgroup_drop_context((context_t *)dictGetVal(entry), NULL);
}
dictRelease(groups->contexts);
webgroup_timers_stop(groups);
if (groups->events)
uv_close((uv_handle_t *)&groups->async, NULL);
memset(groups, 0, sizeof(struct webgroups));
free(groups);
module->privdata = NULL;
groups->contexts = NULL;

close_pending = 0;
if (groups->active && groups->gc_timer_started) {
uv_timer_stop(&groups->timer);
groups->timer.data = (void *)groups;
uv_close((uv_handle_t *)&groups->timer, webgroups_handle_closed);
close_pending++;
}
groups->active = 0;
groups->gc_timer_started = 0;

if (groups->events) {
groups->async.data = (void *)groups;
uv_close((uv_handle_t *)&groups->async, webgroups_handle_closed);
close_pending++;
}

if (close_pending == 0) {
uv_mutex_destroy(&groups->mutex);
free(groups);
module->privdata = NULL;
} else
groups->close_pending = close_pending;
}

sdsfree(PARAM_HOSTNAME);
Expand Down
13 changes: 11 additions & 2 deletions src/pmfind/source.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ sources_release(void *arg, const struct dictEntry *entry)
if (pmDebugOptions.discovery)
fprintf(stderr, "releasing context %s\n", ctx);

dictDelete(sp->contexts, ctx);
source_release(sp, cp, ctx);
}

Expand Down Expand Up @@ -166,8 +167,9 @@ on_source_done(sds context, int status, sds message, void *arg)
if (remove) {
if (pmDebugOptions.discovery)
fprintf(stderr, "remove context %s\n", context);
source_release(sp, cp, context);

dictDelete(sp->contexts, context);
source_release(sp, cp, context);
}

if (release) {
Expand Down Expand Up @@ -231,6 +233,8 @@ sources_discovery_start(uv_timer_t *arg)

dictRelease(dp);
pmWebTimerClose();

uv_stop(((uv_timer_t *)arg)->loop);
}

/*
Expand Down Expand Up @@ -281,12 +285,17 @@ source_discovery(int count, char **urls)
uv_timer_init(loop, &timing);
uv_timer_start(&timing, sources_discovery_start, 0, 0);
uv_run(loop, UV_RUN_DEFAULT);

pmWebGroupClose(&settings.module);

uv_close((uv_handle_t *)&timing, NULL);
(void)uv_run(loop, UV_RUN_DEFAULT);

uv_loop_close(loop);

/*
* Finished, release all resources acquired so far
*/
pmWebGroupClose(&settings.module);
uv_mutex_destroy(&find.mutex);
dictRelease(find.uniq);
dictRelease(find.params);
Expand Down
Loading