Skip to content

Commit

Permalink
zebra: don't build queue at slow dplane plugin
Browse files Browse the repository at this point in the history
Police new work entering the dplane system so that we don't
build up a queue at a slow dplane plugin.

Signed-off-by: Mark Stapp <mjs@voltanet.io>
  • Loading branch information
Mark Stapp committed Sep 3, 2020
1 parent 8641233 commit efa6d92
Showing 1 changed file with 42 additions and 20 deletions.
62 changes: 42 additions & 20 deletions zebra/zebra_dplane.c
Original file line number Diff line number Diff line change
Expand Up @@ -4231,11 +4231,19 @@ static int dplane_thread_loop(struct thread *event)
struct dplane_ctx_q error_list;
struct zebra_dplane_provider *prov;
struct zebra_dplane_ctx *ctx, *tctx;
int limit, counter, error_counter;
uint64_t curr, high;
uint32_t default_limit, limit, counter, error_counter;
uint64_t prov_q_counter, curr, high;

/* TODO
* X limit queue at first plugin
* - limit queue at each plugin (decide whether to check in-q first,
* or requeue to previous plugin if no room)
* - ensure the dplane pthread is scheduled if there are queues
*/

/* Capture work limit per cycle */
limit = zdplane_info.dg_updates_per_cycle;
default_limit = zdplane_info.dg_updates_per_cycle;
limit = default_limit;

/* Init temporary lists used to move contexts among providers */
TAILQ_INIT(&work_list);
Expand All @@ -4254,6 +4262,13 @@ static int dplane_thread_loop(struct thread *event)
/* Locate initial registered provider */
prov = TAILQ_FIRST(&zdplane_info.dg_providers_q);

prov_q_counter = atomic_load_explicit(&prov->dp_in_queued,
memory_order_relaxed);

/* Don't let the plugin queue build up too far */
if (prov_q_counter > 2 * limit)
limit = 0;

/* Move new work from incoming list to temp list */
for (counter = 0; counter < limit; counter++) {
ctx = TAILQ_FIRST(&zdplane_info.dg_update_ctx_q);
Expand All @@ -4271,8 +4286,9 @@ static int dplane_thread_loop(struct thread *event)

DPLANE_UNLOCK();

atomic_fetch_sub_explicit(&zdplane_info.dg_routes_queued, counter,
memory_order_relaxed);
if (counter > 0)
atomic_fetch_sub_explicit(&zdplane_info.dg_routes_queued,
counter, memory_order_relaxed);

if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
zlog_debug("dplane: incoming new work counter: %d", counter);
Expand Down Expand Up @@ -4315,25 +4331,26 @@ static int dplane_thread_loop(struct thread *event)
}

/* Enqueue new work to the provider */
dplane_provider_lock(prov);
if (TAILQ_FIRST(&work_list)) {
dplane_provider_lock(prov);

if (TAILQ_FIRST(&work_list))
TAILQ_CONCAT(&(prov->dp_ctx_in_q), &work_list,
zd_q_entries);

atomic_fetch_add_explicit(&prov->dp_in_counter, counter,
memory_order_relaxed);
atomic_fetch_add_explicit(&prov->dp_in_queued, counter,
memory_order_relaxed);
curr = atomic_load_explicit(&prov->dp_in_queued,
memory_order_relaxed);
high = atomic_load_explicit(&prov->dp_in_max,
memory_order_relaxed);
if (curr > high)
atomic_store_explicit(&prov->dp_in_max, curr,
memory_order_relaxed);

dplane_provider_unlock(prov);
atomic_fetch_add_explicit(&prov->dp_in_counter, counter,
memory_order_relaxed);
atomic_fetch_add_explicit(&prov->dp_in_queued, counter,
memory_order_relaxed);
curr = atomic_load_explicit(&prov->dp_in_queued,
memory_order_relaxed);
high = atomic_load_explicit(&prov->dp_in_max,
memory_order_relaxed);
if (curr > high)
atomic_store_explicit(&prov->dp_in_max, curr,
memory_order_relaxed);

dplane_provider_unlock(prov);
}

/* Reset the temp list (though the 'concat' may have done this
* already), and the counter
Expand Down Expand Up @@ -4367,6 +4384,11 @@ static int dplane_thread_loop(struct thread *event)
break;
}

if (counter > 0)
atomic_fetch_sub_explicit(&(prov->dp_out_queued),
counter,
memory_order_relaxed);

dplane_provider_unlock(prov);

if (IS_ZEBRA_DEBUG_DPLANE_DETAIL)
Expand Down

0 comments on commit efa6d92

Please sign in to comment.