From 9d6f71fd2ed0a290781e547c7573e86010ff660f Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 15:27:28 -0400 Subject: [PATCH 01/23] Fix `sync.Benaphore` The calls to `atomic_add*` return the value before adding, not after, so the previous code was causing the occasional data race. --- core/sync/extended.odin | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index b446fefa015..ffba40ef898 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -355,7 +355,7 @@ from entering any critical sections associated with the same benaphore, until until the lock is released. */ benaphore_lock :: proc "contextless" (b: ^Benaphore) { - if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 { + if atomic_add_explicit(&b.counter, 1, .Acquire) > 0 { sema_wait(&b.sema) } } @@ -384,7 +384,7 @@ are waiting on the lock, exactly one thread is allowed into a critical section associated with the same banaphore. */ benaphore_unlock :: proc "contextless" (b: ^Benaphore) { - if atomic_sub_explicit(&b.counter, 1, .Release) > 0 { + if atomic_sub_explicit(&b.counter, 1, .Release) > 1 { sema_post(&b.sema) } } @@ -740,4 +740,4 @@ Make event available. one_shot_event_signal :: proc "contextless" (e: ^One_Shot_Event) { atomic_store_explicit(&e.state, 1, .Release) futex_broadcast(&e.state) -} \ No newline at end of file +} From 74b28f1ff91d4776475f4009fa2bcda71c655cd5 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 17:25:48 -0400 Subject: [PATCH 02/23] Fix rare double-join possibility in POSIX `thread._join` This was occuring about 1/100 times with the test runner's thread pool. --- core/thread/thread_unix.odin | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index ddc47244c16..d165560aca4 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -9,8 +9,6 @@ import "core:time" _IS_SUPPORTED :: true -CAS :: sync.atomic_compare_exchange_strong - // NOTE(tetra): Aligned here because of core/unix/pthread_linux.odin/pthread_t. // Also see core/sys/darwin/mach_darwin.odin/semaphore_t. Thread_Os_Specific :: struct #align(16) { @@ -140,24 +138,18 @@ _is_done :: proc(t: ^Thread) -> bool { } _join :: proc(t: ^Thread) { - // sync.guard(&t.mutex) - if unix.pthread_equal(unix.pthread_self(), t.unix_thread) { return } - // Preserve other flags besides `.Joined`, like `.Started`. - unjoined := sync.atomic_load(&t.flags) - {.Joined} - joined := unjoined + {.Joined} - - // Try to set `t.flags` from unjoined to joined. If it returns joined, - // it means the previous value had that flag set and we can return. - if res, ok := CAS(&t.flags, unjoined, joined); res == joined && !ok { + // If the previous value was already `Joined`, then we can return. + if .Joined in sync.atomic_or(&t.flags, {.Joined}) { return } + // Prevent non-started threads from blocking main thread with initial wait // condition. 
- if .Started not_in unjoined { + if .Started not_in sync.atomic_load(&t.flags) { _start(t) } unix.pthread_join(t.unix_thread, nil) From cbd4d5e765646ef07c4133ab65e06652a87a1916 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 17:54:45 -0400 Subject: [PATCH 03/23] Fix data race in `atomic_sema_wait_with_timeout` --- core/sync/primitives_atomic.odin | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sync/primitives_atomic.odin b/core/sync/primitives_atomic.odin index 1d8e423db72..076a74b2024 100644 --- a/core/sync/primitives_atomic.odin +++ b/core/sync/primitives_atomic.odin @@ -361,7 +361,7 @@ atomic_sema_wait_with_timeout :: proc "contextless" (s: ^Atomic_Sema, duration: if !futex_wait_with_timeout(&s.count, u32(original_count), remaining) { return false } - original_count = s.count + original_count = atomic_load_explicit(&s.count, .Relaxed) } if original_count == atomic_compare_exchange_strong_explicit(&s.count, original_count, original_count-1, .Acquire, .Acquire) { return true From 4d14b4257e7570216826c5cbcee94aa51116e3b3 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 18:05:34 -0400 Subject: [PATCH 04/23] Convert POSIX `Thread` to use semaphore instead One less value to store, and it should be less of a hack too. Semaphores will not wait around if they have the go-ahead; they depend on an internal value being non-zero, instead of whatever was loaded when they started waiting, which is the case with a `Cond`. --- core/thread/thread_unix.odin | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/core/thread/thread_unix.odin b/core/thread/thread_unix.odin index d165560aca4..3d3b419b096 100644 --- a/core/thread/thread_unix.odin +++ b/core/thread/thread_unix.odin @@ -5,7 +5,6 @@ package thread import "base:runtime" import "core:sync" import "core:sys/unix" -import "core:time" _IS_SUPPORTED :: true @@ -13,8 +12,7 @@ _IS_SUPPORTED :: true // Also see core/sys/darwin/mach_darwin.odin/semaphore_t. Thread_Os_Specific :: struct #align(16) { unix_thread: unix.pthread_t, // NOTE: very large on Darwin, small on Linux. - cond: sync.Cond, - mutex: sync.Mutex, + start_ok: sync.Sema, } // // Creates a thread which will run the given procedure. @@ -27,14 +25,10 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread { // We need to give the thread a moment to start up before we enable cancellation. can_set_thread_cancel_state := unix.pthread_setcancelstate(unix.PTHREAD_CANCEL_ENABLE, nil) == 0 - sync.lock(&t.mutex) - t.id = sync.current_thread_id() - for (.Started not_in sync.atomic_load(&t.flags)) { - // HACK: use a timeout so in the event that the condition is signalled at THIS comment's exact point - // (after checking flags, before starting the wait) it gets itself out of that deadlock after a ms. 
- sync.wait_with_timeout(&t.cond, &t.mutex, time.Millisecond) + if .Started not_in sync.atomic_load(&t.flags) { + sync.wait(&t.start_ok) } if .Joined in sync.atomic_load(&t.flags) { @@ -64,8 +58,6 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread { sync.atomic_or(&t.flags, { .Done }) - sync.unlock(&t.mutex) - if .Self_Cleanup in sync.atomic_load(&t.flags) { res := unix.pthread_detach(t.unix_thread) assert_contextless(res == 0) @@ -130,7 +122,7 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread { _start :: proc(t: ^Thread) { sync.atomic_or(&t.flags, { .Started }) - sync.signal(&t.cond) + sync.post(&t.start_ok) } _is_done :: proc(t: ^Thread) -> bool { From 45da0093774276223e3724a89e5b0a9f8ef7c9f7 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 18:21:55 -0400 Subject: [PATCH 05/23] Use more atomic handling of thread flags This can prevent a data race on Linux with `Self_Cleanup`. --- core/thread/thread.odin | 12 ++++++------ core/thread/thread_windows.odin | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/core/thread/thread.odin b/core/thread/thread.odin index 17ba1a0a2c2..c1cbceb42c3 100644 --- a/core/thread/thread.odin +++ b/core/thread/thread.odin @@ -272,7 +272,7 @@ create_and_start :: proc(fn: proc(), init_context: Maybe(runtime.Context) = nil, t := create(thread_proc, priority) t.data = rawptr(fn) if self_cleanup { - t.flags += {.Self_Cleanup} + intrinsics.atomic_or(&t.flags, {.Self_Cleanup}) } t.init_context = init_context start(t) @@ -307,7 +307,7 @@ create_and_start_with_data :: proc(data: rawptr, fn: proc(data: rawptr), init_co t.user_index = 1 t.user_args[0] = data if self_cleanup { - t.flags += {.Self_Cleanup} + intrinsics.atomic_or(&t.flags, {.Self_Cleanup}) } t.init_context = init_context start(t) @@ -347,7 +347,7 @@ create_and_start_with_poly_data :: proc(data: $T, fn: proc(data: T), init_contex mem.copy(&t.user_args[0], &data, size_of(T)) if self_cleanup { - t.flags += {.Self_Cleanup} + intrinsics.atomic_or(&t.flags, {.Self_Cleanup}) } t.init_context = init_context @@ -394,7 +394,7 @@ create_and_start_with_poly_data2 :: proc(arg1: $T1, arg2: $T2, fn: proc(T1, T2), _ = copy(user_args[n:], mem.ptr_to_bytes(&arg2)) if self_cleanup { - t.flags += {.Self_Cleanup} + intrinsics.atomic_or(&t.flags, {.Self_Cleanup}) } t.init_context = init_context @@ -443,7 +443,7 @@ create_and_start_with_poly_data3 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, fn: pr _ = copy(user_args[n:], mem.ptr_to_bytes(&arg3)) if self_cleanup { - t.flags += {.Self_Cleanup} + intrinsics.atomic_or(&t.flags, {.Self_Cleanup}) } t.init_context = init_context @@ -494,7 +494,7 @@ create_and_start_with_poly_data4 :: proc(arg1: $T1, arg2: $T2, arg3: $T3, arg4: _ = copy(user_args[n:], mem.ptr_to_bytes(&arg4)) if self_cleanup { - t.flags += {.Self_Cleanup} + intrinsics.atomic_or(&t.flags, {.Self_Cleanup}) } t.init_context = init_context diff --git a/core/thread/thread_windows.odin b/core/thread/thread_windows.odin index 50a4e5fbc10..22c3eae6589 100644 --- a/core/thread/thread_windows.odin +++ b/core/thread/thread_windows.odin @@ -27,7 +27,7 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> ^Thread { __windows_thread_entry_proc :: proc "system" (t_: rawptr) -> win32.DWORD { t := (^Thread)(t_) - if .Joined in t.flags { + if .Joined in sync.atomic_load(&t.flags) { return 0 } @@ -48,9 +48,9 @@ _create :: proc(procedure: Thread_Proc, priority: Thread_Priority) -> 
^Thread { t.procedure(t) } - intrinsics.atomic_store(&t.flags, t.flags + {.Done}) + intrinsics.atomic_or(&t.flags, {.Done}) - if .Self_Cleanup in t.flags { + if .Self_Cleanup in sync.atomic_load(&t.flags) { win32.CloseHandle(t.win32_thread) t.win32_thread = win32.INVALID_HANDLE // NOTE(ftphikari): It doesn't matter which context 'free' received, right? From dbb783fbf20df1bba899b7a2bcbd65f71eb32fef Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 18:59:55 -0400 Subject: [PATCH 06/23] Fix atomic memory order for `sync.ticket_mutex_unlock` --- core/sync/extended.odin | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index ffba40ef898..fd2bda08ac0 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -297,7 +297,7 @@ waiting to acquire the lock, exactly one of those threads is unblocked and allowed into the critical section. */ ticket_mutex_unlock :: #force_inline proc "contextless" (m: ^Ticket_Mutex) { - atomic_add_explicit(&m.serving, 1, .Relaxed) + atomic_add_explicit(&m.serving, 1, .Release) } /* From c3f363cfbcee453c7d90b37429c92115e91216af Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 21:59:55 -0400 Subject: [PATCH 07/23] Fix data race when `pool_stop_task` is called --- core/thread/thread_pool.odin | 1 + 1 file changed, 1 insertion(+) diff --git a/core/thread/thread_pool.odin b/core/thread/thread_pool.odin index 9bcc42968f0..d9166b45090 100644 --- a/core/thread/thread_pool.odin +++ b/core/thread/thread_pool.odin @@ -60,6 +60,7 @@ pool_thread_runner :: proc(t: ^Thread) { if task, ok := pool_pop_waiting(pool); ok { data.task = task pool_do_work(pool, task) + sync.guard(&pool.mutex) data.task = {} } } From 0a594147afbfdacece1d221d2dee744e612362c6 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 00:23:30 -0400 Subject: [PATCH 08/23] Use `contextless` procs in `core:sync` instead --- core/sync/extended.odin | 10 +++++----- core/sync/futex_darwin.odin | 12 ++++++------ core/sync/futex_freebsd.odin | 8 ++++---- core/sync/futex_linux.odin | 8 ++++---- core/sync/futex_netbsd.odin | 8 ++++---- core/sync/futex_openbsd.odin | 8 ++++---- core/sync/futex_wasm.odin | 8 ++++---- core/sync/primitives.odin | 18 +----------------- core/sync/primitives_atomic.odin | 2 +- 9 files changed, 33 insertions(+), 49 deletions(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index fd2bda08ac0..83cc648b477 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -48,12 +48,12 @@ wait_group_add :: proc "contextless" (wg: ^Wait_Group, delta: int) { atomic_add(&wg.counter, delta) if wg.counter < 0 { - _panic("sync.Wait_Group negative counter") + panic_contextless("sync.Wait_Group negative counter") } if wg.counter == 0 { cond_broadcast(&wg.cond) if wg.counter != 0 { - _panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") + panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") } } } @@ -81,7 +81,7 @@ wait_group_wait :: proc "contextless" (wg: ^Wait_Group) { if wg.counter != 0 { cond_wait(&wg.cond, &wg.mutex) if wg.counter != 0 { - _panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") + panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with 
sync.wait_group_wait") } } } @@ -105,7 +105,7 @@ wait_group_wait_with_timeout :: proc "contextless" (wg: ^Wait_Group, duration: t return false } if wg.counter != 0 { - _panic("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") + panic_contextless("sync.Wait_Group misuse: sync.wait_group_add called concurrently with sync.wait_group_wait") } } return true @@ -494,7 +494,7 @@ for other threads for entering. */ recursive_benaphore_unlock :: proc "contextless" (b: ^Recursive_Benaphore) { tid := current_thread_id() - _assert(tid == b.owner, "tid != b.owner") + assert_contextless(tid == b.owner, "tid != b.owner") b.recursion -= 1 recursion := b.recursion if recursion == 0 { diff --git a/core/sync/futex_darwin.odin b/core/sync/futex_darwin.odin index fca9aadfe1f..daefd669992 100644 --- a/core/sync/futex_darwin.odin +++ b/core/sync/futex_darwin.odin @@ -48,7 +48,7 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati case -ETIMEDOUT: return false case: - _panic("darwin.os_sync_wait_on_address_with_timeout failure") + panic_contextless("darwin.os_sync_wait_on_address_with_timeout failure") } } else { @@ -63,7 +63,7 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati case ETIMEDOUT: return false case: - _panic("futex_wait failure") + panic_contextless("futex_wait failure") } return true @@ -83,7 +83,7 @@ _futex_signal :: proc "contextless" (f: ^Futex) { case -ENOENT: return case: - _panic("darwin.os_sync_wake_by_address_any failure") + panic_contextless("darwin.os_sync_wake_by_address_any failure") } } } else { @@ -99,7 +99,7 @@ _futex_signal :: proc "contextless" (f: ^Futex) { case ENOENT: return case: - _panic("futex_wake_single failure") + panic_contextless("futex_wake_single failure") } } @@ -119,7 +119,7 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) { case -ENOENT: return case: - _panic("darwin.os_sync_wake_by_address_all failure") + panic_contextless("darwin.os_sync_wake_by_address_all failure") } } } else { @@ -135,7 +135,7 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) { case ENOENT: return case: - _panic("futex_wake_all failure") + panic_contextless("futex_wake_all failure") } } diff --git a/core/sync/futex_freebsd.odin b/core/sync/futex_freebsd.odin index ac6e2400a4f..82021a71a09 100644 --- a/core/sync/futex_freebsd.odin +++ b/core/sync/futex_freebsd.odin @@ -21,7 +21,7 @@ _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool { continue } - _panic("_futex_wait failure") + panic_contextless("_futex_wait failure") } unreachable() @@ -44,14 +44,14 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati return false } - _panic("_futex_wait_with_timeout failure") + panic_contextless("_futex_wait_with_timeout failure") } _futex_signal :: proc "contextless" (f: ^Futex) { errno := freebsd._umtx_op(f, .WAKE, 1, nil, nil) if errno != nil { - _panic("_futex_signal failure") + panic_contextless("_futex_signal failure") } } @@ -59,6 +59,6 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) { errno := freebsd._umtx_op(f, .WAKE, cast(c.ulong)max(i32), nil, nil) if errno != nil { - _panic("_futex_broadcast failure") + panic_contextless("_futex_broadcast failure") } } diff --git a/core/sync/futex_linux.odin b/core/sync/futex_linux.odin index fe57c12eda3..4d9101b9ff7 100644 --- a/core/sync/futex_linux.odin +++ b/core/sync/futex_linux.odin @@ -15,7 +15,7 @@ _futex_wait :: proc "contextless" (futex: ^Futex, expected: u32) -> bool { return true case: 
// TODO(flysand): More descriptive panic messages based on the vlaue of `errno` - _panic("futex_wait failure") + panic_contextless("futex_wait failure") } } @@ -34,7 +34,7 @@ _futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, du case .NONE, .EINTR, .EAGAIN: return true case: - _panic("futex_wait_with_timeout failure") + panic_contextless("futex_wait_with_timeout failure") } } @@ -44,7 +44,7 @@ _futex_signal :: proc "contextless" (futex: ^Futex) { case .NONE: return case: - _panic("futex_wake_single failure") + panic_contextless("futex_wake_single failure") } } @@ -57,6 +57,6 @@ _futex_broadcast :: proc "contextless" (futex: ^Futex) { case .NONE: return case: - _panic("_futex_wake_all failure") + panic_contextless("_futex_wake_all failure") } } diff --git a/core/sync/futex_netbsd.odin b/core/sync/futex_netbsd.odin index d12409f3278..f81a126757b 100644 --- a/core/sync/futex_netbsd.odin +++ b/core/sync/futex_netbsd.odin @@ -35,7 +35,7 @@ _futex_wait :: proc "contextless" (futex: ^Futex, expected: u32) -> bool { case EINTR, EAGAIN: return true case: - _panic("futex_wait failure") + panic_contextless("futex_wait failure") } } return true @@ -55,7 +55,7 @@ _futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, du case ETIMEDOUT: return false case: - _panic("futex_wait_with_timeout failure") + panic_contextless("futex_wait_with_timeout failure") } } return true @@ -63,12 +63,12 @@ _futex_wait_with_timeout :: proc "contextless" (futex: ^Futex, expected: u32, du _futex_signal :: proc "contextless" (futex: ^Futex) { if _, ok := intrinsics.syscall_bsd(unix.SYS___futex, uintptr(futex), FUTEX_WAKE_PRIVATE, 1, 0, 0, 0); !ok { - _panic("futex_wake_single failure") + panic_contextless("futex_wake_single failure") } } _futex_broadcast :: proc "contextless" (futex: ^Futex) { if _, ok := intrinsics.syscall_bsd(unix.SYS___futex, uintptr(futex), FUTEX_WAKE_PRIVATE, uintptr(max(i32)), 0, 0, 0); !ok { - _panic("_futex_wake_all failure") + panic_contextless("_futex_wake_all failure") } } diff --git a/core/sync/futex_openbsd.odin b/core/sync/futex_openbsd.odin index 4883a084145..1ffe4a9a562 100644 --- a/core/sync/futex_openbsd.odin +++ b/core/sync/futex_openbsd.odin @@ -36,7 +36,7 @@ _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool { return false } - _panic("futex_wait failure") + panic_contextless("futex_wait failure") } _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, duration: time.Duration) -> bool { @@ -62,14 +62,14 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati return false } - _panic("futex_wait_with_timeout failure") + panic_contextless("futex_wait_with_timeout failure") } _futex_signal :: proc "contextless" (f: ^Futex) { res := _unix_futex(f, FUTEX_WAKE_PRIVATE, 1, nil) if res == -1 { - _panic("futex_wake_single failure") + panic_contextless("futex_wake_single failure") } } @@ -77,6 +77,6 @@ _futex_broadcast :: proc "contextless" (f: ^Futex) { res := _unix_futex(f, FUTEX_WAKE_PRIVATE, u32(max(i32)), nil) if res == -1 { - _panic("_futex_wake_all failure") + panic_contextless("_futex_wake_all failure") } } diff --git a/core/sync/futex_wasm.odin b/core/sync/futex_wasm.odin index de88e8198a1..27532587c5b 100644 --- a/core/sync/futex_wasm.odin +++ b/core/sync/futex_wasm.odin @@ -10,7 +10,7 @@ import "core:time" _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool { when !intrinsics.has_target_feature("atomics") { - _panic("usage of `core:sync` requires the 
`-target-feature:\"atomics\"` or a `-microarch` that supports it") + panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") } else { s := intrinsics.wasm_memory_atomic_wait32((^u32)(f), expected, -1) return s != 0 @@ -19,7 +19,7 @@ _futex_wait :: proc "contextless" (f: ^Futex, expected: u32) -> bool { _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, duration: time.Duration) -> bool { when !intrinsics.has_target_feature("atomics") { - _panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") + panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") } else { s := intrinsics.wasm_memory_atomic_wait32((^u32)(f), expected, i64(duration)) return s != 0 @@ -28,7 +28,7 @@ _futex_wait_with_timeout :: proc "contextless" (f: ^Futex, expected: u32, durati _futex_signal :: proc "contextless" (f: ^Futex) { when !intrinsics.has_target_feature("atomics") { - _panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") + panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") } else { loop: for { s := intrinsics.wasm_memory_atomic_notify32((^u32)(f), 1) @@ -41,7 +41,7 @@ _futex_signal :: proc "contextless" (f: ^Futex) { _futex_broadcast :: proc "contextless" (f: ^Futex) { when !intrinsics.has_target_feature("atomics") { - _panic("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") + panic_contextless("usage of `core:sync` requires the `-target-feature:\"atomics\"` or a `-microarch` that supports it") } else { loop: for { s := intrinsics.wasm_memory_atomic_notify32((^u32)(f), ~u32(0)) diff --git a/core/sync/primitives.odin b/core/sync/primitives.odin index a2282448115..8187c904bbd 100644 --- a/core/sync/primitives.odin +++ b/core/sync/primitives.odin @@ -1,6 +1,5 @@ package sync -import "base:runtime" import "core:time" /* @@ -560,7 +559,7 @@ futex_wait :: proc "contextless" (f: ^Futex, expected: u32) { return } ok := _futex_wait(f, expected) - _assert(ok, "futex_wait failure") + assert_contextless(ok, "futex_wait failure") } /* @@ -597,18 +596,3 @@ Wake up multiple threads waiting on a futex. futex_broadcast :: proc "contextless" (f: ^Futex) { _futex_broadcast(f) } - - -@(private) -_assert :: proc "contextless" (cond: bool, msg: string) { - if !cond { - _panic(msg) - } -} - -@(private) -_panic :: proc "contextless" (msg: string) -> ! 
{ - runtime.print_string(msg) - runtime.print_byte('\n') - runtime.trap() -} diff --git a/core/sync/primitives_atomic.odin b/core/sync/primitives_atomic.odin index 076a74b2024..3c4324eb7f8 100644 --- a/core/sync/primitives_atomic.odin +++ b/core/sync/primitives_atomic.odin @@ -240,7 +240,7 @@ atomic_recursive_mutex_lock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) { atomic_recursive_mutex_unlock :: proc "contextless" (m: ^Atomic_Recursive_Mutex) { tid := current_thread_id() - _assert(tid == m.owner, "tid != m.owner") + assert_contextless(tid == m.owner, "tid != m.owner") m.recursion -= 1 recursion := m.recursion if recursion == 0 { From 73f5ab473c4129ae209838d7967286684ac3f462 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 14:18:01 -0400 Subject: [PATCH 09/23] Keep `chan.can_recv` from deadlocking --- core/sync/chan/chan.odin | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 53a3bff4b69..aca08d82ead 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -423,7 +423,7 @@ raw_queue_pop :: proc "contextless" (q: ^Raw_Queue) -> (data: rawptr) { can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { sync.guard(&c.mutex) if is_buffered(c) { - return len(c) > 0 + return c.queue.len > 0 } return sync.atomic_load(&c.w_waiting) > 0 } From 026aef69e3a42021c5d9666737c7401dc75dc89a Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 14:42:50 -0400 Subject: [PATCH 10/23] Fix deadlock on sending to full, buffered, closed `Chan` This will also keep messages from being sent to closed, buffered channels in general. --- core/sync/chan/chan.odin | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index aca08d82ead..cb299f23ffd 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -164,12 +164,17 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { } if c.queue != nil { // buffered sync.guard(&c.mutex) - for c.queue.len == c.queue.cap { + for !sync.atomic_load(&c.closed) && + c.queue.len == c.queue.cap { sync.atomic_add(&c.w_waiting, 1) sync.wait(&c.w_cond, &c.mutex) sync.atomic_sub(&c.w_waiting, 1) } + if sync.atomic_load(&c.closed) { + return false + } + ok = raw_queue_push(c.queue, msg_in) if sync.atomic_load(&c.r_waiting) > 0 { sync.signal(&c.r_cond) From e9a6a344809daf5c0a3b725dd52e1527382d8c41 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:04:18 -0400 Subject: [PATCH 11/23] Forbid `chan.try_send` on closed buffered channels --- core/sync/chan/chan.odin | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index cb299f23ffd..f0b04f3b445 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -260,6 +260,10 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) return false } + if sync.atomic_load(&c.closed) { + return false + } + ok = raw_queue_push(c.queue, msg_in) if sync.atomic_load(&c.r_waiting) > 0 { sync.signal(&c.r_cond) From 8a14a656fbd8843628b61e5a20877b40b772482c Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:05:29 -0400 Subject: [PATCH 12/23] Fix `chan.can_send` for unbuffered channels `w_waiting` is the signal that says a caller is waiting 
to be able to send something. It is incremented upon send and - in the case of an unbuffered channel - it can only hold one message. Therefore, check that `w_waiting` is zero instead. --- core/sync/chan/chan.odin | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index f0b04f3b445..5b9a764b47a 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -444,7 +444,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len < c.queue.cap } - return sync.atomic_load(&c.r_waiting) > 0 + return sync.atomic_load(&c.w_waiting) == 0 } From 074314b8874886f8956e9a7e9a773ac059051030 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 18:38:29 -0400 Subject: [PATCH 13/23] Fix data race in `test_core_flags` --- tests/core/flags/test_core_flags.odin | 33 ++++++++++++++++----------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/tests/core/flags/test_core_flags.odin b/tests/core/flags/test_core_flags.odin index f8305f2ead9..8fcd6a8a72c 100644 --- a/tests/core/flags/test_core_flags.odin +++ b/tests/core/flags/test_core_flags.odin @@ -12,6 +12,26 @@ import "core:strings" import "core:testing" import "core:time/datetime" +Custom_Data :: struct { + a: int, +} + +@(init) +init_custom_type_setter :: proc() { + // NOTE: This is done here so it can be out of the flow of the + // multi-threaded test runner, to prevent any data races that could be + // reported by using `-sanitize:thread`. + // + // Do mind that this means every test here acknowledges the `Custom_Data` type. + flags.register_type_setter(proc (data: rawptr, data_type: typeid, _, _: string) -> (string, bool, runtime.Allocator_Error) { + if data_type == Custom_Data { + (cast(^Custom_Data)data).a = 32 + return "", true, nil + } + return "", false, nil + }) +} + @(test) test_no_args :: proc(t: ^testing.T) { S :: struct { @@ -1230,9 +1250,6 @@ test_net :: proc(t: ^testing.T) { @(test) test_custom_type_setter :: proc(t: ^testing.T) { Custom_Bool :: distinct bool - Custom_Data :: struct { - a: int, - } S :: struct { a: Custom_Data, @@ -1240,16 +1257,6 @@ test_custom_type_setter :: proc(t: ^testing.T) { } s: S - // NOTE: Mind that this setter is global state, and the test runner is multi-threaded. - // It should be fine so long as all type setter tests are in this one test proc. - flags.register_type_setter(proc (data: rawptr, data_type: typeid, _, _: string) -> (string, bool, runtime.Allocator_Error) { - if data_type == Custom_Data { - (cast(^Custom_Data)data).a = 32 - return "", true, nil - } - return "", false, nil - }) - defer flags.register_type_setter(nil) args := [?]string { "-a:hellope", "-b:true" } result := flags.parse(&s, args[:]) testing.expect_value(t, result, nil) From 3a6010918033e1548e84d57a07074cdbf802ff9b Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 19:09:16 -0400 Subject: [PATCH 14/23] Fix signalling test child threads crashing test 0 A thread made inside a test does not share the test index of its parent, so any time one of those threads failed an assert, it would tell the runner to shutdown test index zero. 
--- core/testing/signal_handler_libc.odin | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/testing/signal_handler_libc.odin b/core/testing/signal_handler_libc.odin index 27d1a073503..e2d17059534 100644 --- a/core/testing/signal_handler_libc.odin +++ b/core/testing/signal_handler_libc.odin @@ -26,6 +26,8 @@ import "core:os" @(private="file", thread_local) local_test_index: libc.sig_atomic_t +@(private="file", thread_local) +local_test_index_set: bool // Windows does not appear to have a SIGTRAP, so this is defined here, instead // of in the libc package, just so there's no confusion about it being @@ -45,6 +47,13 @@ stop_runner_callback :: proc "c" (sig: libc.int) { @(private="file") stop_test_callback :: proc "c" (sig: libc.int) { + if !local_test_index_set { + // We're a thread created by a test thread. + // + // There's nothing we can do to inform the test runner about who + // signalled, so hopefully the test will handle their own sub-threads. + return + } if local_test_index == -1 { // We're the test runner, and we ourselves have caught a signal from // which there is no recovery. @@ -114,6 +123,7 @@ This is a dire bug and should be reported to the Odin developers. _setup_signal_handler :: proc() { local_test_index = -1 + local_test_index_set = true // Catch user interrupt / CTRL-C. libc.signal(libc.SIGINT, stop_runner_callback) @@ -135,6 +145,7 @@ _setup_signal_handler :: proc() { _setup_task_signal_handler :: proc(test_index: int) { local_test_index = cast(libc.sig_atomic_t)test_index + local_test_index_set = true } _should_stop_runner :: proc() -> bool { From b2c2235e587bb8902dbf35ef84373bb5f616a814 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 19:11:44 -0400 Subject: [PATCH 15/23] Fix `recursive_benaphore_try_lock` Previously, if the owner called this, it would fail. --- core/sync/extended.odin | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index 83cc648b477..d5521935afe 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -474,10 +474,10 @@ recursive_benaphore_try_lock :: proc "contextless" (b: ^Recursive_Benaphore) -> tid := current_thread_id() if b.owner == tid { atomic_add_explicit(&b.counter, 1, .Acquire) - } - - if v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire); v != 0 { - return false + } else { + if v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire); v != 0 { + return false + } } // inside the lock b.owner = tid From fec1ccd7a3f0946a84b9914875845cc9f902ee74 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 19:15:06 -0400 Subject: [PATCH 16/23] Fix data races in `sync.Recursive_Benaphore` --- core/sync/extended.odin | 31 ++++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index d5521935afe..924ba92a691 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -449,13 +449,15 @@ recursive benaphore, until the lock is released. 
*/ recursive_benaphore_lock :: proc "contextless" (b: ^Recursive_Benaphore) { tid := current_thread_id() - if atomic_add_explicit(&b.counter, 1, .Acquire) > 1 { - if tid != b.owner { - sema_wait(&b.sema) + check_owner: if tid != atomic_load_explicit(&b.owner, .Acquire) { + atomic_add_explicit(&b.counter, 1, .Relaxed) + if _, ok := atomic_compare_exchange_strong_explicit(&b.owner, 0, tid, .Release, .Relaxed); ok { + break check_owner } + sema_wait(&b.sema) + atomic_store_explicit(&b.owner, tid, .Release) } // inside the lock - b.owner = tid b.recursion += 1 } @@ -472,15 +474,14 @@ benaphore, until the lock is released. */ recursive_benaphore_try_lock :: proc "contextless" (b: ^Recursive_Benaphore) -> bool { tid := current_thread_id() - if b.owner == tid { - atomic_add_explicit(&b.counter, 1, .Acquire) - } else { - if v, _ := atomic_compare_exchange_strong_explicit(&b.counter, 0, 1, .Acquire, .Acquire); v != 0 { - return false + check_owner: if tid != atomic_load_explicit(&b.owner, .Acquire) { + if _, ok := atomic_compare_exchange_strong_explicit(&b.owner, 0, tid, .Release, .Relaxed); ok { + atomic_add_explicit(&b.counter, 1, .Relaxed) + break check_owner } + return false } // inside the lock - b.owner = tid b.recursion += 1 return true } @@ -494,14 +495,14 @@ for other threads for entering. */ recursive_benaphore_unlock :: proc "contextless" (b: ^Recursive_Benaphore) { tid := current_thread_id() - assert_contextless(tid == b.owner, "tid != b.owner") + assert_contextless(tid == atomic_load_explicit(&b.owner, .Relaxed), "tid != b.owner") b.recursion -= 1 recursion := b.recursion + if recursion == 0 { - b.owner = 0 - } - if atomic_sub_explicit(&b.counter, 1, .Release) > 0 { - if recursion == 0 { + if atomic_sub_explicit(&b.counter, 1, .Relaxed) == 1 { + atomic_store_explicit(&b.owner, 0, .Release) + } else { sema_post(&b.sema) } } From a1435a6a904b2eb2b154a463c5d60f6c1f55abbc Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Tue, 10 Sep 2024 14:51:00 -0400 Subject: [PATCH 17/23] Fix deadlock in `Auto_Reset_Event` --- core/sync/extended.odin | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index 924ba92a691..0971516a3d2 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -228,15 +228,14 @@ thread. 
*/ auto_reset_event_signal :: proc "contextless" (e: ^Auto_Reset_Event) { old_status := atomic_load_explicit(&e.status, .Relaxed) + new_status := old_status + 1 if old_status < 1 else 1 for { - new_status := old_status + 1 if old_status < 1 else 1 if _, ok := atomic_compare_exchange_weak_explicit(&e.status, old_status, new_status, .Release, .Relaxed); ok { break } - - if old_status < 0 { - sema_post(&e.sema) - } + } + if old_status < 0 { + sema_post(&e.sema) } } From b1db33b519bf52e2d0e6e42ca9daccc7470d6b8b Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Tue, 10 Sep 2024 19:04:44 -0400 Subject: [PATCH 18/23] Add `cpu_relax` to `sync.auto_reset_event_signal` --- core/sync/extended.odin | 1 + 1 file changed, 1 insertion(+) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index 0971516a3d2..0b1f79df212 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -233,6 +233,7 @@ auto_reset_event_signal :: proc "contextless" (e: ^Auto_Reset_Event) { if _, ok := atomic_compare_exchange_weak_explicit(&e.status, old_status, new_status, .Release, .Relaxed); ok { break } + cpu_relax() } if old_status < 0 { sema_post(&e.sema) From 2938655a3d8801d1327b0076812edcf357d760df Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Wed, 11 Sep 2024 07:07:09 -0400 Subject: [PATCH 19/23] Fix CPU count detection in FreeBSD & NetBSD --- core/os/os_freebsd.odin | 2 +- core/os/os_netbsd.odin | 2 +- src/gb/gb.h | 12 ++++++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/os/os_freebsd.odin b/core/os/os_freebsd.odin index c05a06129c8..241f42c0bb0 100644 --- a/core/os/os_freebsd.odin +++ b/core/os/os_freebsd.odin @@ -920,7 +920,7 @@ get_page_size :: proc() -> int { _processor_core_count :: proc() -> int { count : int = 0 count_size := size_of(count) - if _sysctlbyname("hw.logicalcpu", &count, &count_size, nil, 0) == 0 { + if _sysctlbyname("hw.ncpu", &count, &count_size, nil, 0) == 0 { if count > 0 { return count } diff --git a/core/os/os_netbsd.odin b/core/os/os_netbsd.odin index a56c0b78467..ba9b40fc365 100644 --- a/core/os/os_netbsd.odin +++ b/core/os/os_netbsd.odin @@ -978,7 +978,7 @@ get_page_size :: proc() -> int { _processor_core_count :: proc() -> int { count : int = 0 count_size := size_of(count) - if _sysctlbyname("hw.logicalcpu", &count, &count_size, nil, 0) == 0 { + if _sysctlbyname("hw.ncpu", &count, &count_size, nil, 0) == 0 { if count > 0 { return count } diff --git a/src/gb/gb.h b/src/gb/gb.h index 0e65696e260..1fef4b4f511 100644 --- a/src/gb/gb.h +++ b/src/gb/gb.h @@ -3195,11 +3195,11 @@ void gb_affinity_init(gbAffinity *a) { a->core_count = 1; a->threads_per_core = 1; - if (sysctlbyname("hw.logicalcpu", &count, &count_size, NULL, 0) == 0) { + if (sysctlbyname("kern.smp.cpus", &count, &count_size, NULL, 0) == 0) { if (count > 0) { a->thread_count = count; // Get # of physical cores - if (sysctlbyname("hw.physicalcpu", &count, &count_size, NULL, 0) == 0) { + if (sysctlbyname("kern.smp.cores", &count, &count_size, NULL, 0) == 0) { if (count > 0) { a->core_count = count; a->threads_per_core = a->thread_count / count; @@ -3210,6 +3210,14 @@ void gb_affinity_init(gbAffinity *a) { } } } + } else if (sysctlbyname("hw.ncpu", &count, &count_size, NULL, 0) == 0) { + // SMP disabled or unavailable. 
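+		// hw.ncpu still reports the number of logical CPUs, so treat each one as a single-threaded core.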
+ if (count > 0) { + a->is_accurate = true; + a->thread_count = count; + a->core_count = count; + a->threads_per_core = 1; + } } } From 16cd16b91e9cabfba2d12e271712ca55cb16fa7d Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 8 Sep 2024 18:23:28 -0400 Subject: [PATCH 20/23] Fix comments --- core/sync/extended.odin | 18 +++++++++--------- core/sync/primitives.odin | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/core/sync/extended.odin b/core/sync/extended.odin index 0b1f79df212..30b1b277079 100644 --- a/core/sync/extended.odin +++ b/core/sync/extended.odin @@ -8,7 +8,7 @@ _ :: vg Wait group. Wait group is a synchronization primitive used by the waiting thread to wait, -until a all working threads finish work. +until all working threads finish work. The waiting thread first sets the number of working threads it will expect to wait for using `wait_group_add` call, and start waiting using `wait_group_wait` @@ -35,7 +35,7 @@ Wait_Group :: struct #no_copy { /* Increment an internal counter of a wait group. -This procedure atomicaly increments a number to the specified wait group's +This procedure atomically increments a number to the specified wait group's internal counter by a specified amount. This operation can be done on any thread. */ @@ -121,7 +121,7 @@ When `barrier_wait` procedure is called by any thread, that thread will block the execution, until all threads associated with the barrier reach the same point of execution and also call `barrier_wait`. -when barrier is initialized, a `thread_count` parameter is passed, signifying +When a barrier is initialized, a `thread_count` parameter is passed, signifying the amount of participant threads of the barrier. The barrier also keeps track of an internal atomic counter. When a thread calls `barrier_wait`, the internal counter is incremented. When the internal counter reaches `thread_count`, it is @@ -208,7 +208,7 @@ Represents a thread synchronization primitive that, when signalled, releases one single waiting thread and then resets automatically to a state where it can be signalled again. -When a thread calls `auto_reset_event_wait`, it's execution will be blocked, +When a thread calls `auto_reset_event_wait`, its execution will be blocked, until the event is signalled by another thread. The call to `auto_reset_event_signal` wakes up exactly one thread waiting for the event. */ @@ -331,8 +331,8 @@ Benaphore. A benaphore is a combination of an atomic variable and a semaphore that can improve locking efficiency in a no-contention system. Acquiring a benaphore -lock doesn't call into an internal semaphore, if no other thread in a middle of -a critical section. +lock doesn't call into an internal semaphore, if no other thread is in the +middle of a critical section. Once a lock on a benaphore is acquired by a thread, no other thread is allowed into any critical sections, associted with the same benaphore, until the lock @@ -381,7 +381,7 @@ Release a lock on a benaphore. This procedure releases a lock on the specified benaphore. If any of the threads are waiting on the lock, exactly one thread is allowed into a critical section -associated with the same banaphore. +associated with the same benaphore. */ benaphore_unlock :: proc "contextless" (b: ^Benaphore) { if atomic_sub_explicit(&b.counter, 1, .Release) > 1 { @@ -418,8 +418,8 @@ benaphore_guard :: proc "contextless" (m: ^Benaphore) -> bool { /* Recursive benaphore. 
-Recurisve benaphore is just like a plain benaphore, except it allows reentrancy -into the critical section. +A recursive benaphore is just like a plain benaphore, except it allows +reentrancy into the critical section. When a lock is acquired on a benaphore, all other threads attempting to acquire a lock on the same benaphore will be blocked from any critical sections, diff --git a/core/sync/primitives.odin b/core/sync/primitives.odin index 8187c904bbd..f091de0452d 100644 --- a/core/sync/primitives.odin +++ b/core/sync/primitives.odin @@ -389,7 +389,7 @@ recursive_mutex_guard :: proc "contextless" (m: ^Recursive_Mutex) -> bool { A condition variable. `Cond` implements a condition variable, a rendezvous point for threads waiting -for signalling the occurence of an event. Condition variables are used on +for signalling the occurence of an event. Condition variables are used in conjuction with mutexes to provide a shared access to one or more shared variable. From 7f7cfebc91cc319918bc0042789b0c7a931e56e2 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:17:08 -0400 Subject: [PATCH 21/23] Add tests for `core:sync` and `core:sync/chan` --- tests/core/normal.odin | 2 + tests/core/sync/chan/test_core_sync_chan.odin | 274 +++++++ tests/core/sync/test_core_sync.odin | 718 ++++++++++++++++++ 3 files changed, 994 insertions(+) create mode 100644 tests/core/sync/chan/test_core_sync_chan.odin create mode 100644 tests/core/sync/test_core_sync.odin diff --git a/tests/core/normal.odin b/tests/core/normal.odin index 6174f2d5cd0..0670842ac8b 100644 --- a/tests/core/normal.odin +++ b/tests/core/normal.odin @@ -39,6 +39,8 @@ download_assets :: proc() { @(require) import "slice" @(require) import "strconv" @(require) import "strings" +@(require) import "sync" +@(require) import "sync/chan" @(require) import "sys/posix" @(require) import "sys/windows" @(require) import "text/i18n" diff --git a/tests/core/sync/chan/test_core_sync_chan.odin b/tests/core/sync/chan/test_core_sync_chan.odin new file mode 100644 index 00000000000..9b8d9b3549d --- /dev/null +++ b/tests/core/sync/chan/test_core_sync_chan.odin @@ -0,0 +1,274 @@ +package test_core_sync_chan + +import "base:runtime" +import "base:intrinsics" +import "core:log" +import "core:math/rand" +import "core:sync/chan" +import "core:testing" +import "core:thread" +import "core:time" + + +Message_Type :: enum i32 { + Result, + Add, + Multiply, + Subtract, + Divide, + End, +} + +Message :: struct { + type: Message_Type, + i: i64, +} + +Comm :: struct { + host: chan.Chan(Message), + client: chan.Chan(Message), + manual_buffering: bool, +} + +BUFFER_SIZE :: 8 +MAX_RAND :: 32 +FAIL_TIME :: 1 * time.Second +SLEEP_TIME :: 1 * time.Millisecond + +comm_client :: proc(th: ^thread.Thread) { + data := cast(^Comm)th.data + manual_buffering := data.manual_buffering + + n: i64 + + for manual_buffering && !chan.can_recv(data.host) { + thread.yield() + } + + recv_loop: for msg in chan.recv(data.host) { + #partial switch msg.type { + case .Add: n += msg.i + case .Multiply: n *= msg.i + case .Subtract: n -= msg.i + case .Divide: n /= msg.i + case .End: + break recv_loop + case: + panic("Unknown message type for client.") + } + + for manual_buffering && !chan.can_recv(data.host) { + thread.yield() + } + } + + for manual_buffering && !chan.can_send(data.host) { + thread.yield() + } + + chan.send(data.client, Message{.Result, n}) + chan.close(data.client) +} + +send_messages :: proc(t: ^testing.T, host: chan.Chan(Message), 
manual_buffering: bool = false) -> (expected: i64) { + expected = 1 + for manual_buffering && !chan.can_send(host) { + thread.yield() + } + chan.send(host, Message{.Add, 1}) + log.debug(Message{.Add, 1}) + + for _ in 0..<1+2*BUFFER_SIZE { + msg: Message + msg.i = 1 + rand.int63_max(MAX_RAND) + switch rand.int_max(4) { + case 0: + msg.type = .Add + expected += msg.i + case 1: + msg.type = .Multiply + expected *= msg.i + case 2: + msg.type = .Subtract + expected -= msg.i + case 3: + msg.type = .Divide + expected /= msg.i + } + + for manual_buffering && !chan.can_send(host) { + thread.yield() + } + if manual_buffering { + testing.expect(t, chan.len(host) == 0) + } + + chan.send(host, msg) + log.debug(msg) + } + + for manual_buffering && !chan.can_send(host) { + thread.yield() + } + chan.send(host, Message{.End, 0}) + log.debug(Message{.End, 0}) + chan.close(host) + + return +} + +@test +test_chan_buffered :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + comm: Comm + alloc_err: runtime.Allocator_Error + comm.host, alloc_err = chan.create_buffered(chan.Chan(Message), BUFFER_SIZE, context.allocator) + assert(alloc_err == nil, "allocation failed") + comm.client, alloc_err = chan.create_buffered(chan.Chan(Message), BUFFER_SIZE, context.allocator) + assert(alloc_err == nil, "allocation failed") + defer { + chan.destroy(comm.host) + chan.destroy(comm.client) + } + + testing.expect(t, chan.is_buffered(comm.host)) + testing.expect(t, chan.is_buffered(comm.client)) + testing.expect(t, !chan.is_unbuffered(comm.host)) + testing.expect(t, !chan.is_unbuffered(comm.client)) + testing.expect_value(t, chan.len(comm.host), 0) + testing.expect_value(t, chan.len(comm.client), 0) + testing.expect_value(t, chan.cap(comm.host), BUFFER_SIZE) + testing.expect_value(t, chan.cap(comm.client), BUFFER_SIZE) + + reckoner := thread.create(comm_client) + defer thread.destroy(reckoner) + reckoner.data = &comm + thread.start(reckoner) + + expected := send_messages(t, comm.host, manual_buffering = false) + + // Sleep so we can give the other thread enough time to buffer its message. + time.sleep(SLEEP_TIME) + + testing.expect_value(t, chan.len(comm.client), 1) + result, ok := chan.try_recv(comm.client) + + // One more sleep to ensure it has enough time to close. + time.sleep(SLEEP_TIME) + + testing.expect_value(t, chan.is_closed(comm.client), true) + testing.expect_value(t, ok, true) + testing.expect_value(t, result.i, expected) + log.debug(result, expected) + + // Make sure sending to closed channels fails. 
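+	// Receives must fail as well: both channels are closed and their buffers have been drained.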
+ testing.expect_value(t, chan.send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.send(comm.client, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.client, Message{.End, 0}), false) + _, ok = chan.recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) +} + +@test +test_chan_unbuffered :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + comm: Comm + comm.manual_buffering = true + alloc_err: runtime.Allocator_Error + comm.host, alloc_err = chan.create_unbuffered(chan.Chan(Message), context.allocator) + assert(alloc_err == nil, "allocation failed") + comm.client, alloc_err = chan.create_unbuffered(chan.Chan(Message), context.allocator) + assert(alloc_err == nil, "allocation failed") + defer { + chan.destroy(comm.host) + chan.destroy(comm.client) + } + + testing.expect(t, !chan.is_buffered(comm.host)) + testing.expect(t, !chan.is_buffered(comm.client)) + testing.expect(t, chan.is_unbuffered(comm.host)) + testing.expect(t, chan.is_unbuffered(comm.client)) + testing.expect_value(t, chan.len(comm.host), 0) + testing.expect_value(t, chan.len(comm.client), 0) + testing.expect_value(t, chan.cap(comm.host), 0) + testing.expect_value(t, chan.cap(comm.client), 0) + + reckoner := thread.create(comm_client) + defer thread.destroy(reckoner) + reckoner.data = &comm + thread.start(reckoner) + + for !chan.can_send(comm.client) { + thread.yield() + } + + expected := send_messages(t, comm.host) + testing.expect_value(t, chan.is_closed(comm.host), true) + + for !chan.can_recv(comm.client) { + thread.yield() + } + + result, ok := chan.try_recv(comm.client) + testing.expect_value(t, ok, true) + testing.expect_value(t, result.i, expected) + log.debug(result, expected) + + // Sleep so we can give the other thread enough time to close its side + // after we've received its message. + time.sleep(SLEEP_TIME) + + testing.expect_value(t, chan.is_closed(comm.client), true) + + // Make sure sending and receiving on closed channels fails. + testing.expect_value(t, chan.send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.send(comm.client, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.host, Message{.End, 0}), false) + testing.expect_value(t, chan.try_send(comm.client, Message{.End, 0}), false) + _, ok = chan.recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.recv(comm.client); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.host); testing.expect_value(t, ok, false) + _, ok = chan.try_recv(comm.client); testing.expect_value(t, ok, false) +} + +@test +test_full_buffered_closed_chan_deadlock :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + ch, alloc_err := chan.create_buffered(chan.Chan(int), 1, context.allocator) + assert(alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + testing.expect(t, chan.can_send(ch)) + testing.expect(t, chan.send(ch, 32)) + testing.expect(t, chan.close(ch)) + testing.expect(t, !chan.send(ch, 32)) +} + +// This test guarantees a buffered channel's messages can still be received +// even after closing. This is currently how the API works. If that changes, +// this test will need to change. 
+@test +test_accept_message_from_closed_buffered_chan :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + ch, alloc_err := chan.create_buffered(chan.Chan(int), 2, context.allocator) + assert(alloc_err == nil, "allocation failed") + defer chan.destroy(ch) + + testing.expect(t, chan.can_send(ch)) + testing.expect(t, chan.send(ch, 32)) + testing.expect(t, chan.send(ch, 64)) + testing.expect(t, chan.close(ch)) + result, ok := chan.recv(ch) + testing.expect_value(t, result, 32) + testing.expect(t, ok) + result, ok = chan.try_recv(ch) + testing.expect_value(t, result, 64) + testing.expect(t, ok) +} diff --git a/tests/core/sync/test_core_sync.odin b/tests/core/sync/test_core_sync.odin new file mode 100644 index 00000000000..32c08f9351d --- /dev/null +++ b/tests/core/sync/test_core_sync.odin @@ -0,0 +1,718 @@ +// NOTE(Feoramund): These tests should be run a few hundred times, with and +// without `-sanitize:thread` enabled, to ensure maximum safety. +// +// Keep in mind that running with the debug logs uncommented can result in +// failures disappearing due to the delay of sending the log message causing +// different synchronization patterns. +// +// These tests are temporarily disabled on Darwin, as there is currently a +// stall occurring which I cannot debug. + +//+build !darwin +package test_core_sync + +import "base:intrinsics" +// import "core:log" +import "core:sync" +import "core:testing" +import "core:thread" +import "core:time" + +FAIL_TIME :: 1 * time.Second +SLEEP_TIME :: 1 * time.Millisecond +SMALL_SLEEP_TIME :: 10 * time.Microsecond + +// This needs to be high enough to cause a data race if any of the +// synchronization primitives fail. +THREADS :: 8 + +// Manually wait on all threads to finish. +// +// This reduces a dependency on a `Wait_Group` or similar primitives. +// +// It's also important that we wait for every thread to finish, as it's +// possible for a thread to finish after the test if we don't check, despite +// joining it to the test thread. 
+wait_for :: proc(threads: []^thread.Thread) { + wait_loop: for { + count := len(threads) + for v in threads { + if thread.is_done(v) { + count -= 1 + } + } + if count == 0 { + break wait_loop + } + thread.yield() + } + for t in threads { + thread.join(t) + thread.destroy(t) + } +} + +// +// core:sync/primitives.odin +// + +@test +test_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m: sync.Mutex, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("MUTEX-%v> locking", th.id) + sync.mutex_lock(&data.m) + data.number += 1 + // log.debugf("MUTEX-%v> unlocking", th.id) + sync.mutex_unlock(&data.m) + // log.debugf("MUTEX-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_rw_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m1: sync.RW_Mutex, + m2: sync.RW_Mutex, + number1: int, + number2: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.rw_mutex_shared_lock(&data.m1) + n := data.number1 + sync.rw_mutex_shared_unlock(&data.m1) + + sync.rw_mutex_lock(&data.m2) + data.number2 += n + sync.rw_mutex_unlock(&data.m2) + } + + data: Data + threads: [THREADS]^thread.Thread + + sync.rw_mutex_lock(&data.m1) + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + data.number1 = 1 + sync.rw_mutex_unlock(&data.m1) + + wait_for(threads[:]) + + testing.expect_value(t, data.number2, THREADS) +} + +@test +test_recursive_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m: sync.Recursive_Mutex, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("REC_MUTEX-%v> locking", th.id) + tried1 := sync.recursive_mutex_try_lock(&data.m) + for _ in 0..<3 { + sync.recursive_mutex_lock(&data.m) + } + tried2 := sync.recursive_mutex_try_lock(&data.m) + // log.debugf("REC_MUTEX-%v> locked", th.id) + data.number += 1 + // log.debugf("REC_MUTEX-%v> unlocking", th.id) + for _ in 0..<3 { + sync.recursive_mutex_unlock(&data.m) + } + if tried1 { sync.recursive_mutex_unlock(&data.m) } + if tried2 { sync.recursive_mutex_unlock(&data.m) } + // log.debugf("REC_MUTEX-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_cond :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + c: sync.Cond, + m: sync.Mutex, + i: int, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.mutex_lock(&data.m) + + for intrinsics.atomic_load(&data.i) != 1 { + sync.cond_wait(&data.c, &data.m) + } + + data.number += intrinsics.atomic_load(&data.i) + + sync.mutex_unlock(&data.m) + } + + data: Data + threads: [THREADS]^thread.Thread + data.i = -1 + + sync.mutex_lock(&data.m) + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + time.sleep(SLEEP_TIME) + data.i = 1 + sync.mutex_unlock(&data.m) + sync.cond_broadcast(&data.c) + + wait_for(threads[:]) + + 
testing.expect_value(t, data.number, THREADS) +} + +@test +test_cond_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + c: sync.Cond + m: sync.Mutex + sync.mutex_lock(&m) + sync.cond_wait_with_timeout(&c, &m, SLEEP_TIME) +} + +@test +test_semaphore :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + s: sync.Sema, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("SEM-%v> waiting", th.id) + sync.sema_wait(&data.s) + data.number += 1 + // log.debugf("SEM-%v> posting", th.id) + sync.sema_post(&data.s) + // log.debugf("SEM-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + sync.sema_post(&data.s) + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_semaphore_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + s: sync.Sema + sync.sema_wait_with_timeout(&s, SLEEP_TIME) +} + +@test +test_futex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + f: sync.Futex, + i: int, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("FUTEX-%v> waiting", th.id) + sync.futex_wait(&data.f, 3) + // log.debugf("FUTEX-%v> done", th.id) + + n := data.i + intrinsics.atomic_add(&data.number, n) + } + + data: Data + data.i = -1 + data.f = 3 + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + data.i = 1 + // Change the futex variable to keep late-starters from stalling. + data.f = 0 + sync.futex_broadcast(&data.f) + + wait_for(threads[:]) + + testing.expect_value(t, data.number, THREADS) +} + +@test +test_futex_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + f: sync.Futex = 1 + sync.futex_wait_with_timeout(&f, 1, SLEEP_TIME) +} + +// +// core:sync/extended.odin +// + +@test +test_wait_group :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + step1: sync.Wait_Group, + step2: sync.Wait_Group, + i: int, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.wait_group_wait(&data.step1) + + n := data.i + intrinsics.atomic_add(&data.number, n) + + sync.wait_group_done(&data.step2) + } + + data: Data + data.i = -1 + threads: [THREADS]^thread.Thread + + sync.wait_group_add(&data.step1, 1) + sync.wait_group_add(&data.step2, THREADS) + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + time.sleep(SMALL_SLEEP_TIME) + data.i = 1 + sync.wait_group_done(&data.step1) + + sync.wait_group_wait(&data.step2) + + wait_for(threads[:]) + + testing.expect_value(t, data.step1.counter, 0) + testing.expect_value(t, data.step2.counter, 0) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_wait_group_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + wg: sync.Wait_Group + sync.wait_group_wait_with_timeout(&wg, SLEEP_TIME) +} + +@test +test_barrier :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + b: sync.Barrier, + i: int, + number: int, + + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + sync.barrier_wait(&data.b) + + intrinsics.atomic_add(&data.number, 
data.i) + } + + data: Data + data.i = -1 + threads: [THREADS]^thread.Thread + + sync.barrier_init(&data.b, THREADS + 1) // +1 for this thread, of course. + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + time.sleep(SMALL_SLEEP_TIME) + data.i = 1 + sync.barrier_wait(&data.b) + + wait_for(threads[:]) + + testing.expect_value(t, data.b.index, 0) + testing.expect_value(t, data.b.generation_id, 1) + testing.expect_value(t, data.b.thread_count, THREADS + 1) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_auto_reset :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + a: sync.Auto_Reset_Event, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("AUR-%v> entering", th.id) + sync.auto_reset_event_wait(&data.a) + // log.debugf("AUR-%v> adding", th.id) + data.number += 1 + // log.debugf("AUR-%v> signalling", th.id) + sync.auto_reset_event_signal(&data.a) + // log.debugf("AUR-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + // There is a chance that this test can stall if a signal is sent before + // all threads are queued, because it's possible for some number of threads + // to get to the waiting state, the signal to fire, all of the waited + // threads to pass successfully, then the other threads come in with no-one + // to run a signal. + // + // So we'll just test a fully-waited queue of cascading threads. + for { + status := intrinsics.atomic_load(&data.a.status) + if status == -THREADS { + // log.debug("All Auto_Reset_Event threads have queued.") + break + } + intrinsics.cpu_relax() + } + + sync.auto_reset_event_signal(&data.a) + + wait_for(threads[:]) + + // The last thread should leave this primitive in a signalled state. 
+ testing.expect_value(t, data.a.status, 1) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_auto_reset_already_signalled :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + a: sync.Auto_Reset_Event + sync.auto_reset_event_signal(&a) + sync.auto_reset_event_wait(&a) + testing.expect_value(t, a.status, 0) +} + +@test +test_ticket_mutex :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + m: sync.Ticket_Mutex, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("TIC-%i> entering", th.id) + // intrinsics.debug_trap() + sync.ticket_mutex_lock(&data.m) + // log.debugf("TIC-%i> locked", th.id) + data.number += 1 + // log.debugf("TIC-%i> unlocking", th.id) + sync.ticket_mutex_unlock(&data.m) + // log.debugf("TIC-%i> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.m.ticket, THREADS) + testing.expect_value(t, data.m.serving, THREADS) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_benaphore :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + b: sync.Benaphore, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + sync.benaphore_lock(&data.b) + data.number += 1 + sync.benaphore_unlock(&data.b) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.b.counter, 0) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_recursive_benaphore :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + b: sync.Recursive_Benaphore, + number: int, + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + + // log.debugf("REC_BEP-%i> entering", th.id) + tried1 := sync.recursive_benaphore_try_lock(&data.b) + for _ in 0..<3 { + sync.recursive_benaphore_lock(&data.b) + } + tried2 := sync.recursive_benaphore_try_lock(&data.b) + // log.debugf("REC_BEP-%i> locked", th.id) + data.number += 1 + for _ in 0..<3 { + sync.recursive_benaphore_unlock(&data.b) + } + if tried1 { sync.recursive_benaphore_unlock(&data.b) } + if tried2 { sync.recursive_benaphore_unlock(&data.b) } + // log.debugf("REC_BEP-%i> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + // The benaphore should be unowned at the end. 
+ testing.expect_value(t, data.b.counter, 0) + testing.expect_value(t, data.b.owner, 0) + testing.expect_value(t, data.b.recursion, 0) + testing.expect_value(t, data.number, THREADS) +} + +@test +test_once :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + once: sync.Once, + number: int, + } + + write :: proc "contextless" (data: rawptr) { + data := cast(^Data)data + data.number += 1 + } + + p :: proc(th: ^thread.Thread) { + data := cast(^Data)th.data + // log.debugf("ONCE-%v> entering", th.id) + sync.once_do_with_data_contextless(&data.once, write, data) + // log.debugf("ONCE-%v> leaving", th.id) + } + + data: Data + threads: [THREADS]^thread.Thread + + for &v in threads { + v = thread.create(p) + v.data = &data + v.init_context = context + thread.start(v) + } + + wait_for(threads[:]) + + testing.expect_value(t, data.once.done, true) + testing.expect_value(t, data.number, 1) +} + +@test +test_park :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + car: sync.Parker, + number: int, + } + + data: Data + + th := thread.create_and_start_with_data(&data, proc(data: rawptr) { + data := cast(^Data)data + time.sleep(SLEEP_TIME) + sync.unpark(&data.car) + data.number += 1 + }) + + sync.park(&data.car) + + wait_for([]^thread.Thread{ th }) + + PARKER_EMPTY :: 0 + testing.expect_value(t, data.car.state, PARKER_EMPTY) + testing.expect_value(t, data.number, 1) +} + +@test +test_park_with_timeout :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + car: sync.Parker + sync.park_with_timeout(&car, SLEEP_TIME) +} + +@test +test_one_shot_event :: proc(t: ^testing.T) { + testing.set_fail_timeout(t, FAIL_TIME) + + Data :: struct { + event: sync.One_Shot_Event, + number: int, + } + + data: Data + + th := thread.create_and_start_with_data(&data, proc(data: rawptr) { + data := cast(^Data)data + time.sleep(SLEEP_TIME) + sync.one_shot_event_signal(&data.event) + data.number += 1 + }) + + sync.one_shot_event_wait(&data.event) + + wait_for([]^thread.Thread{ th }) + + testing.expect_value(t, data.event.state, 1) + testing.expect_value(t, data.number, 1) +} From d38f5ffb49733b5084b9419412acf9381e3073d4 Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 15 Sep 2024 22:00:53 -0400 Subject: [PATCH 22/23] Remove unneeded synchronizations in `Chan` Everything was already guarded by `c.mutex`. 
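As a rough sketch of the pattern this relies on (the names below are
illustrative, not code from `chan.odin`): a field that is only ever read or
written while a mutex is held needs no atomic operations of its own, because
the lock already orders every access.

    package example_guarded_field

    import "core:sync"

    Guarded_Counter :: struct {
    	mutex: sync.Mutex,
    	value: int, // guarded by `mutex`; plain int, no atomics
    }

    guarded_increment :: proc(c: ^Guarded_Counter) {
    	sync.guard(&c.mutex) // locks now, unlocks when the enclosing scope ends
    	c.value += 1         // plain access is safe: every access path holds `mutex`
    }

That is the situation described above: once every access to `closed`,
`r_waiting`, and `w_waiting` happens with `c.mutex` held, the atomic wrappers
add nothing.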
--- core/sync/chan/chan.odin | 78 ++++++++++++++++++---------------------- 1 file changed, 35 insertions(+), 43 deletions(-) diff --git a/core/sync/chan/chan.odin b/core/sync/chan/chan.odin index 5b9a764b47a..c470d15f3e5 100644 --- a/core/sync/chan/chan.odin +++ b/core/sync/chan/chan.odin @@ -22,19 +22,17 @@ Raw_Chan :: struct { allocator: runtime.Allocator, allocation_size: int, msg_size: u16, - closed: b16, // atomic + closed: b16, // guarded by `mutex` mutex: sync.Mutex, r_cond: sync.Cond, w_cond: sync.Cond, - r_waiting: int, // atomic - w_waiting: int, // atomic + r_waiting: int, // guarded by `mutex` + w_waiting: int, // guarded by `mutex` // Buffered queue: ^Raw_Queue, // Unbuffered - r_mutex: sync.Mutex, - w_mutex: sync.Mutex, unbuffered_data: rawptr, } @@ -164,32 +162,30 @@ send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) { } if c.queue != nil { // buffered sync.guard(&c.mutex) - for !sync.atomic_load(&c.closed) && - c.queue.len == c.queue.cap { - sync.atomic_add(&c.w_waiting, 1) + for !c.closed && c.queue.len == c.queue.cap { + c.w_waiting += 1 sync.wait(&c.w_cond, &c.mutex) - sync.atomic_sub(&c.w_waiting, 1) + c.w_waiting -= 1 } - if sync.atomic_load(&c.closed) { + if c.closed { return false } ok = raw_queue_push(c.queue, msg_in) - if sync.atomic_load(&c.r_waiting) > 0 { + if c.r_waiting > 0 { sync.signal(&c.r_cond) } } else if c.unbuffered_data != nil { // unbuffered - sync.guard(&c.w_mutex) sync.guard(&c.mutex) - if sync.atomic_load(&c.closed) { + if c.closed { return false } mem.copy(c.unbuffered_data, msg_in, int(c.msg_size)) - sync.atomic_add(&c.w_waiting, 1) - if sync.atomic_load(&c.r_waiting) > 0 { + c.w_waiting += 1 + if c.r_waiting > 0 { sync.signal(&c.r_cond) } sync.wait(&c.w_cond, &c.mutex) @@ -206,13 +202,13 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { if c.queue != nil { // buffered sync.guard(&c.mutex) for c.queue.len == 0 { - if sync.atomic_load(&c.closed) { + if c.closed { return } - sync.atomic_add(&c.r_waiting, 1) + c.r_waiting += 1 sync.wait(&c.r_cond, &c.mutex) - sync.atomic_sub(&c.r_waiting, 1) + c.r_waiting -= 1 } msg := raw_queue_pop(c.queue) @@ -220,27 +216,26 @@ recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> (ok: bool) { mem.copy(msg_out, msg, int(c.msg_size)) } - if sync.atomic_load(&c.w_waiting) > 0 { + if c.w_waiting > 0 { sync.signal(&c.w_cond) } ok = true } else if c.unbuffered_data != nil { // unbuffered - sync.guard(&c.r_mutex) sync.guard(&c.mutex) - for !sync.atomic_load(&c.closed) && - sync.atomic_load(&c.w_waiting) == 0 { - sync.atomic_add(&c.r_waiting, 1) + for !c.closed && + c.w_waiting == 0 { + c.r_waiting += 1 sync.wait(&c.r_cond, &c.mutex) - sync.atomic_sub(&c.r_waiting, 1) + c.r_waiting -= 1 } - if sync.atomic_load(&c.closed) { + if c.closed { return } mem.copy(msg_out, c.unbuffered_data, int(c.msg_size)) - sync.atomic_sub(&c.w_waiting, 1) + c.w_waiting -= 1 sync.signal(&c.w_cond) ok = true @@ -260,25 +255,24 @@ try_send_raw :: proc "contextless" (c: ^Raw_Chan, msg_in: rawptr) -> (ok: bool) return false } - if sync.atomic_load(&c.closed) { + if c.closed { return false } ok = raw_queue_push(c.queue, msg_in) - if sync.atomic_load(&c.r_waiting) > 0 { + if c.r_waiting > 0 { sync.signal(&c.r_cond) } } else if c.unbuffered_data != nil { // unbuffered - sync.guard(&c.w_mutex) sync.guard(&c.mutex) - if sync.atomic_load(&c.closed) { + if c.closed { return false } mem.copy(c.unbuffered_data, msg_in, int(c.msg_size)) - sync.atomic_add(&c.w_waiting, 1) - if 
sync.atomic_load(&c.r_waiting) > 0 { + c.w_waiting += 1 + if c.r_waiting > 0 { sync.signal(&c.r_cond) } sync.wait(&c.w_cond, &c.mutex) @@ -303,21 +297,19 @@ try_recv_raw :: proc "contextless" (c: ^Raw_Chan, msg_out: rawptr) -> bool { mem.copy(msg_out, msg, int(c.msg_size)) } - if sync.atomic_load(&c.w_waiting) > 0 { + if c.w_waiting > 0 { sync.signal(&c.w_cond) } return true } else if c.unbuffered_data != nil { // unbuffered - sync.guard(&c.r_mutex) sync.guard(&c.mutex) - if sync.atomic_load(&c.closed) || - sync.atomic_load(&c.w_waiting) == 0 { + if c.closed || c.w_waiting == 0 { return false } mem.copy(msg_out, c.unbuffered_data, int(c.msg_size)) - sync.atomic_sub(&c.w_waiting, 1) + c.w_waiting -= 1 sync.signal(&c.w_cond) return true @@ -360,10 +352,10 @@ close :: proc "contextless" (c: ^Raw_Chan) -> bool { return false } sync.guard(&c.mutex) - if sync.atomic_load(&c.closed) { + if c.closed { return false } - sync.atomic_store(&c.closed, true) + c.closed = true sync.broadcast(&c.r_cond) sync.broadcast(&c.w_cond) return true @@ -375,7 +367,7 @@ is_closed :: proc "contextless" (c: ^Raw_Chan) -> bool { return true } sync.guard(&c.mutex) - return bool(sync.atomic_load(&c.closed)) + return bool(c.closed) } @@ -434,7 +426,7 @@ can_recv :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len > 0 } - return sync.atomic_load(&c.w_waiting) > 0 + return c.w_waiting > 0 } @@ -444,7 +436,7 @@ can_send :: proc "contextless" (c: ^Raw_Chan) -> bool { if is_buffered(c) { return c.queue.len < c.queue.cap } - return sync.atomic_load(&c.w_waiting) == 0 + return c.w_waiting == 0 } @@ -493,4 +485,4 @@ select_raw :: proc "odin" (recvs: []^Raw_Chan, sends: []^Raw_Chan, send_msgs: [] ok = send_raw(sends[sel.idx], send_msgs[sel.idx]) } return -} \ No newline at end of file +} From 16ef59700b68989beff48039a450ef6153b2e6af Mon Sep 17 00:00:00 2001 From: Feoramund <161657516+Feoramund@users.noreply.github.com> Date: Sun, 15 Sep 2024 23:58:03 -0400 Subject: [PATCH 23/23] Check for `EINTR` in `sys/posix` test --- tests/core/sys/posix/structs.odin | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/core/sys/posix/structs.odin b/tests/core/sys/posix/structs.odin index bdb1c24e316..95b6258f4e9 100644 --- a/tests/core/sys/posix/structs.odin +++ b/tests/core/sys/posix/structs.odin @@ -63,6 +63,9 @@ execute_struct_checks :: proc(t: ^testing.T) { waiting: for { status: i32 wpid := posix.waitpid(pid, &status, {}) + if status == posix.EINTR { + continue + } if !testing.expectf(t, wpid != -1, "waitpid() failure: %v", posix.strerror()) { return false }