Skip to content

Commit 531e479

Browse files
committed
Put "straggler" task in sticky queue
1 parent e862411 commit 531e479

File tree

1 file changed

+10
-58
lines changed

1 file changed

+10
-58
lines changed

src/partr.c

Lines changed: 10 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ static int wsdeque_push(jl_task_t *task, int16_t priority_ignord)
272272
return -1;
273273
jl_atomic_store_relaxed(
274274
(_Atomic(jl_task_t *) *)&wsdeques[tid].tasks[b % tasks_per_heap], task);
275+
if (jl_atomic_load_acquire(&task->tid) != -1)
276+
// If the `task` still hasn't finished the context switch at this point, abort the push
277+
// and put it in the sticky queue.
278+
return -1;
275279
jl_fence_release();
276280
jl_atomic_store_relaxed(&wsdeques[tid].bottom, b + 1);
277281
return 0;
@@ -317,75 +321,23 @@ static jl_task_t *wsdeque_steal(int16_t tid)
317321
}
318322

319323

320-
static const int wsdeque_pop_stash = 16;
321-
322-
323324
static jl_task_t *wsdeque_pop_or_steal(void)
324325
{
325326
jl_ptls_t ptls = jl_current_task->ptls;
326-
jl_task_t *task = NULL;
327-
328-
// Try to pop and lock the top `wsdeque_pop_stash` tasks in the local deque.
329-
jl_task_t *stash[wsdeque_pop_stash];
330-
int n_stashed = 0;
331-
for (; n_stashed < wsdeque_pop_stash; n_stashed++) {
332-
task = wsdeque_pop();
333-
if (task != NULL)
334-
if (!jl_set_task_tid(task, ptls->tid)) {
335-
stash[n_stashed] = task;
336-
task = NULL;
337-
continue;
338-
}
339-
break;
340-
}
341-
// Put back stashed tasks in the original order; TODO: batch insert?
342-
for (int i = n_stashed - 1; i >= 0; i--) {
343-
int err = wsdeque_push(stash[i], 0);
344-
(void)err;
345-
assert(!err);
346-
}
347-
int pushed = n_stashed;
348-
if (task)
349-
goto done;
350-
if (jl_n_threads < 2)
351-
goto done;
352-
353-
// Compute the lower bound of the number of empty slots. It's OK to be
354-
// smaller than the actual number (which can happen when other threads steal
355-
// some tasks). Note that `- 1` here is required since the Chase-Lev deque needs
356-
// one empty slot.
357-
int64_t empty_slots = tasks_per_heap - 1;
358-
if (n_stashed > 0) {
359-
int64_t b = jl_atomic_load_relaxed(&wsdeques[ptls->tid].bottom);
360-
int64_t t = jl_atomic_load_relaxed(&wsdeques[ptls->tid].top);
361-
empty_slots -= b - t;
362-
}
327+
jl_task_t *task = wsdeque_pop();
328+
if (task || jl_n_threads < 2)
329+
return task;
363330

364331
int ntries = jl_n_threads;
365-
if (ntries > empty_slots)
366-
ntries = empty_slots; // because `wsdeque_push` below can't fail
367332
for (int i = 0; i < ntries; ++i) {
368333
uint64_t tid = cong(jl_n_threads - 1, cong_unbias, &ptls->rngseed);
369334
if (tid >= ptls->tid)
370335
tid++;
371336
task = wsdeque_steal(tid);
372-
if (task != NULL) {
373-
if (!jl_set_task_tid(task, ptls->tid)) {
374-
int err = wsdeque_push(task, 0);
375-
(void)err;
376-
assert(!err);
377-
pushed = 1;
378-
task = NULL;
379-
continue;
380-
}
381-
break;
382-
}
337+
if (task)
338+
return task;
383339
}
384-
385-
done:
386-
if (pushed)
387-
jl_wakeup_thread(-1);
388-
return task;
340+
return NULL;
389341
}
390342

391343

0 commit comments

Comments
 (0)