-
-
Notifications
You must be signed in to change notification settings - Fork 5.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix intermittent threaded loop executed in order
test warning
#44479
Fix intermittent threaded loop executed in order
test warning
#44479
Conversation
Co-Authored-By: Takafumi Arakaki <29282+tkf@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can also add more random spins to increase jittering. But we can also test the original fix in this PR first to see if it is enough to fix the issue.
Co-authored-by: Takafumi Arakaki <takafumi.a@gmail.com>
It wasn't:
https://build.julialang.org/#/builders/20/builds/3289/steps/5/logs/stdio |
10 elements sound too small to expect reordering to happen semi-robustly. I guess we can ensure thread 1 to be slow by using two latches: @inline function countdownlatch!(counter)
counter[] == 0 && return false
countdownlatch_wait!(counter)
return true
end
@noinline function countdownlatch_wait!(counter)
Threads.atomic_sub!(counter, 1)
spins = 0
while counter[] != 0
GC.safepoint()
ccall(:jl_cpu_pause, Cvoid, ())
spins += 1
if spins > 500_000_000 # about 10 seconds
@warn "Failed wait for all workers. Unfinished rogue tasks occupying worker threads?"
break
end
end
end
# parallel loop with parallel atomic addition
function threaded_loop(a, r, x)
counter1 = Threads.Atomic{Int}(min(Threads.nthreads(), length(r)))
counter2 = Threads.Atomic{Int}(min(Threads.nthreads(), length(r)))
@threads for i in r
# synchronize the start given that each partition is started sequentially,
# meaning that without the wait, if the loop is too fast the iteration can happen in order
if countdownlatch!(counter1)
# thread 1 waits for other threads to complete the first iteration
Threads.threadid() == 1 && countdownlatch!(counter2)
end
j = i - firstindex(r) + 1
a[j] = 1 + atomic_add!(x, 1)
Threads.threadid() != 1 && countdownlatch!(counter2)
end
end |
Actually, what do we want to test with the unordered results in function threaded_loop(a, r, x)
counter = Threads.Atomic{Int}(min(Threads.nthreads(), length(r)))
success = fill!(Vector{Bool}(undef, Threads.nthreads()), false)
@threads for i in r
# synchronize the start given that each partition is started sequentially,
# meaning that without the wait, if the loop is too fast the iteration can happen in order
if counter[] != 0
Threads.atomic_sub!(counter, 1)
spins = 0
ok = true
while counter[] != 0
GC.safepoint()
ccall(:jl_cpu_pause, Cvoid, ())
spins += 1
if spins > 500_000_000 # about 10 seconds
@warn "Failed wait for all workers. Unfinished rogue tasks occupying worker threads?"
ok = false
break
end
end
success[Threads.threadid()] = ok
end
j = i - firstindex(r) + 1
a[j] = 1 + atomic_add!(x, 1)
end
return success
end |
Yeah, I think what we need is simply provided by the But really, I think what's needed is just what I pushed? |
Yeah, I think it is fine as-is. I just thought the count-down latch might be reliable enough to promote the warning to a
Oops, I should've written |
I tried that but it wasn't quite enough as I think this serves the goal. |
(cherry picked from commit cfb0d46)
Intends to fix the intermittent
Warning: threaded loop executed in order
warnings in tests.Because the partitions of
@threads
are contiguous chunks of the input range, and are started sequentially, it's possible for the loop to be executed range order if the loop partition takes a shorter time to execute than the next partition takes to start up. Executing in order makes this test think the loop isn't threading, so it warns.This synchronizes the start of the partitions to make threaded concurrency much more likely.
Co-authored by @tkf