From 2599db29904a8b966991003522a6417c46e11997 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sun, 27 Oct 2024 10:26:23 +0100 Subject: [PATCH 1/9] internal/task: implement atomic primitives for preemptive scheduling --- src/internal/task/atomic-cooperative.go | 2 + src/internal/task/atomic-preemptive.go | 14 +++++ src/internal/task/futex-cooperative.go | 2 + src/internal/task/futex-preemptive.go | 7 +++ src/internal/task/mutex-cooperative.go | 2 + src/internal/task/mutex-preemptive.go | 71 +++++++++++++++++++++++++ src/internal/task/pmutex-cooperative.go | 2 + src/internal/task/pmutex-preemptive.go | 11 ++++ 8 files changed, 111 insertions(+) create mode 100644 src/internal/task/atomic-preemptive.go create mode 100644 src/internal/task/futex-preemptive.go create mode 100644 src/internal/task/mutex-preemptive.go create mode 100644 src/internal/task/pmutex-preemptive.go diff --git a/src/internal/task/atomic-cooperative.go b/src/internal/task/atomic-cooperative.go index 60eb917a8e..bd4cba8956 100644 --- a/src/internal/task/atomic-cooperative.go +++ b/src/internal/task/atomic-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task // Atomics implementation for cooperative systems. The atomic types here aren't diff --git a/src/internal/task/atomic-preemptive.go b/src/internal/task/atomic-preemptive.go new file mode 100644 index 0000000000..275f36dce4 --- /dev/null +++ b/src/internal/task/atomic-preemptive.go @@ -0,0 +1,14 @@ +//go:build scheduler.threads + +package task + +// Atomics implementation for non-cooperative systems (multithreaded, etc). +// These atomic types use real atomic instructions. + +import "sync/atomic" + +type ( + Uintptr = atomic.Uintptr + Uint32 = atomic.Uint32 + Uint64 = atomic.Uint64 +) diff --git a/src/internal/task/futex-cooperative.go b/src/internal/task/futex-cooperative.go index 8351f88774..2a42c28d43 100644 --- a/src/internal/task/futex-cooperative.go +++ b/src/internal/task/futex-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task // A futex is a way for userspace to wait with the pointer as the key, and for diff --git a/src/internal/task/futex-preemptive.go b/src/internal/task/futex-preemptive.go new file mode 100644 index 0000000000..7f9e89580c --- /dev/null +++ b/src/internal/task/futex-preemptive.go @@ -0,0 +1,7 @@ +//go:build scheduler.threads + +package task + +import "internal/futex" + +type Futex = futex.Futex diff --git a/src/internal/task/mutex-cooperative.go b/src/internal/task/mutex-cooperative.go index e40966bed4..f1205eea25 100644 --- a/src/internal/task/mutex-cooperative.go +++ b/src/internal/task/mutex-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task type Mutex struct { diff --git a/src/internal/task/mutex-preemptive.go b/src/internal/task/mutex-preemptive.go new file mode 100644 index 0000000000..27f4646698 --- /dev/null +++ b/src/internal/task/mutex-preemptive.go @@ -0,0 +1,71 @@ +//go:build scheduler.threads + +package task + +// Futex-based mutex. +// This is largely based on the paper "Futexes are Tricky" by Ulrich Drepper. +// It describes a few ways to implement mutexes using a futex, and how some +// seemingly-obvious implementations don't exactly work as intended. +// Unfortunately, Go atomic operations work slightly differently so we can't +// copy the algorithm verbatim. +// +// The implementation works like this. The futex can have 3 different values, +// depending on the state: +// +// - 0: the futex is currently unlocked. +// - 1: the futex is locked, but is uncontended. There is one special case: if +// a contended futex is unlocked, it is set to 0. It is possible for another +// thread to lock the futex before the next waiter is woken. But because a +// waiter will be woken (if there is one), it will always change to 2 +// regardless. So this is not a problem. +// - 2: the futex is locked, and is contended. At least one thread is trying +// to obtain the lock (and is in the contended loop, see below). +// +// For the paper, see: +// https://dept-info.labri.fr/~denis/Enseignement/2008-IR/Articles/01-futex.pdf) + +type Mutex struct { + futex Futex +} + +func (m *Mutex) Lock() { + // Fast path: try to take an uncontended lock. + if m.futex.CompareAndSwap(0, 1) { + // We obtained the mutex. + return + } + + // The futex is contended, so we enter the contended loop. + // If we manage to change the futex from 0 to 2, we managed to take the + // lock. Else, we have to wait until a call to Unlock unlocks this mutex. + // (Unlock will wake one waiter when it finds the futex is set to 2 when + // unlocking). + for m.futex.Swap(2) != 0 { + // Wait until we get resumed in Unlock. + m.futex.Wait(2) + } +} + +func (m *Mutex) Unlock() { + if old := m.futex.Swap(0); old == 0 { + // Mutex wasn't locked before. + panic("sync: unlock of unlocked Mutex") + } else if old == 2 { + // Mutex was a contended lock, so we need to wake the next waiter. + m.futex.Wake() + } +} + +// TryLock tries to lock m and reports whether it succeeded. +// +// Note that while correct uses of TryLock do exist, they are rare, +// and use of TryLock is often a sign of a deeper problem +// in a particular use of mutexes. +func (m *Mutex) TryLock() bool { + // Fast path: try to take an uncontended lock. + if m.futex.CompareAndSwap(0, 1) { + // We obtained the mutex. + return true + } + return false +} diff --git a/src/internal/task/pmutex-cooperative.go b/src/internal/task/pmutex-cooperative.go index ae2aa4bad8..0e6c4f828b 100644 --- a/src/internal/task/pmutex-cooperative.go +++ b/src/internal/task/pmutex-cooperative.go @@ -1,3 +1,5 @@ +//go:build !scheduler.threads + package task // PMutex is a real mutex on systems that can be either preemptive or threaded, diff --git a/src/internal/task/pmutex-preemptive.go b/src/internal/task/pmutex-preemptive.go new file mode 100644 index 0000000000..10f0a63561 --- /dev/null +++ b/src/internal/task/pmutex-preemptive.go @@ -0,0 +1,11 @@ +//go:build scheduler.threads + +package task + +// PMutex is a real mutex on systems that can be either preemptive or threaded, +// and a dummy lock on other (purely cooperative) systems. +// +// It is mainly useful for short operations that need a lock when threading may +// be involved, but which do not need a lock with a purely cooperative +// scheduler. +type PMutex = Mutex From 8659b18d6f487e438959bf901c67bdf6b0862d18 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 10:47:28 +0100 Subject: [PATCH 2/9] runtime: refactor GC mark phase into gcMarkReachable This is a small refactor to prepare GC marking for multithreaded stop-the-world. --- src/runtime/gc_blocks.go | 3 +-- src/runtime/gc_stack_portable.go | 5 +++++ src/runtime/gc_stack_raw.go | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/runtime/gc_blocks.go b/src/runtime/gc_blocks.go index d58bfd92a2..5b0453bb2e 100644 --- a/src/runtime/gc_blocks.go +++ b/src/runtime/gc_blocks.go @@ -456,8 +456,7 @@ func runGC() (freeBytes uintptr) { } // Mark phase: mark all reachable objects, recursively. - markStack() - findGlobals(markRoots) + gcMarkReachable() if baremetal && hasScheduler { // Channel operations in interrupts may move task pointers around while we are marking. diff --git a/src/runtime/gc_stack_portable.go b/src/runtime/gc_stack_portable.go index d35e16e30c..750a34ec2c 100644 --- a/src/runtime/gc_stack_portable.go +++ b/src/runtime/gc_stack_portable.go @@ -8,6 +8,11 @@ import ( "unsafe" ) +func gcMarkReachable() { + markStack() + findGlobals(markRoots) +} + //go:extern runtime.stackChainStart var stackChainStart *stackChainObject diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index 5ee18622db..d55522a9f6 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -4,6 +4,11 @@ package runtime import "internal/task" +func gcMarkReachable() { + markStack() + findGlobals(markRoots) +} + // markStack marks all root pointers found on the stack. // // This implementation is conservative and relies on the stack top (provided by From c9bb33a73ce77eb8223c043640a8313741c8c0c7 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 11:08:02 +0100 Subject: [PATCH 3/9] runtime: make conservative and precise GC MT-safe Using a global lock may be slow, but it is certainly simple and safe. If this global lock becomes a bottleneck, we can of course look into making the GC truly support multithreading. --- src/runtime/gc_blocks.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/runtime/gc_blocks.go b/src/runtime/gc_blocks.go index 5b0453bb2e..bce87ef271 100644 --- a/src/runtime/gc_blocks.go +++ b/src/runtime/gc_blocks.go @@ -57,6 +57,7 @@ var ( gcMallocs uint64 // total number of allocations gcFrees uint64 // total number of objects freed gcFreedBlocks uint64 // total number of freed blocks + gcLock task.PMutex // lock to avoid race conditions on multicore systems ) // zeroSizedAlloc is just a sentinel that gets returned when allocating 0 bytes. @@ -317,6 +318,10 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer { runtimePanicAt(returnAddress(0), "heap alloc in interrupt") } + // Make sure there are no concurrent allocations. The heap is not currently + // designed for concurrent alloc/GC. + gcLock.Lock() + gcTotalAlloc += uint64(size) gcMallocs++ @@ -399,6 +404,9 @@ func alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer { i.setState(blockStateTail) } + // We've claimed this allocation, now we can unlock the heap. + gcLock.Unlock() + // Return a pointer to this allocation. pointer := thisAlloc.pointer() if preciseHeap { @@ -444,7 +452,9 @@ func free(ptr unsafe.Pointer) { // GC performs a garbage collection cycle. func GC() { + gcLock.Lock() runGC() + gcLock.Unlock() } // runGC performs a garbage collection cycle. It is the internal implementation @@ -713,6 +723,7 @@ func dumpHeap() { // The returned memory statistics are up to date as of the // call to ReadMemStats. This would not do GC implicitly for you. func ReadMemStats(m *MemStats) { + gcLock.Lock() m.HeapIdle = 0 m.HeapInuse = 0 for block := gcBlock(0); block < endBlock; block++ { @@ -732,6 +743,7 @@ func ReadMemStats(m *MemStats) { m.Sys = uint64(heapEnd - heapStart) m.HeapAlloc = (gcTotalBlocks - gcFreedBlocks) * uint64(bytesPerBlock) m.Alloc = m.HeapAlloc + gcLock.Unlock() } func SetFinalizer(obj interface{}, finalizer interface{}) { From 3cc9c44c268905c0a5965698267acfc61a034ee5 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 13:45:17 +0100 Subject: [PATCH 4/9] runtime: refactor timerQueue Move common functions to scheduler.go. They will be used both from the cooperative and from the threads scheduler. --- src/runtime/scheduler.go | 30 ++++++++++++++++++++++++++++ src/runtime/scheduler_cooperative.go | 26 ++---------------------- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/runtime/scheduler.go b/src/runtime/scheduler.go index 727c7f5f2c..7461c966ed 100644 --- a/src/runtime/scheduler.go +++ b/src/runtime/scheduler.go @@ -6,6 +6,8 @@ const schedulerDebug = false var mainExited bool +var timerQueue *timerNode + // Simple logging, for debugging. func scheduleLog(msg string) { if schedulerDebug { @@ -27,6 +29,34 @@ func scheduleLogChan(msg string, ch *channel, t *task.Task) { } } +func timerQueueAdd(tn *timerNode) { + q := &timerQueue + for ; *q != nil; q = &(*q).next { + if tn.whenTicks() < (*q).whenTicks() { + // this will finish earlier than the next - insert here + break + } + } + tn.next = *q + *q = tn +} + +func timerQueueRemove(t *timer) bool { + removedTimer := false + for q := &timerQueue; *q != nil; q = &(*q).next { + if (*q).timer == t { + scheduleLog("removed timer") + *q = (*q).next + removedTimer = true + break + } + } + if !removedTimer { + scheduleLog("did not remove timer") + } + return removedTimer +} + // Goexit terminates the currently running goroutine. No other goroutines are affected. func Goexit() { panicOrGoexit(nil, panicGoexit) diff --git a/src/runtime/scheduler_cooperative.go b/src/runtime/scheduler_cooperative.go index 91ba86409f..85c8f56f09 100644 --- a/src/runtime/scheduler_cooperative.go +++ b/src/runtime/scheduler_cooperative.go @@ -32,7 +32,6 @@ var ( runqueue task.Queue sleepQueue *task.Task sleepQueueBaseTime timeUnit - timerQueue *timerNode ) // deadlock is called when a goroutine cannot proceed any more, but is in theory @@ -100,36 +99,15 @@ func addSleepTask(t *task.Task, duration timeUnit) { // sleepQueue. func addTimer(tim *timerNode) { mask := interrupt.Disable() - - // Add to timer queue. - q := &timerQueue - for ; *q != nil; q = &(*q).next { - if tim.whenTicks() < (*q).whenTicks() { - // this will finish earlier than the next - insert here - break - } - } - tim.next = *q - *q = tim + timerQueueAdd(tim) interrupt.Restore(mask) } // removeTimer is the implementation of time.stopTimer. It removes a timer from // the timer queue, returning true if the timer is present in the timer queue. func removeTimer(tim *timer) bool { - removedTimer := false mask := interrupt.Disable() - for t := &timerQueue; *t != nil; t = &(*t).next { - if (*t).timer == tim { - scheduleLog("removed timer") - *t = (*t).next - removedTimer = true - break - } - } - if !removedTimer { - scheduleLog("did not remove timer") - } + removedTimer := timerQueueRemove(tim) interrupt.Restore(mask) return removedTimer } From 8d6e16019ab915ace09ab0f2237c6b2e5f1d1df2 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Sat, 2 Nov 2024 15:33:06 +0100 Subject: [PATCH 5/9] sync: implement RWMutex using futexes Somewhat surprisingly, this results in smaller code than the old code with the cooperative (tasks) scheduler. Probably because the new RWMutex is also simpler. --- src/sync/mutex.go | 163 ++++++++++++++++++---------------------------- 1 file changed, 64 insertions(+), 99 deletions(-) diff --git a/src/sync/mutex.go b/src/sync/mutex.go index 08c674d7ea..890af78606 100644 --- a/src/sync/mutex.go +++ b/src/sync/mutex.go @@ -6,131 +6,96 @@ import ( type Mutex = task.Mutex -type RWMutex struct { - // waitingWriters are all of the tasks waiting for write locks. - waitingWriters task.Stack - - // waitingReaders are all of the tasks waiting for a read lock. - waitingReaders task.Stack +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) - // state is the current state of the RWMutex. - // Iff the mutex is completely unlocked, it contains rwMutexStateUnlocked (aka 0). - // Iff the mutex is write-locked, it contains rwMutexStateWLocked. - // While the mutex is read-locked, it contains the current number of readers. - state uint32 +type RWMutex struct { + // Reader count, with the number of readers that currently have read-locked + // this mutex. + // The value can be in two states: one where 0 means no readers and another + // where -rwMutexMaxReaders means no readers. A base of 0 is normal + // uncontended operation, a base of -rwMutexMaxReaders means a writer has + // the lock or is trying to get the lock. In the second case, readers should + // wait until the reader count becomes non-negative again to give the writer + // a chance to obtain the lock. + readers task.Futex + + // Writer futex, normally 0. If there is a writer waiting until all readers + // have unlocked, this value is 1. It will be changed to a 2 (and get a + // wake) when the last reader unlocks. + writer task.Futex + + // Writer lock. Held between Lock() and Unlock(). + writerLock Mutex } -const ( - rwMutexStateUnlocked = uint32(0) - rwMutexStateWLocked = ^uint32(0) - rwMutexMaxReaders = rwMutexStateWLocked - 1 -) +const rwMutexMaxReaders = 1 << 30 func (rw *RWMutex) Lock() { - if rw.state == 0 { - // The mutex is completely unlocked. - // Lock without waiting. - rw.state = rwMutexStateWLocked + // Exclusive lock for writers. + rw.writerLock.Lock() + + // Flag that we need to be awakened after the last read-lock unlocks. + rw.writer.Store(1) + + // Signal to readers that they can't lock this mutex anymore. + n := uint32(rwMutexMaxReaders) + waiting := rw.readers.Add(-n) + if int32(waiting) == -rwMutexMaxReaders { + // All readers were already unlocked, so we don't need to wait for them. + rw.writer.Store(0) return } - // Wait for the lock to be released. - rw.waitingWriters.Push(task.Current()) - task.Pause() + // There is at least one reader. + // Wait until all readers are unlocked. The last reader to unlock will set + // rw.writer to 2 and awaken us. + for rw.writer.Load() == 1 { + rw.writer.Wait(1) + } + rw.writer.Store(0) } func (rw *RWMutex) Unlock() { - switch rw.state { - case rwMutexStateWLocked: - // This is correct. - - case rwMutexStateUnlocked: - // The mutex is already unlocked. - panic("sync: unlock of unlocked RWMutex") - - default: - // The mutex is read-locked instead of write-locked. - panic("sync: write-unlock of read-locked RWMutex") + // Signal that new readers can lock this mutex. + waiting := rw.readers.Add(rwMutexMaxReaders) + if waiting != 0 { + // Awaken all waiting readers. + rw.readers.WakeAll() } - switch { - case rw.maybeUnblockReaders(): - // Switched over to read mode. - - case rw.maybeUnblockWriter(): - // Transferred to another writer. - - default: - // Nothing is waiting for the lock. - rw.state = rwMutexStateUnlocked - } + // Done with this lock (next writer can try to get a lock). + rw.writerLock.Unlock() } func (rw *RWMutex) RLock() { - if rw.state == rwMutexStateWLocked { - // Wait for the write lock to be released. - rw.waitingReaders.Push(task.Current()) - task.Pause() - return - } + // Add us as a reader. + newVal := rw.readers.Add(1) - if rw.state == rwMutexMaxReaders { - panic("sync: too many readers on RWMutex") + // Wait until the RWMutex is available for readers. + for int32(newVal) <= 0 { + rw.readers.Wait(newVal) + newVal = rw.readers.Load() } - - // Increase the reader count. - rw.state++ } func (rw *RWMutex) RUnlock() { - switch rw.state { - case rwMutexStateUnlocked: - // The mutex is already unlocked. - panic("sync: unlock of unlocked RWMutex") - - case rwMutexStateWLocked: - // The mutex is write-locked instead of read-locked. - panic("sync: read-unlock of write-locked RWMutex") - } - - rw.state-- + // Remove us as a reader. + one := uint32(1) + readers := int32(rw.readers.Add(-one)) - if rw.state == rwMutexStateUnlocked { - // This was the last reader. - // Try to unblock a writer. - rw.maybeUnblockWriter() + // Check whether RUnlock was called too often. + if readers == -1 || readers == (-rwMutexMaxReaders)-1 { + runtimePanic("sync: RUnlock of unlocked RWMutex") } -} -func (rw *RWMutex) maybeUnblockReaders() bool { - var n uint32 - for { - t := rw.waitingReaders.Pop() - if t == nil { - break + if readers == -rwMutexMaxReaders { + // This was the last read lock. Check whether we need to wake up a write + // lock. + if rw.writer.CompareAndSwap(1, 2) { + rw.writer.Wake() } - - n++ - scheduleTask(t) - } - if n == 0 { - return false } - - rw.state = n - return true -} - -func (rw *RWMutex) maybeUnblockWriter() bool { - t := rw.waitingWriters.Pop() - if t == nil { - return false - } - - rw.state = rwMutexStateWLocked - scheduleTask(t) - - return true } type Locker interface { From 3767decf9e272cd34ab3b7c4807758df33af11a5 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Thu, 24 Oct 2024 10:26:17 +0200 Subject: [PATCH 6/9] runtime: map every goroutine to a new OS thread This is not a scheduler in the runtime, instead every goroutine is mapped to a single OS thread - meaning 1:1 scheduling. While this may not perform well (or at all) for large numbers of threads, it greatly simplifies many things in the runtime. For example, blocking syscalls can be called directly instead of having to use epoll or similar. Also, we don't need to do anything special to call C code - the default stack is all we need. --- compileopts/options.go | 2 +- compileopts/options_test.go | 2 +- compileopts/target.go | 5 +- src/internal/task/linux.go | 9 + src/internal/task/semaphore.go | 32 ++++ src/internal/task/task_threads.c | 104 ++++++++++++ src/internal/task/task_threads.go | 265 ++++++++++++++++++++++++++++++ src/runtime/gc_stack_raw.go | 2 +- src/runtime/gc_stack_threads.go | 25 +++ src/runtime/runtime_unix.go | 1 + src/runtime/scheduler_threads.go | 124 ++++++++++++++ 11 files changed, 567 insertions(+), 4 deletions(-) create mode 100644 src/internal/task/linux.go create mode 100644 src/internal/task/semaphore.go create mode 100644 src/internal/task/task_threads.c create mode 100644 src/internal/task/task_threads.go create mode 100644 src/runtime/gc_stack_threads.go create mode 100644 src/runtime/scheduler_threads.go diff --git a/compileopts/options.go b/compileopts/options.go index 30e0e4dbed..78eb17cde9 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full", "html"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/options_test.go b/compileopts/options_test.go index ee63c4c46d..d52ef5690e 100644 --- a/compileopts/options_test.go +++ b/compileopts/options_test.go @@ -10,7 +10,7 @@ import ( func TestVerifyOptions(t *testing.T) { expectedGCError := errors.New(`invalid gc option 'incorrect': valid values are none, leaking, conservative, custom, precise`) - expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify`) + expectedSchedulerError := errors.New(`invalid scheduler option 'incorrect': valid values are none, tasks, asyncify, threads`) expectedPrintSizeError := errors.New(`invalid size option 'incorrect': valid values are none, short, full, html`) expectedPanicStrategyError := errors.New(`invalid panic option 'incorrect': valid values are print, trap`) diff --git a/compileopts/target.go b/compileopts/target.go index 7893e58290..a28bf774fa 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -248,7 +248,6 @@ func defaultTarget(options *Options) (*TargetSpec, error) { GOARCH: options.GOARCH, BuildTags: []string{options.GOOS, options.GOARCH}, GC: "precise", - Scheduler: "tasks", Linker: "cc", DefaultStackSize: 1024 * 64, // 64kB GDB: []string{"gdb"}, @@ -381,6 +380,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { platformVersion = "11.0.0" // first macosx platform with arm64 support } llvmvendor = "apple" + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "darwin-libSystem" // Use macosx* instead of darwin, otherwise darwin/arm64 will refer to @@ -398,6 +398,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "linux": + spec.Scheduler = "threads" spec.Linker = "ld.lld" spec.RTLib = "compiler-rt" spec.Libc = "musl" @@ -418,9 +419,11 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/futex/futex_linux.c", + "src/internal/task/task_threads.c", "src/runtime/runtime_unix.c", "src/runtime/signal.c") case "windows": + spec.Scheduler = "tasks" spec.Linker = "ld.lld" spec.Libc = "mingw-w64" // Note: using a medium code model, low image base and no ASLR diff --git a/src/internal/task/linux.go b/src/internal/task/linux.go new file mode 100644 index 0000000000..7d28f708c4 --- /dev/null +++ b/src/internal/task/linux.go @@ -0,0 +1,9 @@ +//go:build linux && !baremetal + +package task + +import "unsafe" + +// Musl uses a pointer (or unsigned long for C++) so unsafe.Pointer should be +// fine. +type threadID unsafe.Pointer diff --git a/src/internal/task/semaphore.go b/src/internal/task/semaphore.go new file mode 100644 index 0000000000..914f09bc5e --- /dev/null +++ b/src/internal/task/semaphore.go @@ -0,0 +1,32 @@ +package task + +// Barebones semaphore implementation. +// The main limitation is that if there are multiple waiters, a single Post() +// call won't do anything. Only when Post() has been called to awaken all +// waiters will the waiters proceed. +// This limitation is not a problem when there will only be a single waiter. +type Semaphore struct { + futex Futex +} + +// Post (unlock) the semaphore, incrementing the value in the semaphore. +func (s *Semaphore) Post() { + newValue := s.futex.Add(1) + if newValue == 0 { + s.futex.WakeAll() + } +} + +// Wait (lock) the semaphore, decrementing the value in the semaphore. +func (s *Semaphore) Wait() { + delta := int32(-1) + value := s.futex.Add(uint32(delta)) + for { + if int32(value) >= 0 { + // Semaphore unlocked! + return + } + s.futex.Wait(value) + value = s.futex.Load() + } +} diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c new file mode 100644 index 0000000000..a14844f2ef --- /dev/null +++ b/src/internal/task/task_threads.c @@ -0,0 +1,104 @@ +//go:build none + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +// BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice. +#ifdef __linux__ +#define taskPauseSignal (SIGRTMIN + 6) +#endif + +// Pointer to the current task.Task structure. +// Ideally the entire task.Task structure would be a thread-local variable but +// this also works. +static __thread void *current_task; + +struct state_pass { + void *(*start)(void*); + void *args; + void *task; + uintptr_t *stackTop; + sem_t startlock; +}; + +// Handle the GC pause in Go. +void tinygo_task_gc_pause(int sig); + +// Initialize the main thread. +void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { + // Make sure the current task pointer is set correctly for the main + // goroutine as well. + current_task = mainTask; + + // Store the thread ID of the main thread. + *thread = pthread_self(); + + // Register the "GC pause" signal for the entire process. + // Using pthread_kill, we can still send the signal to a specific thread. + struct sigaction act = { 0 }; + act.sa_flags = SA_SIGINFO; + act.sa_handler = &tinygo_task_gc_pause; + sigaction(taskPauseSignal, &act, NULL); +} + +void tinygo_task_exited(void*); + +// Helper to start a goroutine while also storing the 'task' structure. +static void* start_wrapper(void *arg) { + struct state_pass *state = arg; + void *(*start)(void*) = state->start; + void *args = state->args; + current_task = state->task; + + // Save the current stack pointer in the goroutine state, for the GC. + int stackAddr; + *(state->stackTop) = (uintptr_t)(&stackAddr); + + // Notify the caller that the thread has successfully started and + // initialized. + sem_post(&state->startlock); + + // Run the goroutine function. + start(args); + + // Notify the Go side this thread will exit. + tinygo_task_exited(current_task); + + return NULL; +}; + +// Start a new goroutine in an OS thread. +int tinygo_task_start(uintptr_t fn, void *args, void *task, pthread_t *thread, uintptr_t *stackTop, void *context) { + // Sanity check. Should get optimized away. + if (sizeof(pthread_t) != sizeof(void*)) { + __builtin_trap(); + } + + struct state_pass state = { + .start = (void*)fn, + .args = args, + .task = task, + .stackTop = stackTop, + }; + sem_init(&state.startlock, 0, 0); + int result = pthread_create(thread, NULL, &start_wrapper, &state); + + // Wait until the thread has been created and read all state_pass variables. + sem_wait(&state.startlock); + + return result; +} + +// Return the current task (for task.Current()). +void* tinygo_task_current(void) { + return current_task; +} + +// Send a signal to cause the task to pause for the GC mark phase. +void tinygo_task_send_gc_signal(pthread_t thread) { + pthread_kill(thread, taskPauseSignal); +} diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go new file mode 100644 index 0000000000..93204fb9ba --- /dev/null +++ b/src/internal/task/task_threads.go @@ -0,0 +1,265 @@ +//go:build scheduler.threads + +package task + +import ( + "sync/atomic" + "unsafe" +) + +// If true, print verbose debug logs. +const verbose = false + +// Scheduler-specific state. +type state struct { + // Goroutine ID. The number here is not really significant and after a while + // it could wrap around. But it is useful for debugging. + id uintptr + + // Thread ID, pthread_t or similar (typically implemented as a pointer). + thread threadID + + // Highest address of the stack. It is stored when the goroutine starts, and + // is needed to be able to scan the stack. + stackTop uintptr + + // Next task in the activeTasks queue. + QueueNext *Task + + // Semaphore to pause/resume the thread atomically. + pauseSem Semaphore + + // Semaphore used for stack scanning. + // We can't reuse pauseSem here since the thread might have been paused for + // other reasons (for example, because it was waiting on a channel). + gcSem Semaphore +} + +// Goroutine counter, starting at 0 for the main goroutine. +var goroutineID uintptr + +var mainTask Task + +// Queue of tasks (see QueueNext) that currently exist in the program. +var activeTasks = &mainTask +var activeTaskLock PMutex + +func OnSystemStack() bool { + runtimePanic("todo: task.OnSystemStack") + return false +} + +// Initialize the main goroutine state. Must be called by the runtime on +// startup, before starting any other goroutines. +func Init(sp uintptr) { + mainTask.state.stackTop = sp + tinygo_task_init(&mainTask, &mainTask.state.thread) +} + +// Return the task struct for the current thread. +func Current() *Task { + t := (*Task)(tinygo_task_current()) + if t == nil { + runtimePanic("unknown current task") + } + return t +} + +// Pause pauses the current task, until it is resumed by another task. +// It is possible that another task has called Resume() on the task before it +// hits Pause(), in which case the task won't be paused but continues +// immediately. +func Pause() { + // Wait until resumed + t := Current() + if verbose { + println("*** pause: ", t.state.id) + } + t.state.pauseSem.Wait() +} + +// Resume the given task. +// It is legal to resume a task before it gets paused, it means that the next +// call to Pause() won't pause but will continue immediately. This happens in +// practice sometimes in channel operations, where the Resume() might get called +// between the channel unlock and the call to Pause(). +func (t *Task) Resume() { + if verbose { + println("*** resume: ", t.state.id) + } + // Increment the semaphore counter. + // If the task is currently paused in Wait(), it will resume. + // If the task is not yet paused, the next call to Wait() will continue + // immediately. + t.state.pauseSem.Post() +} + +// Start a new OS thread. +func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { + t := &Task{} + t.state.id = atomic.AddUintptr(&goroutineID, 1) + if verbose { + println("*** start: ", t.state.id, "from", Current().state.id) + } + + // Start the new thread, and add it to the list of threads. + // Do this with a lock so that only started threads are part of the queue + // and the stop-the-world GC won't see threads that haven't started yet or + // are not fully started yet. + activeTaskLock.Lock() + errCode := tinygo_task_start(fn, args, t, &t.state.thread, &t.state.stackTop) + if errCode != 0 { + runtimePanic("could not start thread") + } + t.state.QueueNext = activeTasks + activeTasks = t + activeTaskLock.Unlock() +} + +//export tinygo_task_exited +func taskExited(t *Task) { + if verbose { + println("*** exit:", t.state.id) + } + + // Remove from the queue. + // TODO: this can be made more efficient by using a doubly linked list. + activeTaskLock.Lock() + found := false + for q := &activeTasks; *q != nil; q = &(*q).state.QueueNext { + if *q == t { + *q = t.state.QueueNext + found = true + break + } + } + activeTaskLock.Unlock() + + // Sanity check. + if !found { + runtimePanic("taskExited failed") + } +} + +// Futex to wait on until all tasks have finished scanning the stack. +// This is basically a sync.WaitGroup. +var scanDoneFutex Futex + +// GC scan phase. Because we need to stop the world while scanning, this kinda +// needs to be done in the tasks package. +func GCScan() { + current := Current() + + // Don't allow new goroutines to be started while pausing/resuming threads + // in the stop-the-world phase. + activeTaskLock.Lock() + + // Pause all other threads. + numOtherThreads := uint32(0) + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + numOtherThreads++ + tinygo_task_send_gc_signal(t.state.thread) + } + } + + // Store the number of threads to wait for in the futex. + // This is the equivalent of doing an initial wg.Add(numOtherThreads). + scanDoneFutex.Store(numOtherThreads) + + // Scan the current stack, and all current registers. + scanCurrentStack() + + // Wake each paused thread for the first time so it will scan the stack. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Wait until all threads have finished scanning their stack. + // This is the equivalent of wg.Wait() + for { + val := scanDoneFutex.Load() + if val == 0 { + break + } + scanDoneFutex.Wait(val) + } + + // Scan all globals (implemented in the runtime). + gcScanGlobals() + + // Wake each paused thread for the second time, so they will resume normal + // operation. + for t := activeTasks; t != nil; t = t.state.QueueNext { + if t != current { + t.state.gcSem.Post() + } + } + + // Allow goroutines to start and exit again. + activeTaskLock.Unlock() +} + +// Scan globals, implemented in the runtime package. +func gcScanGlobals() + +var stackScanLock PMutex + +//export tinygo_task_gc_pause +func tingyo_task_gc_pause() { + // Wait until we get the signal to start scanning the stack. + Current().state.gcSem.Wait() + + // Scan the thread stack. + // Only scan a single thread stack at a time, because the GC marking phase + // doesn't support parallelism. + // TODO: it may be possible to call markRoots directly (without saving + // registers) since we are in a signal handler that already saved a bunch of + // registers. This is an optimization left for a future time. + stackScanLock.Lock() + scanCurrentStack() + stackScanLock.Unlock() + + // Equivalent of wg.Done(): subtract one from the futex and if the result is + // 0 (meaning we were the last in the waitgroup), wake the waiting thread. + n := uint32(1) + if scanDoneFutex.Add(-n) == 0 { + scanDoneFutex.Wake() + } + + // Wait until we get the signal we can resume normally (after the mark phase + // has finished). + Current().state.gcSem.Wait() +} + +//go:export tinygo_scanCurrentStack +func scanCurrentStack() + +// Return the highest address of the current stack. +func StackTop() uintptr { + return Current().state.stackTop +} + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(msg string) + +// Using //go:linkname instead of //export so that we don't tell the compiler +// that the 't' parameter won't escape (because it will). +// +//go:linkname tinygo_task_init tinygo_task_init +func tinygo_task_init(t *Task, thread *threadID) + +// Here same as for tinygo_task_init. +// +//go:linkname tinygo_task_start tinygo_task_start +func tinygo_task_start(fn uintptr, args unsafe.Pointer, t *Task, thread *threadID, stackTop *uintptr) int32 + +// Pause the thread by sending it a signal. +// +//export tinygo_task_send_gc_signal +func tinygo_task_send_gc_signal(threadID) + +//export tinygo_task_current +func tinygo_task_current() unsafe.Pointer diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index d55522a9f6..bdc3154fa5 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -1,4 +1,4 @@ -//go:build (gc.conservative || gc.precise) && !tinygo.wasm +//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads package runtime diff --git a/src/runtime/gc_stack_threads.go b/src/runtime/gc_stack_threads.go new file mode 100644 index 0000000000..9c77fa0c7b --- /dev/null +++ b/src/runtime/gc_stack_threads.go @@ -0,0 +1,25 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +func gcMarkReachable() { + task.GCScan() +} + +// Scan globals inside the stop-the-world phase. Called from the STW +// implementation in the internal/task package. +// +//go:linkname gcScanGlobals internal/task.gcScanGlobals +func gcScanGlobals() { + findGlobals(markRoots) +} + +// Function called from assembly with all registers pushed, to actually scan the +// stack. +// +//go:export tinygo_scanstack +func scanstack(sp uintptr) { + markRoots(sp, task.StackTop()) +} diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go index 08e3e74269..17e004b2e8 100644 --- a/src/runtime/runtime_unix.go +++ b/src/runtime/runtime_unix.go @@ -73,6 +73,7 @@ type timespec struct { tv_nsec int64 // unsigned 64-bit integer on all time64 platforms } +// Highest address of the stack of the main thread. var stackTop uintptr // Entry point for Go. Initialize all packages and call main.main(). diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go new file mode 100644 index 0000000000..e553a5b9c7 --- /dev/null +++ b/src/runtime/scheduler_threads.go @@ -0,0 +1,124 @@ +//go:build scheduler.threads + +package runtime + +import "internal/task" + +const hasScheduler = false // not using the cooperative scheduler + +// We use threads, so yes there is parallelism. +const hasParallelism = true + +var ( + timerQueueLock task.PMutex + timerQueueStarted bool + timerFutex task.Futex +) + +// Because we just use OS threads, we don't need to do anything special here. We +// can just initialize everything and run main.main on the main thread. +func run() { + initHeap() + task.Init(stackTop) + initAll() + callMain() +} + +// Pause the current task for a given time. +// +//go:linkname sleep time.Sleep +func sleep(duration int64) { + if duration <= 0 { + return + } + + sleepTicks(nanosecondsToTicks(duration)) +} + +func deadlock() { + // TODO: exit the thread via pthread_exit. + task.Pause() +} + +func scheduleTask(t *task.Task) { + t.Resume() +} + +func Gosched() { + // Each goroutine runs in a thread, so there's not much we can do here. + // There is sched_yield but it's only really intended for realtime + // operation, so is probably best not to use. +} + +// Separate goroutine (thread) that runs timer callbacks when they expire. +func timerRunner() { + for { + timerQueueLock.Lock() + + if timerQueue == nil { + // No timer in the queue, so wait until one becomes available. + val := timerFutex.Load() + timerQueueLock.Unlock() + timerFutex.Wait(val) + continue + } + + now := ticks() + if now < timerQueue.whenTicks() { + // There is a timer in the queue, but we need to wait until it + // expires. + // Using a futex, so that the wait is exited early when adding a new + // (sooner-to-expire) timer. + val := timerFutex.Load() + timerQueueLock.Unlock() + timeout := ticksToNanoseconds(timerQueue.whenTicks() - now) + timerFutex.WaitUntil(val, uint64(timeout)) + continue + } + + // Pop timer from queue. + tn := timerQueue + timerQueue = tn.next + tn.next = nil + + timerQueueLock.Unlock() + + // Run the callback stored in this timer node. + delay := ticksToNanoseconds(now - tn.whenTicks()) + tn.callback(tn, delay) + } +} + +func addTimer(tim *timerNode) { + timerQueueLock.Lock() + + if !timerQueueStarted { + timerQueueStarted = true + go timerRunner() + } + + timerQueueAdd(tim) + + timerFutex.Add(1) + timerFutex.Wake() + + timerQueueLock.Unlock() +} + +func removeTimer(tim *timer) bool { + timerQueueLock.Lock() + removed := timerQueueRemove(tim) + timerQueueLock.Unlock() + return removed +} + +func schedulerRunQueue() *task.Queue { + // This function is not actually used, it is only called when hasScheduler + // is true. So we can just return nil here. + return nil +} + +func runqueueForGC() *task.Queue { + // There is only a runqueue when using the cooperative scheduler. + return nil +} From cce44b5ebb027f68f7ab2ef26546398803bf1c34 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Tue, 21 Jan 2025 11:10:18 +0100 Subject: [PATCH 7/9] runtime: implement NumCPU for -scheduler=threads For the threads scheduler, it makes sense to have NumCPU available. For all other schedulers, the number of available CPUs is practically limited to one by the scheduler (even though the system might have more CPUs). --- builder/musl.go | 1 + src/internal/task/task_threads.c | 11 ++++++++++- src/internal/task/task_threads.go | 10 ++++++++-- src/runtime/debug.go | 9 --------- src/runtime/scheduler_cooperative.go | 5 +++++ src/runtime/scheduler_none.go | 5 +++++ src/runtime/scheduler_threads.go | 5 +++++ 7 files changed, 34 insertions(+), 12 deletions(-) diff --git a/builder/musl.go b/builder/musl.go index 3c79c7c43a..54ec2c83ed 100644 --- a/builder/musl.go +++ b/builder/musl.go @@ -113,6 +113,7 @@ var libMusl = Library{ librarySources: func(target string) ([]string, error) { arch := compileopts.MuslArchitecture(target) globs := []string{ + "conf/*.c", "env/*.c", "errno/*.c", "exit/*.c", diff --git a/src/internal/task/task_threads.c b/src/internal/task/task_threads.c index a14844f2ef..6ada95fa84 100644 --- a/src/internal/task/task_threads.c +++ b/src/internal/task/task_threads.c @@ -6,6 +6,7 @@ #include #include #include +#include // BDWGC also uses SIGRTMIN+6 on Linux, which seems like a reasonable choice. #ifdef __linux__ @@ -29,7 +30,7 @@ struct state_pass { void tinygo_task_gc_pause(int sig); // Initialize the main thread. -void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { +void tinygo_task_init(void *mainTask, pthread_t *thread, int *numCPU, void *context) { // Make sure the current task pointer is set correctly for the main // goroutine as well. current_task = mainTask; @@ -43,6 +44,14 @@ void tinygo_task_init(void *mainTask, pthread_t *thread, void *context) { act.sa_flags = SA_SIGINFO; act.sa_handler = &tinygo_task_gc_pause; sigaction(taskPauseSignal, &act, NULL); + + // Obtain the number of CPUs available on program start (for NumCPU). + int num = sysconf(_SC_NPROCESSORS_ONLN); + if (num <= 0) { + // Fallback in case there is an error. + num = 1; + } + *numCPU = num; } void tinygo_task_exited(void*); diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go index 93204fb9ba..c3531ef9c1 100644 --- a/src/internal/task/task_threads.go +++ b/src/internal/task/task_threads.go @@ -38,6 +38,8 @@ type state struct { // Goroutine counter, starting at 0 for the main goroutine. var goroutineID uintptr +var numCPU int32 + var mainTask Task // Queue of tasks (see QueueNext) that currently exist in the program. @@ -53,7 +55,7 @@ func OnSystemStack() bool { // startup, before starting any other goroutines. func Init(sp uintptr) { mainTask.state.stackTop = sp - tinygo_task_init(&mainTask, &mainTask.state.thread) + tinygo_task_init(&mainTask, &mainTask.state.thread, &numCPU) } // Return the task struct for the current thread. @@ -249,7 +251,7 @@ func runtimePanic(msg string) // that the 't' parameter won't escape (because it will). // //go:linkname tinygo_task_init tinygo_task_init -func tinygo_task_init(t *Task, thread *threadID) +func tinygo_task_init(t *Task, thread *threadID, numCPU *int32) // Here same as for tinygo_task_init. // @@ -263,3 +265,7 @@ func tinygo_task_send_gc_signal(threadID) //export tinygo_task_current func tinygo_task_current() unsafe.Pointer + +func NumCPU() int { + return int(numCPU) +} diff --git a/src/runtime/debug.go b/src/runtime/debug.go index 139e18bcd2..230515908f 100644 --- a/src/runtime/debug.go +++ b/src/runtime/debug.go @@ -1,14 +1,5 @@ package runtime -// NumCPU returns the number of logical CPUs usable by the current process. -// -// The set of available CPUs is checked by querying the operating system -// at process startup. Changes to operating system CPU allocation after -// process startup are not reflected. -func NumCPU() int { - return 1 -} - // Stub for NumCgoCall, does not return the real value func NumCgoCall() int { return 0 diff --git a/src/runtime/scheduler_cooperative.go b/src/runtime/scheduler_cooperative.go index 85c8f56f09..bffda72284 100644 --- a/src/runtime/scheduler_cooperative.go +++ b/src/runtime/scheduler_cooperative.go @@ -57,6 +57,11 @@ func Gosched() { task.Pause() } +// NumCPU returns the number of logical CPUs usable by the current process. +func NumCPU() int { + return 1 +} + // Add this task to the sleep queue, assuming its state is set to sleeping. func addSleepTask(t *task.Task, duration timeUnit) { if schedulerDebug { diff --git a/src/runtime/scheduler_none.go b/src/runtime/scheduler_none.go index a5acfd4309..7e2ddeb668 100644 --- a/src/runtime/scheduler_none.go +++ b/src/runtime/scheduler_none.go @@ -40,6 +40,11 @@ func Gosched() { // There are no other goroutines, so there's nothing to schedule. } +// NumCPU returns the number of logical CPUs usable by the current process. +func NumCPU() int { + return 1 +} + func addTimer(tim *timerNode) { runtimePanic("timers not supported without a scheduler") } diff --git a/src/runtime/scheduler_threads.go b/src/runtime/scheduler_threads.go index e553a5b9c7..292499354e 100644 --- a/src/runtime/scheduler_threads.go +++ b/src/runtime/scheduler_threads.go @@ -50,6 +50,11 @@ func Gosched() { // operation, so is probably best not to use. } +// NumCPU returns the number of logical CPUs usable by the current process. +func NumCPU() int { + return task.NumCPU() +} + // Separate goroutine (thread) that runs timer callbacks when they expire. func timerRunner() { for { From 04242fd6207e5ca3da4b7966d622238f6eeb0bd9 Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Mon, 13 Jan 2025 17:31:34 +0100 Subject: [PATCH 8/9] WIP cores scheduler --- builder/sizes.go | 4 +- compileopts/config.go | 5 + compileopts/options.go | 2 +- compileopts/target.go | 1 + compiler/symbol.go | 2 + src/internal/task/atomic-cooperative.go | 2 +- src/internal/task/atomic-preemptive.go | 2 +- src/internal/task/futex-cooperative.go | 2 +- src/internal/task/futex-cores.go | 64 +++++ .../{futex-preemptive.go => futex-threads.go} | 0 src/internal/task/mutex-cooperative.go | 2 +- src/internal/task/mutex-preemptive.go | 2 +- src/internal/task/pmutex-cooperative.go | 2 +- src/internal/task/pmutex-preemptive.go | 2 +- src/internal/task/task.go | 8 + src/internal/task/task_asyncify.go | 8 - src/internal/task/task_cores.c | 50 ++++ src/internal/task/task_cores.go | 266 ++++++++++++++++++ src/internal/task/task_none.go | 3 - src/internal/task/task_stack.go | 8 - src/internal/task/task_threads.go | 3 - src/runtime/atomics_critical.go | 61 ++-- src/runtime/baremetal-multicore.go | 13 + src/runtime/baremetal-unicore.go | 13 + src/runtime/runtime_rp2040.go | 165 +++++++++++ src/runtime/scheduler_cores.go | 91 ++++++ targets/arm.ld | 9 + .../gen-critical-atomics.go | 21 +- 28 files changed, 737 insertions(+), 74 deletions(-) create mode 100644 src/internal/task/futex-cores.go rename src/internal/task/{futex-preemptive.go => futex-threads.go} (100%) create mode 100644 src/internal/task/task_cores.c create mode 100644 src/internal/task/task_cores.go create mode 100644 src/runtime/baremetal-multicore.go create mode 100644 src/runtime/baremetal-unicore.go create mode 100644 src/runtime/scheduler_cores.go diff --git a/builder/sizes.go b/builder/sizes.go index 485a652d97..9aa74ffa82 100644 --- a/builder/sizes.go +++ b/builder/sizes.go @@ -490,9 +490,9 @@ func loadProgramSize(path string, packagePathMap map[string]string) (*programSiz continue } if section.Type == elf.SHT_NOBITS { - if section.Name == ".stack" { + if section.Name == ".stack" || section.Name == ".stack1" { // TinyGo emits stack sections on microcontroller using the - // ".stack" name. + // ".stack" (or ".stack1") name. // This is a bit ugly, but I don't think there is a way to // mark the stack section in a linker script. sections = append(sections, memorySection{ diff --git a/compileopts/config.go b/compileopts/config.go index ee5c34537c..1c88519cd6 100644 --- a/compileopts/config.go +++ b/compileopts/config.go @@ -99,6 +99,11 @@ func (c *Config) BuildTags() []string { "math_big_pure_go", // to get math/big to work "gc." + c.GC(), "scheduler." + c.Scheduler(), // used inside the runtime package "serial." + c.Serial()}...) // used inside the machine package + switch c.Scheduler() { + case "threads", "cores": + default: + tags = append(tags, "tinygo.unicore") + } for i := 1; i <= c.GoMinorVersion; i++ { tags = append(tags, fmt.Sprintf("go1.%d", i)) } diff --git a/compileopts/options.go b/compileopts/options.go index 78eb17cde9..4366dd5c11 100644 --- a/compileopts/options.go +++ b/compileopts/options.go @@ -10,7 +10,7 @@ import ( var ( validBuildModeOptions = []string{"default", "c-shared"} validGCOptions = []string{"none", "leaking", "conservative", "custom", "precise"} - validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads"} + validSchedulerOptions = []string{"none", "tasks", "asyncify", "threads", "cores"} validSerialOptions = []string{"none", "uart", "usb", "rtt"} validPrintSizeOptions = []string{"none", "short", "full", "html"} validPanicStrategyOptions = []string{"print", "trap"} diff --git a/compileopts/target.go b/compileopts/target.go index a28bf774fa..961e69a5d8 100644 --- a/compileopts/target.go +++ b/compileopts/target.go @@ -501,6 +501,7 @@ func defaultTarget(options *Options) (*TargetSpec, error) { } spec.ExtraFiles = append(spec.ExtraFiles, "src/runtime/asm_"+asmGoarch+suffix+".S") spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/task/task_stack_"+asmGoarch+suffix+".S") + spec.ExtraFiles = append(spec.ExtraFiles, "src/internal/task/task_cores.c") } // Configure the emulator. diff --git a/compiler/symbol.go b/compiler/symbol.go index ff7ef05508..8ad2184d39 100644 --- a/compiler/symbol.go +++ b/compiler/symbol.go @@ -208,6 +208,8 @@ func (c *compilerContext) getFunction(fn *ssa.Function) (llvm.Type, llvm.Value) // > circumstances, and should not be exposed to source languages. llvmutil.AppendToGlobal(c.mod, "llvm.compiler.used", llvmFn) } + case "tinygo_exitTask", "tinygo_schedulerUnlock": + llvmutil.AppendToGlobal(c.mod, "llvm.used", llvmFn) } // External/exported functions may not retain pointer values. diff --git a/src/internal/task/atomic-cooperative.go b/src/internal/task/atomic-cooperative.go index bd4cba8956..e05ea7de0d 100644 --- a/src/internal/task/atomic-cooperative.go +++ b/src/internal/task/atomic-cooperative.go @@ -1,4 +1,4 @@ -//go:build !scheduler.threads +//go:build tinygo.unicore package task diff --git a/src/internal/task/atomic-preemptive.go b/src/internal/task/atomic-preemptive.go index 275f36dce4..b395ef48a3 100644 --- a/src/internal/task/atomic-preemptive.go +++ b/src/internal/task/atomic-preemptive.go @@ -1,4 +1,4 @@ -//go:build scheduler.threads +//go:build !tinygo.unicore package task diff --git a/src/internal/task/futex-cooperative.go b/src/internal/task/futex-cooperative.go index 2a42c28d43..ae9efb5a73 100644 --- a/src/internal/task/futex-cooperative.go +++ b/src/internal/task/futex-cooperative.go @@ -1,4 +1,4 @@ -//go:build !scheduler.threads +//go:build tinygo.unicore package task diff --git a/src/internal/task/futex-cores.go b/src/internal/task/futex-cores.go new file mode 100644 index 0000000000..0ffccba2e4 --- /dev/null +++ b/src/internal/task/futex-cores.go @@ -0,0 +1,64 @@ +//go:build scheduler.cores + +package task + +import "runtime/interrupt" + +// A futex is a way for userspace to wait with the pointer as the key, and for +// another thread to wake one or all waiting threads keyed on the same pointer. +// +// A futex does not change the underlying value, it only reads it before to prevent +// lost wake-ups. +type Futex struct { + Uint32 + + waiters Stack +} + +// Atomically check for cmp to still be equal to the futex value and if so, go +// to sleep. Return true if we were definitely awoken by a call to Wake or +// WakeAll, and false if we can't be sure of that. +func (f *Futex) Wait(cmp uint32) (awoken bool) { + mask := futexLock() + + if f.Uint32.Load() != cmp { + futexUnlock(mask) + return false + } + + // Push the current goroutine onto the waiter stack. + f.waiters.Push(Current()) + + futexUnlock(mask) + + // Pause until this task is awoken by Wake/WakeAll. + Pause() + + // We were awoken by a call to Wake or WakeAll. There is no chance for + // spurious wakeups. + return true +} + +// Wake a single waiter. +func (f *Futex) Wake() { + mask := futexLock() + if t := f.waiters.Pop(); t != nil { + scheduleTask(t) + } + futexUnlock(mask) +} + +// Wake all waiters. +func (f *Futex) WakeAll() { + mask := futexLock() + for t := f.waiters.Pop(); t != nil; t = f.waiters.Pop() { + scheduleTask(t) + } + futexUnlock(mask) +} + +//go:linkname futexLock runtime.futexLock +func futexLock() interrupt.State + +//go:linkname futexUnlock runtime.futexUnlock +func futexUnlock(interrupt.State) diff --git a/src/internal/task/futex-preemptive.go b/src/internal/task/futex-threads.go similarity index 100% rename from src/internal/task/futex-preemptive.go rename to src/internal/task/futex-threads.go diff --git a/src/internal/task/mutex-cooperative.go b/src/internal/task/mutex-cooperative.go index f1205eea25..90274df2bb 100644 --- a/src/internal/task/mutex-cooperative.go +++ b/src/internal/task/mutex-cooperative.go @@ -1,4 +1,4 @@ -//go:build !scheduler.threads +//go:build tinygo.unicore package task diff --git a/src/internal/task/mutex-preemptive.go b/src/internal/task/mutex-preemptive.go index 27f4646698..ec83a6135d 100644 --- a/src/internal/task/mutex-preemptive.go +++ b/src/internal/task/mutex-preemptive.go @@ -1,4 +1,4 @@ -//go:build scheduler.threads +//go:build !tinygo.unicore package task diff --git a/src/internal/task/pmutex-cooperative.go b/src/internal/task/pmutex-cooperative.go index 0e6c4f828b..b61e92d829 100644 --- a/src/internal/task/pmutex-cooperative.go +++ b/src/internal/task/pmutex-cooperative.go @@ -1,4 +1,4 @@ -//go:build !scheduler.threads +//go:build tinygo.unicore package task diff --git a/src/internal/task/pmutex-preemptive.go b/src/internal/task/pmutex-preemptive.go index 10f0a63561..92263ed256 100644 --- a/src/internal/task/pmutex-preemptive.go +++ b/src/internal/task/pmutex-preemptive.go @@ -1,4 +1,4 @@ -//go:build scheduler.threads +//go:build !tinygo.unicore package task diff --git a/src/internal/task/task.go b/src/internal/task/task.go index 546f5ba117..aa563f6389 100644 --- a/src/internal/task/task.go +++ b/src/internal/task/task.go @@ -53,3 +53,11 @@ func runtime_alloc(size uintptr, layout unsafe.Pointer) unsafe.Pointer //go:linkname scheduleTask runtime.scheduleTask func scheduleTask(*Task) + +//go:linkname runtimePanic runtime.runtimePanic +func runtimePanic(str string) + +// Stack canary, to detect a stack overflow. The number is a random number +// generated by random.org. The bit fiddling dance is necessary because +// otherwise Go wouldn't allow the cast to a smaller integer size. +const stackCanary = uintptr(uint64(0x670c1333b83bf575) & uint64(^uintptr(0))) diff --git a/src/internal/task/task_asyncify.go b/src/internal/task/task_asyncify.go index 637a6b2237..bc4197995d 100644 --- a/src/internal/task/task_asyncify.go +++ b/src/internal/task/task_asyncify.go @@ -6,14 +6,6 @@ import ( "unsafe" ) -// Stack canary, to detect a stack overflow. The number is a random number -// generated by random.org. The bit fiddling dance is necessary because -// otherwise Go wouldn't allow the cast to a smaller integer size. -const stackCanary = uintptr(uint64(0x670c1333b83bf575) & uint64(^uintptr(0))) - -//go:linkname runtimePanic runtime.runtimePanic -func runtimePanic(str string) - // state is a structure which holds a reference to the state of the task. // When the task is suspended, the stack pointers are saved here. type state struct { diff --git a/src/internal/task/task_cores.c b/src/internal/task/task_cores.c new file mode 100644 index 0000000000..ae3f203bb1 --- /dev/null +++ b/src/internal/task/task_cores.c @@ -0,0 +1,50 @@ +//go:build scheduler.cores + +#include + +__attribute__((naked)) +void tinygo_cores_startTask(void) { + asm volatile( + "bl tinygo_schedulerUnlock\n\t" + + "ldr r0, =tinygo_exitTask\n\t" + "mov lr, r0\n\t" + + "pop {r0, pc}\n\t" + ); +} + +void tinygo_switchTask(uintptr_t *oldStack, uintptr_t newStack) { +#if defined(__thumb__) + register uintptr_t *oldStackReg asm("r0"); + oldStackReg = oldStack; + register uintptr_t newStackReg asm("r1"); + newStackReg = newStack; + asm volatile( + // Push PC to switch back to. + // Note: adding 1 to set the Thumb bit. + "ldr r2, =1f+1\n\t" + "push {r2}\n\t" + + // Save stack pointer in oldStack for the switch back. + "mov r2, sp\n\t" + "str r2, [%[oldStack]]\n\t" + + // Switch to the new stack. + "mov sp, %[newStack]\n\t" + + // Return into the new stack. + "pop {pc}\n\t" + + // address where we should resume + "1:" + + : [oldStack]"+r"(oldStackReg), + [newStack]"+r"(newStackReg) + : + : "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9", "r10", "r11", "r12", "lr", "cc", "memory" + ); +#else + #error unknown architecture +#endif +} diff --git a/src/internal/task/task_cores.go b/src/internal/task/task_cores.go new file mode 100644 index 0000000000..c4f4b558cf --- /dev/null +++ b/src/internal/task/task_cores.go @@ -0,0 +1,266 @@ +//go:build scheduler.cores + +package task + +import ( + "runtime/interrupt" + "unsafe" +) + +import "C" // dummy import, to make sure task_cores.c is included in the build + +type runState uint8 + +const ( + runStateRunning runState = iota + runStateResuming + runStatePaused +) + +type state struct { + // Which state the task is currently in. + // The state is protected by the scheduler lock, and must only be + // read/modified with that lock held. + runState runState + + // The stack pointer while the task is switched away. + sp unsafe.Pointer + + // canaryPtr points to the top word of the stack (the lowest address). + // This is used to detect stack overflows. + // When initializing the goroutine, the stackCanary constant is stored there. + // If the stack overflowed, the word will likely no longer equal stackCanary. + canaryPtr *uintptr +} + +var ( + runQueue *Task + sleepQueue *Task +) + +//go:linkname runtimeCurrentTask runtime.currentTask +func runtimeCurrentTask() *Task + +// Current returns the current task, or nil if we're in the scheduler. +func Current() *Task { + return runtimeCurrentTask() +} + +func Init(mainTask *Task, canaryPtr *uintptr) { + // The topmost word of the default stack is used as a stack canary. + *canaryPtr = stackCanary + mainTask.state.canaryPtr = canaryPtr +} + +func Pause() { + // Check whether the canary (the lowest address of the stack) is still + // valid. If it is not, a stack overflow has occurred. + if *Current().state.canaryPtr != stackCanary { + runtimePanic("goroutine stack overflow") + } + if interrupt.In() { + runtimePanic("blocked inside interrupt") + } + + // Note: Pause() must be called with the scheduler lock locked! + schedulerLock() + pauseLocked() + schedulerUnlock() +} + +var schedulerIsRunning bool + +func pauseLocked() { + t := Current() + for { + if t.state.runState == runStateResuming { + t.state.runState = runStateRunning + return + } + + // Make sure only one core is calling sleepTicks etc. + if schedulerIsRunning { + schedulerUnlock() + waitForEvents() + schedulerLock() + continue + } + + if runnable := runQueue; runnable != nil { + // Resume it now. + runQueue = runQueue.Next + runnable.Next = nil + if t == runnable { + // We're actually the task that's supposed to be resumed, so we + // are ready! + } else { + // It's not us that's ready, so switch to this other task. + setCurrentTask(runnable) + t.state.runState = runStatePaused + + // Switch away! + switchTask(&t.state.sp, runnable.state.sp) + + // We got back from the switch, so another task resumed us. + t.state.runState = runStateRunning + } + return + } + + // Check whether there's a sleeping task that is ready to run. + if sleepingTask := sleepQueue; sleepingTask != nil { + now := runtimeTicks() + if now >= sleepingTask.Data { + // This task is done sleeping. + // Resume it now. + sleepQueue = sleepQueue.Next + sleepingTask.Next = nil + if t == sleepingTask { + // We're actually the task that's sleeping, so we are ready! + } else { + // It's not us that's ready, so switch to this other task. + setCurrentTask(sleepingTask) + t.state.runState = runStatePaused + + // Switch away! + switchTask(&t.state.sp, sleepingTask.state.sp) + + // We got back from the switch, so another task resumed us. + t.state.runState = runStateRunning + } + return + } else { + // Sleep for a bit until the next task is ready to run. + schedulerIsRunning = true + schedulerUnlock() + delay := sleepingTask.Data - now + runtimeSleepTicks(delay) + schedulerLock() + schedulerIsRunning = false + continue + } + } + } +} + +func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { + t := &Task{} + stack := runtime_alloc(stackSize, nil) + stackTop := unsafe.Add(stack, stackSize-16) + topRegs := unsafe.Slice((*uintptr)(stackTop), 4) + topRegs[0] = uintptr(unsafe.Pointer(&startTask)) + topRegs[1] = uintptr(args) + topRegs[2] = fn + t.state.sp = stackTop + + canaryPtr := (*uintptr)(stack) + *canaryPtr = stackCanary + t.state.canaryPtr = canaryPtr + + schedulerLock() + addToRunqueue(t) + schedulerUnlock() +} + +func GCScan() { + panic("todo: task.GCScan") +} + +func StackTop() uintptr { + println("todo: task.StackTop") + for { + } +} + +func Sleep(wakeup uint64) { + schedulerLock() + addSleepTask(Current(), wakeup) + pauseLocked() + schedulerUnlock() +} + +func Resume(t *Task) { + schedulerLock() + switch t.state.runState { + case runStatePaused: + // Paused, state is saved on the stack. + addToRunqueue(t) + case runStateRunning: + // Going to pause soon, so let the Pause() function know it can resume + // immediately. + t.state.runState = runStateResuming + default: + println("unknown run state??") + for { + } + } + schedulerUnlock() +} + +// May only be called with the scheduler lock held! +func addToRunqueue(t *Task) { + t.Next = runQueue + runQueue = t +} + +func addSleepTask(t *Task, wakeup uint64) { + // Save the timestamp when the task should be woken up. + t.Data = wakeup + + // Find the position where we should insert this task in the queue. + q := &sleepQueue + for { + if *q == nil { + // Found the end of the time queue. Insert it here, at the end. + break + } + if (*q).Data > t.Data { + // Found a task in the queue that has a timeout before the + // to-be-sleeping task. Insert our task right before. + break + } + q = &(*q).Next + } + + // Insert the task into the queue (this could be at the end, if *q is nil). + t.Next = *q + *q = t +} + +//go:linkname schedulerLock runtime.schedulerLock +func schedulerLock() + +//go:linkname schedulerUnlock runtime.schedulerUnlock +func schedulerUnlock() + +//go:linkname runtimeTicks runtime.runtimeTicks +func runtimeTicks() uint64 + +//go:linkname runtimeSleepTicks runtime.runtimeSleepTicks +func runtimeSleepTicks(duration uint64) + +// startTask is a small wrapper function that sets up the first (and only) +// argument to the new goroutine and makes sure it is exited when the goroutine +// finishes. +// +//go:extern tinygo_cores_startTask +var startTask [0]uint8 + +//export tinygo_exitTask +func exitTask() { + Pause() +} + +//export tinygo_schedulerUnlock +func tinygo_schedulerUnlock() { + schedulerUnlock() +} + +//export tinygo_switchTask +func switchTask(oldStack *unsafe.Pointer, newStack unsafe.Pointer) + +//go:linkname waitForEvents runtime.waitForEvents +func waitForEvents() + +//go:linkname setCurrentTask runtime.setCurrentTask +func setCurrentTask(task *Task) diff --git a/src/internal/task/task_none.go b/src/internal/task/task_none.go index 280f1c4a81..8b667eec38 100644 --- a/src/internal/task/task_none.go +++ b/src/internal/task/task_none.go @@ -7,9 +7,6 @@ import "unsafe" // There is only one goroutine so the task struct can be a global. var mainTask Task -//go:linkname runtimePanic runtime.runtimePanic -func runtimePanic(str string) - func Pause() { runtimePanic("scheduler is disabled") } diff --git a/src/internal/task/task_stack.go b/src/internal/task/task_stack.go index 88a0970685..dc846107ad 100644 --- a/src/internal/task/task_stack.go +++ b/src/internal/task/task_stack.go @@ -7,14 +7,6 @@ import ( "unsafe" ) -//go:linkname runtimePanic runtime.runtimePanic -func runtimePanic(str string) - -// Stack canary, to detect a stack overflow. The number is a random number -// generated by random.org. The bit fiddling dance is necessary because -// otherwise Go wouldn't allow the cast to a smaller integer size. -const stackCanary = uintptr(uint64(0x670c1333b83bf575) & uint64(^uintptr(0))) - // state is a structure which holds a reference to the state of the task. // When the task is suspended, the registers are stored onto the stack and the stack pointer is stored into sp. type state struct { diff --git a/src/internal/task/task_threads.go b/src/internal/task/task_threads.go index c3531ef9c1..327b2df46e 100644 --- a/src/internal/task/task_threads.go +++ b/src/internal/task/task_threads.go @@ -244,9 +244,6 @@ func StackTop() uintptr { return Current().state.stackTop } -//go:linkname runtimePanic runtime.runtimePanic -func runtimePanic(msg string) - // Using //go:linkname instead of //export so that we don't tell the compiler // that the 't' parameter won't escape (because it will). // diff --git a/src/runtime/atomics_critical.go b/src/runtime/atomics_critical.go index 2d98881a10..edc6a82a3d 100644 --- a/src/runtime/atomics_critical.go +++ b/src/runtime/atomics_critical.go @@ -6,7 +6,6 @@ package runtime import ( - "runtime/interrupt" _ "unsafe" ) @@ -23,27 +22,27 @@ import ( func __atomic_load_2(ptr *uint16, ordering uintptr) uint16 { // The LLVM docs for this say that there is a val argument after the pointer. // That is a typo, and the GCC docs omit it. - mask := interrupt.Disable() + mask := atomicLock() val := *ptr - interrupt.Restore(mask) + atomicUnlock(mask) return val } //export __atomic_store_2 func __atomic_store_2(ptr *uint16, val uint16, ordering uintptr) { - mask := interrupt.Disable() + mask := atomicLock() *ptr = val - interrupt.Restore(mask) + atomicUnlock(mask) } //go:inline func doAtomicCAS16(ptr *uint16, expected, desired uint16) uint16 { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr if old == expected { *ptr = desired } - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -61,10 +60,10 @@ func __atomic_compare_exchange_2(ptr, expected *uint16, desired uint16, successO //go:inline func doAtomicSwap16(ptr *uint16, new uint16) uint16 { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -80,11 +79,11 @@ func __atomic_exchange_2(ptr *uint16, new uint16, ordering uintptr) uint16 { //go:inline func doAtomicAdd16(ptr *uint16, value uint16) (old, new uint16) { - mask := interrupt.Disable() + mask := atomicLock() old = *ptr new = old + value *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old, new } @@ -112,27 +111,27 @@ func __atomic_add_fetch_2(ptr *uint16, value uint16, ordering uintptr) uint16 { func __atomic_load_4(ptr *uint32, ordering uintptr) uint32 { // The LLVM docs for this say that there is a val argument after the pointer. // That is a typo, and the GCC docs omit it. - mask := interrupt.Disable() + mask := atomicLock() val := *ptr - interrupt.Restore(mask) + atomicUnlock(mask) return val } //export __atomic_store_4 func __atomic_store_4(ptr *uint32, val uint32, ordering uintptr) { - mask := interrupt.Disable() + mask := atomicLock() *ptr = val - interrupt.Restore(mask) + atomicUnlock(mask) } //go:inline func doAtomicCAS32(ptr *uint32, expected, desired uint32) uint32 { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr if old == expected { *ptr = desired } - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -150,10 +149,10 @@ func __atomic_compare_exchange_4(ptr, expected *uint32, desired uint32, successO //go:inline func doAtomicSwap32(ptr *uint32, new uint32) uint32 { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -169,11 +168,11 @@ func __atomic_exchange_4(ptr *uint32, new uint32, ordering uintptr) uint32 { //go:inline func doAtomicAdd32(ptr *uint32, value uint32) (old, new uint32) { - mask := interrupt.Disable() + mask := atomicLock() old = *ptr new = old + value *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old, new } @@ -201,27 +200,27 @@ func __atomic_add_fetch_4(ptr *uint32, value uint32, ordering uintptr) uint32 { func __atomic_load_8(ptr *uint64, ordering uintptr) uint64 { // The LLVM docs for this say that there is a val argument after the pointer. // That is a typo, and the GCC docs omit it. - mask := interrupt.Disable() + mask := atomicLock() val := *ptr - interrupt.Restore(mask) + atomicUnlock(mask) return val } //export __atomic_store_8 func __atomic_store_8(ptr *uint64, val uint64, ordering uintptr) { - mask := interrupt.Disable() + mask := atomicLock() *ptr = val - interrupt.Restore(mask) + atomicUnlock(mask) } //go:inline func doAtomicCAS64(ptr *uint64, expected, desired uint64) uint64 { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr if old == expected { *ptr = desired } - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -239,10 +238,10 @@ func __atomic_compare_exchange_8(ptr, expected *uint64, desired uint64, successO //go:inline func doAtomicSwap64(ptr *uint64, new uint64) uint64 { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -258,11 +257,11 @@ func __atomic_exchange_8(ptr *uint64, new uint64, ordering uintptr) uint64 { //go:inline func doAtomicAdd64(ptr *uint64, value uint64) (old, new uint64) { - mask := interrupt.Disable() + mask := atomicLock() old = *ptr new = old + value *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old, new } diff --git a/src/runtime/baremetal-multicore.go b/src/runtime/baremetal-multicore.go new file mode 100644 index 0000000000..8395d8f735 --- /dev/null +++ b/src/runtime/baremetal-multicore.go @@ -0,0 +1,13 @@ +//go:build baremetal && !tinygo.unicore + +package runtime + +import "runtime/interrupt" + +func atomicLock() interrupt.State { + return atomicLockImpl() +} + +func atomicUnlock(mask interrupt.State) { + atomicUnlockImpl(mask) +} diff --git a/src/runtime/baremetal-unicore.go b/src/runtime/baremetal-unicore.go new file mode 100644 index 0000000000..17b8f10212 --- /dev/null +++ b/src/runtime/baremetal-unicore.go @@ -0,0 +1,13 @@ +//go:build baremetal && tinygo.unicore + +package runtime + +import "runtime/interrupt" + +func atomicLock() interrupt.State { + return interrupt.Disable() +} + +func atomicUnlock(mask interrupt.State) { + interrupt.Restore(mask) +} diff --git a/src/runtime/runtime_rp2040.go b/src/runtime/runtime_rp2040.go index 1d36a771e5..c55f0b787d 100644 --- a/src/runtime/runtime_rp2040.go +++ b/src/runtime/runtime_rp2040.go @@ -4,8 +4,12 @@ package runtime import ( "device/arm" + "device/rp" "machine" "machine/usb/cdc" + "reflect" + "runtime/interrupt" + "unsafe" ) // machineTicks is provided by package machine. @@ -16,6 +20,8 @@ func machineLightSleep(uint64) type timeUnit int64 +const numCPU = 2 + // ticks returns the number of ticks (microseconds) elapsed since power up. func ticks() timeUnit { t := machineTicks() @@ -50,14 +56,18 @@ func waitForEvents() { } func putchar(c byte) { + mask := serialLock() machine.Serial.WriteByte(c) + serialUnlock(mask) } func getchar() byte { + mask := serialLock() for machine.Serial.Buffered() == 0 { Gosched() } v, _ := machine.Serial.ReadByte() + serialUnlock(mask) return v } @@ -71,9 +81,11 @@ func machineInit() func init() { machineInit() + mask := serialLock() cdc.EnableUSBCDC() machine.USBDev.Configure(machine.UARTConfig{}) machine.InitSerial() + serialUnlock(mask) } //export Reset_Handler @@ -82,3 +94,156 @@ func main() { run() exit(0) } + +func multicore_fifo_rvalid() bool { + return rp.SIO.FIFO_ST.Get()&rp.SIO_FIFO_ST_VLD != 0 +} + +func multicore_fifo_wready() bool { + return rp.SIO.FIFO_ST.Get()&rp.SIO_FIFO_ST_RDY != 0 +} + +func multicore_fifo_drain() { + for multicore_fifo_rvalid() { + rp.SIO.FIFO_RD.Get() + } +} + +func multicore_fifo_push_blocking(data uint32) { + for !multicore_fifo_wready() { + } + rp.SIO.FIFO_WR.Set(data) + arm.Asm("sev") +} + +func multicore_fifo_pop_blocking() uint32 { + for !multicore_fifo_rvalid() { + arm.Asm("wfe") + } + + return rp.SIO.FIFO_RD.Get() +} + +//go:extern __isr_vector +var __isr_vector [0]uint32 + +//go:extern _stack1_top +var _stack1_top [0]uint32 + +var core1StartSequence = [...]uint32{ + 0, 0, 1, + uint32(uintptr(unsafe.Pointer(&__isr_vector))), + uint32(uintptr(unsafe.Pointer(&_stack1_top))), + uint32(uintptr(reflect.ValueOf(runCore1).Pointer())), +} + +func startOtherCores() { + // Start the second core of the RP2040. + // See section 2.8.2 in the datasheet. + seq := 0 + for { + cmd := core1StartSequence[seq] + if cmd == 0 { + multicore_fifo_drain() + arm.Asm("sev") + } + multicore_fifo_push_blocking(cmd) + response := multicore_fifo_pop_blocking() + if cmd != response { + seq = 0 + continue + } + seq = seq + 1 + if seq >= len(core1StartSequence) { + break + } + } +} + +func runCore1() { + // Just blink a LED to show that this core is running. + // TODO: use a real scheduler. + led := machine.GP0 + led.Configure(machine.PinConfig{Mode: machine.PinOutput}) + const cycles = 7000_000 + for { + for i := 0; i < cycles; i++ { + led.Low() + } + + for i := 0; i < cycles; i++ { + led.High() + } + } +} + +func currentCPU() uint32 { + return rp.SIO.CPUID.Get() +} + +const ( + spinlockAtomic = iota + spinlockFutex + spinlockScheduler +) + +func atomicLockImpl() interrupt.State { + mask := interrupt.Disable() + for rp.SIO.SPINLOCK0.Get() == 0 { + } + return mask +} + +func atomicUnlockImpl(mask interrupt.State) { + rp.SIO.SPINLOCK0.Set(0) + interrupt.Restore(mask) +} + +func futexLock() interrupt.State { + // Disable interrupts. + // This is necessary since we might do some futex operations (like Wake) + // inside an interrupt and we don't want to deadlock with a non-interrupt + // goroutine that has taken the spinlock at the same time. + mask := interrupt.Disable() + + // Acquire the spinlock. + for rp.SIO.SPINLOCK1.Get() == 0 { + // Spin, until the lock is released. + } + + return mask +} + +func futexUnlock(mask interrupt.State) { + // Release the spinlock. + rp.SIO.SPINLOCK1.Set(0) + + // Restore interrupts. + interrupt.Restore(mask) +} + +var schedulerLockMasks [numCPU]interrupt.State + +// WARNING: doesn't check for deadlocks! +func schedulerLock() { + //schedulerLockMasks[currentCPU()] = interrupt.Disable() + for rp.SIO.SPINLOCK2.Get() == 0 { + } +} + +func schedulerUnlock() { + rp.SIO.SPINLOCK2.Set(0) + //interrupt.Restore(schedulerLockMasks[currentCPU()]) +} + +func serialLock() interrupt.State { + mask := interrupt.Disable() + for rp.SIO.SPINLOCK3.Get() == 0 { + } + return mask +} + +func serialUnlock(mask interrupt.State) { + rp.SIO.SPINLOCK3.Set(0) + interrupt.Restore(mask) +} diff --git a/src/runtime/scheduler_cores.go b/src/runtime/scheduler_cores.go new file mode 100644 index 0000000000..9216cebacc --- /dev/null +++ b/src/runtime/scheduler_cores.go @@ -0,0 +1,91 @@ +//go:build scheduler.cores + +package runtime + +import ( + "internal/task" + "unsafe" +) + +const hasScheduler = true + +const hasParallelism = true + +var ( + mainTask task.Task + cpuTasks [numCPU]*task.Task +) + +func deadlock() { + // Call yield without requesting a wakeup. + task.Pause() + trap() +} + +func scheduleTask(t *task.Task) { + task.Resume(t) +} + +func Gosched() { + // TODO +} + +// NumCPU returns the number of logical CPUs usable by the current process. +func NumCPU() int { + // Return the hardcoded number of physical CPU cores. + return numCPU +} + +func addTimer(tn *timerNode) { + runtimePanic("todo: timers") +} + +func removeTimer(t *timer) bool { + runtimePanic("todo: timers") + return false +} + +func schedulerRunQueue() *task.Queue { + println("todo: schedulerRunQueue") + for { + } + return nil +} + +// Pause the current task for a given time. +// +//go:linkname sleep time.Sleep +func sleep(duration int64) { + if duration <= 0 { + return + } + + wakeup := ticks() + nanosecondsToTicks(duration) + task.Sleep(uint64(wakeup)) +} + +func run() { + initHeap() + cpuTasks[0] = &mainTask + task.Init(&mainTask, (*uintptr)(unsafe.Pointer(&stackTopSymbol))) + initAll() + startOtherCores() + callMain() + mainExited = true +} + +func currentTask() *task.Task { + return cpuTasks[currentCPU()] +} + +func setCurrentTask(task *task.Task) { + cpuTasks[currentCPU()] = task +} + +func runtimeTicks() uint64 { + return uint64(ticks()) +} + +func runtimeSleepTicks(delay uint64) { + sleepTicks(timeUnit(delay)) +} diff --git a/targets/arm.ld b/targets/arm.ld index cdf5b1dd43..631ec9cab2 100644 --- a/targets/arm.ld +++ b/targets/arm.ld @@ -32,6 +32,15 @@ SECTIONS _stack_top = .; } >RAM + /* Stack for second core (core 1), if there is one. This memory area won't be + * reserved if there is no second core. */ + .stack1 (NOLOAD) : + { + . = ALIGN(4); + . += _stack_size; + _stack1_top = .; + } >RAM + /* Start address (in flash) of .data, used by startup code. */ _sidata = LOADADDR(.data); diff --git a/tools/gen-critical-atomics/gen-critical-atomics.go b/tools/gen-critical-atomics/gen-critical-atomics.go index 75ea327076..f949a7933a 100644 --- a/tools/gen-critical-atomics/gen-critical-atomics.go +++ b/tools/gen-critical-atomics/gen-critical-atomics.go @@ -26,7 +26,6 @@ package runtime import ( _ "unsafe" - "runtime/interrupt" ) // Documentation: @@ -41,29 +40,29 @@ import ( func __atomic_load_{{.}}(ptr *uint{{$bits}}, ordering uintptr) uint{{$bits}} { // The LLVM docs for this say that there is a val argument after the pointer. // That is a typo, and the GCC docs omit it. - mask := interrupt.Disable() + mask := atomicLock() val := *ptr - interrupt.Restore(mask) + atomicUnlock(mask) return val } {{end}} {{- define "store"}}{{$bits := mul . 8 -}} //export __atomic_store_{{.}} func __atomic_store_{{.}}(ptr *uint{{$bits}}, val uint{{$bits}}, ordering uintptr) { - mask := interrupt.Disable() + mask := atomicLock() *ptr = val - interrupt.Restore(mask) + atomicUnlock(mask) } {{end}} {{- define "cas"}}{{$bits := mul . 8 -}} //go:inline func doAtomicCAS{{$bits}}(ptr *uint{{$bits}}, expected, desired uint{{$bits}}) uint{{$bits}} { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr if old == expected { *ptr = desired } - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -82,10 +81,10 @@ func __atomic_compare_exchange_{{.}}(ptr, expected *uint{{$bits}}, desired uint{ {{- define "swap"}}{{$bits := mul . 8 -}} //go:inline func doAtomicSwap{{$bits}}(ptr *uint{{$bits}}, new uint{{$bits}}) uint{{$bits}} { - mask := interrupt.Disable() + mask := atomicLock() old := *ptr *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old } @@ -111,11 +110,11 @@ func __atomic_exchange_{{.}}(ptr *uint{{$bits}}, new uint{{$bits}}, ordering uin //go:inline func {{$opfn}}(ptr *{{$type}}, value {{$type}}) (old, new {{$type}}) { - mask := interrupt.Disable() + mask := atomicLock() old = *ptr {{$opdef}} *ptr = new - interrupt.Restore(mask) + atomicUnlock(mask) return old, new } From ea2f0e444dfc9739d240dd306b253a4023f8df1e Mon Sep 17 00:00:00 2001 From: Ayke van Laethem Date: Tue, 1 Apr 2025 09:16:35 +0200 Subject: [PATCH 9/9] WIP stashed changes --- src/internal/task/task.go | 8 + src/internal/task/task_cores.c | 50 ----- src/internal/task/task_cores.go | 266 ------------------------ src/internal/task/task_stack.go | 40 ++-- src/internal/task/task_stack_cortexm.S | 2 +- src/internal/task/task_stack_cortexm.go | 2 +- src/runtime/gc_stack_raw.go | 2 +- src/runtime/print.go | 4 + src/runtime/runtime_nrf.go | 30 +++ src/runtime/runtime_rp2040.go | 55 +++-- src/runtime/runtime_unix.go | 6 + src/runtime/scheduler_cores.go | 191 ++++++++++++++++- testdata/channel.go | 1 + testdata/goroutines.go | 20 +- 14 files changed, 312 insertions(+), 365 deletions(-) delete mode 100644 src/internal/task/task_cores.c delete mode 100644 src/internal/task/task_cores.go diff --git a/src/internal/task/task.go b/src/internal/task/task.go index aa563f6389..3f2a36cda9 100644 --- a/src/internal/task/task.go +++ b/src/internal/task/task.go @@ -21,11 +21,19 @@ type Task struct { // state is the underlying running state of the task. state state + RunState uint8 + // DeferFrame stores a pointer to the (stack allocated) defer frame of the // goroutine that is used for the recover builtin. DeferFrame unsafe.Pointer } +const ( + RunStatePaused = iota + RunStateRunning + RunStateResuming +) + // DataUint32 returns the Data field as a uint32. The value is only valid after // setting it through SetDataUint32 or by storing to it using DataAtomicUint32. func (t *Task) DataUint32() uint32 { diff --git a/src/internal/task/task_cores.c b/src/internal/task/task_cores.c deleted file mode 100644 index ae3f203bb1..0000000000 --- a/src/internal/task/task_cores.c +++ /dev/null @@ -1,50 +0,0 @@ -//go:build scheduler.cores - -#include - -__attribute__((naked)) -void tinygo_cores_startTask(void) { - asm volatile( - "bl tinygo_schedulerUnlock\n\t" - - "ldr r0, =tinygo_exitTask\n\t" - "mov lr, r0\n\t" - - "pop {r0, pc}\n\t" - ); -} - -void tinygo_switchTask(uintptr_t *oldStack, uintptr_t newStack) { -#if defined(__thumb__) - register uintptr_t *oldStackReg asm("r0"); - oldStackReg = oldStack; - register uintptr_t newStackReg asm("r1"); - newStackReg = newStack; - asm volatile( - // Push PC to switch back to. - // Note: adding 1 to set the Thumb bit. - "ldr r2, =1f+1\n\t" - "push {r2}\n\t" - - // Save stack pointer in oldStack for the switch back. - "mov r2, sp\n\t" - "str r2, [%[oldStack]]\n\t" - - // Switch to the new stack. - "mov sp, %[newStack]\n\t" - - // Return into the new stack. - "pop {pc}\n\t" - - // address where we should resume - "1:" - - : [oldStack]"+r"(oldStackReg), - [newStack]"+r"(newStackReg) - : - : "r2", "r3", "r4", "r5", "r6", "r7", "r8", "r9", "r10", "r11", "r12", "lr", "cc", "memory" - ); -#else - #error unknown architecture -#endif -} diff --git a/src/internal/task/task_cores.go b/src/internal/task/task_cores.go deleted file mode 100644 index c4f4b558cf..0000000000 --- a/src/internal/task/task_cores.go +++ /dev/null @@ -1,266 +0,0 @@ -//go:build scheduler.cores - -package task - -import ( - "runtime/interrupt" - "unsafe" -) - -import "C" // dummy import, to make sure task_cores.c is included in the build - -type runState uint8 - -const ( - runStateRunning runState = iota - runStateResuming - runStatePaused -) - -type state struct { - // Which state the task is currently in. - // The state is protected by the scheduler lock, and must only be - // read/modified with that lock held. - runState runState - - // The stack pointer while the task is switched away. - sp unsafe.Pointer - - // canaryPtr points to the top word of the stack (the lowest address). - // This is used to detect stack overflows. - // When initializing the goroutine, the stackCanary constant is stored there. - // If the stack overflowed, the word will likely no longer equal stackCanary. - canaryPtr *uintptr -} - -var ( - runQueue *Task - sleepQueue *Task -) - -//go:linkname runtimeCurrentTask runtime.currentTask -func runtimeCurrentTask() *Task - -// Current returns the current task, or nil if we're in the scheduler. -func Current() *Task { - return runtimeCurrentTask() -} - -func Init(mainTask *Task, canaryPtr *uintptr) { - // The topmost word of the default stack is used as a stack canary. - *canaryPtr = stackCanary - mainTask.state.canaryPtr = canaryPtr -} - -func Pause() { - // Check whether the canary (the lowest address of the stack) is still - // valid. If it is not, a stack overflow has occurred. - if *Current().state.canaryPtr != stackCanary { - runtimePanic("goroutine stack overflow") - } - if interrupt.In() { - runtimePanic("blocked inside interrupt") - } - - // Note: Pause() must be called with the scheduler lock locked! - schedulerLock() - pauseLocked() - schedulerUnlock() -} - -var schedulerIsRunning bool - -func pauseLocked() { - t := Current() - for { - if t.state.runState == runStateResuming { - t.state.runState = runStateRunning - return - } - - // Make sure only one core is calling sleepTicks etc. - if schedulerIsRunning { - schedulerUnlock() - waitForEvents() - schedulerLock() - continue - } - - if runnable := runQueue; runnable != nil { - // Resume it now. - runQueue = runQueue.Next - runnable.Next = nil - if t == runnable { - // We're actually the task that's supposed to be resumed, so we - // are ready! - } else { - // It's not us that's ready, so switch to this other task. - setCurrentTask(runnable) - t.state.runState = runStatePaused - - // Switch away! - switchTask(&t.state.sp, runnable.state.sp) - - // We got back from the switch, so another task resumed us. - t.state.runState = runStateRunning - } - return - } - - // Check whether there's a sleeping task that is ready to run. - if sleepingTask := sleepQueue; sleepingTask != nil { - now := runtimeTicks() - if now >= sleepingTask.Data { - // This task is done sleeping. - // Resume it now. - sleepQueue = sleepQueue.Next - sleepingTask.Next = nil - if t == sleepingTask { - // We're actually the task that's sleeping, so we are ready! - } else { - // It's not us that's ready, so switch to this other task. - setCurrentTask(sleepingTask) - t.state.runState = runStatePaused - - // Switch away! - switchTask(&t.state.sp, sleepingTask.state.sp) - - // We got back from the switch, so another task resumed us. - t.state.runState = runStateRunning - } - return - } else { - // Sleep for a bit until the next task is ready to run. - schedulerIsRunning = true - schedulerUnlock() - delay := sleepingTask.Data - now - runtimeSleepTicks(delay) - schedulerLock() - schedulerIsRunning = false - continue - } - } - } -} - -func start(fn uintptr, args unsafe.Pointer, stackSize uintptr) { - t := &Task{} - stack := runtime_alloc(stackSize, nil) - stackTop := unsafe.Add(stack, stackSize-16) - topRegs := unsafe.Slice((*uintptr)(stackTop), 4) - topRegs[0] = uintptr(unsafe.Pointer(&startTask)) - topRegs[1] = uintptr(args) - topRegs[2] = fn - t.state.sp = stackTop - - canaryPtr := (*uintptr)(stack) - *canaryPtr = stackCanary - t.state.canaryPtr = canaryPtr - - schedulerLock() - addToRunqueue(t) - schedulerUnlock() -} - -func GCScan() { - panic("todo: task.GCScan") -} - -func StackTop() uintptr { - println("todo: task.StackTop") - for { - } -} - -func Sleep(wakeup uint64) { - schedulerLock() - addSleepTask(Current(), wakeup) - pauseLocked() - schedulerUnlock() -} - -func Resume(t *Task) { - schedulerLock() - switch t.state.runState { - case runStatePaused: - // Paused, state is saved on the stack. - addToRunqueue(t) - case runStateRunning: - // Going to pause soon, so let the Pause() function know it can resume - // immediately. - t.state.runState = runStateResuming - default: - println("unknown run state??") - for { - } - } - schedulerUnlock() -} - -// May only be called with the scheduler lock held! -func addToRunqueue(t *Task) { - t.Next = runQueue - runQueue = t -} - -func addSleepTask(t *Task, wakeup uint64) { - // Save the timestamp when the task should be woken up. - t.Data = wakeup - - // Find the position where we should insert this task in the queue. - q := &sleepQueue - for { - if *q == nil { - // Found the end of the time queue. Insert it here, at the end. - break - } - if (*q).Data > t.Data { - // Found a task in the queue that has a timeout before the - // to-be-sleeping task. Insert our task right before. - break - } - q = &(*q).Next - } - - // Insert the task into the queue (this could be at the end, if *q is nil). - t.Next = *q - *q = t -} - -//go:linkname schedulerLock runtime.schedulerLock -func schedulerLock() - -//go:linkname schedulerUnlock runtime.schedulerUnlock -func schedulerUnlock() - -//go:linkname runtimeTicks runtime.runtimeTicks -func runtimeTicks() uint64 - -//go:linkname runtimeSleepTicks runtime.runtimeSleepTicks -func runtimeSleepTicks(duration uint64) - -// startTask is a small wrapper function that sets up the first (and only) -// argument to the new goroutine and makes sure it is exited when the goroutine -// finishes. -// -//go:extern tinygo_cores_startTask -var startTask [0]uint8 - -//export tinygo_exitTask -func exitTask() { - Pause() -} - -//export tinygo_schedulerUnlock -func tinygo_schedulerUnlock() { - schedulerUnlock() -} - -//export tinygo_switchTask -func switchTask(oldStack *unsafe.Pointer, newStack unsafe.Pointer) - -//go:linkname waitForEvents runtime.waitForEvents -func waitForEvents() - -//go:linkname setCurrentTask runtime.setCurrentTask -func setCurrentTask(task *Task) diff --git a/src/internal/task/task_stack.go b/src/internal/task/task_stack.go index dc846107ad..cbba8e3807 100644 --- a/src/internal/task/task_stack.go +++ b/src/internal/task/task_stack.go @@ -1,4 +1,4 @@ -//go:build scheduler.tasks +//go:build scheduler.tasks || scheduler.cores package task @@ -25,40 +25,54 @@ type state struct { } // currentTask is the current running task, or nil if currently in the scheduler. -var currentTask *Task +//var currentTask *Task // Current returns the current active task. -func Current() *Task { - return currentTask +//func Current() *Task { +// return currentTask +//} + +//go:linkname Current runtime.currentTask +func Current() *Task + +//go:linkname schedulerLock runtime.schedulerLock +func schedulerLock() + +func Pause() { + schedulerLock() + PauseLocked() } // Pause suspends the current task and returns to the scheduler. // This function may only be called when running on a goroutine stack, not when running on the system stack or in an interrupt. -func Pause() { +func PauseLocked() { // Check whether the canary (the lowest address of the stack) is still // valid. If it is not, a stack overflow has occurred. - if *currentTask.state.canaryPtr != stackCanary { + if *Current().state.canaryPtr != stackCanary { runtimePanic("goroutine stack overflow") } if interrupt.In() { runtimePanic("blocked inside interrupt") } - currentTask.state.pause() + current := Current() + current.RunState = RunStatePaused + current.state.pause() } -//export tinygo_pause -func pause() { +//export tinygo_task_exit +func taskExit() { + println("-- exiting task") Pause() } // Resume the task until it pauses or completes. // This may only be called from the scheduler. func (t *Task) Resume() { - currentTask = t - t.gcData.swap() + //currentTask = t + //t.gcData.swap() t.state.resume() - t.gcData.swap() - currentTask = nil + //t.gcData.swap() + //currentTask = nil } // initialize the state and prepare to call the specified function with the specified argument bundle. diff --git a/src/internal/task/task_stack_cortexm.S b/src/internal/task/task_stack_cortexm.S index dfe713552d..8e6520106e 100644 --- a/src/internal/task/task_stack_cortexm.S +++ b/src/internal/task/task_stack_cortexm.S @@ -28,7 +28,7 @@ tinygo_startTask: blx r4 // After return, exit this goroutine. This is a tail call. - bl tinygo_pause + bl tinygo_task_exit .cfi_endproc .size tinygo_startTask, .-tinygo_startTask diff --git a/src/internal/task/task_stack_cortexm.go b/src/internal/task/task_stack_cortexm.go index 226a088c88..653dc06e17 100644 --- a/src/internal/task/task_stack_cortexm.go +++ b/src/internal/task/task_stack_cortexm.go @@ -1,4 +1,4 @@ -//go:build scheduler.tasks && cortexm +//go:build (scheduler.tasks || scheduler.cores) && cortexm package task diff --git a/src/runtime/gc_stack_raw.go b/src/runtime/gc_stack_raw.go index bdc3154fa5..f4483e58c5 100644 --- a/src/runtime/gc_stack_raw.go +++ b/src/runtime/gc_stack_raw.go @@ -1,4 +1,4 @@ -//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads +//go:build (gc.conservative || gc.precise) && !tinygo.wasm && !scheduler.threads && !scheduler.cores package runtime diff --git a/src/runtime/print.go b/src/runtime/print.go index a4de460253..ef5c8a7dcc 100644 --- a/src/runtime/print.go +++ b/src/runtime/print.go @@ -13,11 +13,15 @@ type stringer interface { // This is a no-op lock on systems that do not have parallelism. var printLock task.PMutex +//var printLocked interrupt.State + func printlock() { + //printLocked = serialLock() printLock.Lock() } func printunlock() { + //serialUnlock(printLocked) printLock.Unlock() } diff --git a/src/runtime/runtime_nrf.go b/src/runtime/runtime_nrf.go index 729c6bb20f..30a6cda9cb 100644 --- a/src/runtime/runtime_nrf.go +++ b/src/runtime/runtime_nrf.go @@ -15,6 +15,8 @@ type timeUnit int64 //go:linkname systemInit SystemInit func systemInit() +const numCPU = 1 + //export Reset_Handler func main() { if nrf.FPUPresent { @@ -145,3 +147,31 @@ func rtc_sleep(ticks uint32) { waitForEvents() } } + +func atomicLockImpl() interrupt.State { + mask := interrupt.Disable() + return mask +} + +func atomicUnlockImpl(mask interrupt.State) { + interrupt.Restore(mask) +} + +func futexLock() interrupt.State { + mask := interrupt.Disable() + return mask +} + +func futexUnlock(mask interrupt.State) { + interrupt.Restore(mask) +} + +func schedulerLock() { +} + +func schedulerUnlock() { +} + +func currentCPU() uint32 { + return 0 +} diff --git a/src/runtime/runtime_rp2040.go b/src/runtime/runtime_rp2040.go index c55f0b787d..3c9e8935a0 100644 --- a/src/runtime/runtime_rp2040.go +++ b/src/runtime/runtime_rp2040.go @@ -5,6 +5,7 @@ package runtime import ( "device/arm" "device/rp" + "internal/task" "machine" "machine/usb/cdc" "reflect" @@ -56,9 +57,9 @@ func waitForEvents() { } func putchar(c byte) { - mask := serialLock() + //mask := serialLock() machine.Serial.WriteByte(c) - serialUnlock(mask) + //serialUnlock(mask) } func getchar() byte { @@ -137,7 +138,7 @@ var core1StartSequence = [...]uint32{ uint32(uintptr(reflect.ValueOf(runCore1).Pointer())), } -func startOtherCores() { +func startSecondaryCores() { // Start the second core of the RP2040. // See section 2.8.2 in the datasheet. seq := 0 @@ -160,21 +161,30 @@ func startOtherCores() { } } +var core1Task task.Task + func runCore1() { + //until := ticks() + nanosecondsToTicks(1900e6) + //for ticks() < until { + //} + println("starting core 1") + + runSecondary(1, &core1Task) + // Just blink a LED to show that this core is running. // TODO: use a real scheduler. - led := machine.GP0 - led.Configure(machine.PinConfig{Mode: machine.PinOutput}) - const cycles = 7000_000 - for { - for i := 0; i < cycles; i++ { - led.Low() - } - - for i := 0; i < cycles; i++ { - led.High() - } - } + //led := machine.GP16 + //led.Configure(machine.PinConfig{Mode: machine.PinOutput}) + //const cycles = 7000_000 + //for { + // for i := 0; i < cycles; i++ { + // led.Low() + // } + + // for i := 0; i < cycles; i++ { + // led.High() + // } + //} } func currentCPU() uint32 { @@ -223,27 +233,36 @@ func futexUnlock(mask interrupt.State) { } var schedulerLockMasks [numCPU]interrupt.State +var schedulerLocked = false // WARNING: doesn't check for deadlocks! func schedulerLock() { //schedulerLockMasks[currentCPU()] = interrupt.Disable() for rp.SIO.SPINLOCK2.Get() == 0 { } + schedulerLocked = true } func schedulerUnlock() { + if !schedulerLocked { + println("!!! not locked at unlock") + for { + } + } + schedulerLocked = false rp.SIO.SPINLOCK2.Set(0) //interrupt.Restore(schedulerLockMasks[currentCPU()]) } func serialLock() interrupt.State { - mask := interrupt.Disable() + //mask := interrupt.Disable() for rp.SIO.SPINLOCK3.Get() == 0 { } - return mask + //return mask + return 0 } func serialUnlock(mask interrupt.State) { rp.SIO.SPINLOCK3.Set(0) - interrupt.Restore(mask) + //interrupt.Restore(mask) } diff --git a/src/runtime/runtime_unix.go b/src/runtime/runtime_unix.go index 17e004b2e8..fd71d7c488 100644 --- a/src/runtime/runtime_unix.go +++ b/src/runtime/runtime_unix.go @@ -11,6 +11,12 @@ import ( "unsafe" ) +const numCPU = 1 + +func currentCPU() int { + return 0 +} + //export write func libc_write(fd int32, buf unsafe.Pointer, count uint) int diff --git a/src/runtime/scheduler_cores.go b/src/runtime/scheduler_cores.go index 9216cebacc..4a664acf80 100644 --- a/src/runtime/scheduler_cores.go +++ b/src/runtime/scheduler_cores.go @@ -3,17 +3,21 @@ package runtime import ( + "device/arm" "internal/task" - "unsafe" ) const hasScheduler = true const hasParallelism = true +const coresVerbose = false + var ( - mainTask task.Task - cpuTasks [numCPU]*task.Task + mainTask task.Task + cpuTasks [numCPU]*task.Task + sleepQueue *task.Task + runQueue *task.Task ) func deadlock() { @@ -23,7 +27,60 @@ func deadlock() { } func scheduleTask(t *task.Task) { - task.Resume(t) + schedulerLock() + switch t.RunState { + case task.RunStatePaused: + // Paused, state is saved on the stack. + + if coresVerbose { + println("## schedule: add to runQueue") + } + addToRunQueue(t) + arm.Asm("sev") + case task.RunStateRunning: + // Not yet paused (probably going to pause very soon), so let the + // Pause() function know it can resume immediately. + t.RunState = task.RunStateResuming + if coresVerbose { + println("## schedule: mark as resuming") + } + default: + println("Unknown run state??") + for { + } + } + schedulerUnlock() +} + +// Add task to runQueue. +// Scheduler lock must be held when calling this function. +func addToRunQueue(t *task.Task) { + t.Next = runQueue + runQueue = t +} + +func addSleepTask(t *task.Task, wakeup timeUnit) { + // Save the timestamp when the task should be woken up. + t.Data = uint64(wakeup) + + // Find the position where we should insert this task in the queue. + q := &sleepQueue + for { + if *q == nil { + // Found the end of the time queue. Insert it here, at the end. + break + } + if timeUnit((*q).Data) > timeUnit(t.Data) { + // Found a task in the queue that has a timeout before the + // to-be-sleeping task. Insert our task right before. + break + } + q = &(*q).Next + } + + // Insert the task into the queue (this could be at the end, if *q is nil). + t.Next = *q + *q = t } func Gosched() { @@ -61,17 +118,126 @@ func sleep(duration int64) { } wakeup := ticks() + nanosecondsToTicks(duration) - task.Sleep(uint64(wakeup)) + + schedulerLock() + addSleepTask(task.Current(), wakeup) + task.PauseLocked() } func run() { initHeap() cpuTasks[0] = &mainTask - task.Init(&mainTask, (*uintptr)(unsafe.Pointer(&stackTopSymbol))) - initAll() - startOtherCores() - callMain() - mainExited = true + initAll() // TODO: move into main goroutine! + + until := ticks() + nanosecondsToTicks(200e6) + for ticks() < until { + } + println("\n\n=====") + + go func() { + //initAll() + startSecondaryCores() + callMain() + mainExited = true + }() + schedulerLock() + scheduler() +} + +func runSecondary(core uint32, t *task.Task) { + println("-- runSecondary") + cpuTasks[core] = t + println("-- locking for 2nd core") + schedulerLock() + println("-- locked!") + scheduler() +} + +var schedulerIsRunning = false + +func scheduler() { + if coresVerbose { + println("** scheduler on core:", currentCPU()) + } + for { + //until := ticks() + nanosecondsToTicks(100e6) + //for ticks() < until { + //} + + // Check for ready-to-run tasks. + if runnable := runQueue; runnable != nil { + if coresVerbose { + println("** scheduler", currentCPU(), "run runnable") + } + // Pop off the run queue. + runQueue = runnable.Next + runnable.Next = nil + + // Resume it now. + setCurrentTask(runnable) + schedulerUnlock() + runnable.Resume() + if coresVerbose { + println("** scheduler", currentCPU(), " returned (from runqueue resume)") + } + setCurrentTask(nil) + + continue + } + + // If another core is using the clock, let it handle the sleep queue. + if schedulerIsRunning { + if coresVerbose { + println("** scheduler", currentCPU(), "wait for other core") + } + schedulerUnlock() + waitForEvents() + schedulerLock() + continue + } + + if sleepingTask := sleepQueue; sleepingTask != nil { + now := ticks() + if now >= timeUnit(sleepingTask.Data) { + if coresVerbose { + println("** scheduler", currentCPU(), "run sleeping") + } + // This task is done sleeping. + // Resume it now. + sleepQueue = sleepQueue.Next + sleepingTask.Next = nil + + setCurrentTask(sleepingTask) + schedulerUnlock() + sleepingTask.Resume() + if coresVerbose { + println("** scheduler", currentCPU(), " returned (from sleepQueue resume)") + } + setCurrentTask(nil) + continue + } + + delay := timeUnit(sleepingTask.Data) - now + if coresVerbose { + println("** scheduler", currentCPU(), "sleep", ticksToNanoseconds(delay)/1e6) + } + + // Sleep for a bit until the next task is ready to run. + schedulerIsRunning = true + schedulerUnlock() + sleepTicks(delay) + schedulerLock() + schedulerIsRunning = false + continue + } + + if coresVerbose { + println("** scheduler", currentCPU(), "wait for events") + } + schedulerUnlock() + waitForEvents() + schedulerLock() + } } func currentTask() *task.Task { @@ -89,3 +255,8 @@ func runtimeTicks() uint64 { func runtimeSleepTicks(delay uint64) { sleepTicks(timeUnit(delay)) } + +//export tinygo_schedulerUnlock +func tinygo_schedulerUnlock() { + schedulerUnlock() +} diff --git a/testdata/channel.go b/testdata/channel.go index 9c0fee5b73..a247c808f2 100644 --- a/testdata/channel.go +++ b/testdata/channel.go @@ -12,6 +12,7 @@ var wg sync.WaitGroup type intchan chan int func main() { + time.Sleep(time.Second * 2) ch := make(chan int, 2) ch <- 1 println("len, cap of channel:", len(ch), cap(ch), ch == nil) diff --git a/testdata/goroutines.go b/testdata/goroutines.go index cf19cc3ca0..bed2ecb1a7 100644 --- a/testdata/goroutines.go +++ b/testdata/goroutines.go @@ -5,19 +5,27 @@ import ( "time" ) -func init() { - println("init") - go println("goroutine in init") - time.Sleep(1 * time.Millisecond) -} +//func init() { +// println("\n----") +// println("init") +// go println("goroutine in init") +// time.Sleep(1 * time.Millisecond) +//} func main() { + //for i := 0; i < 2; i++ { + // println("...") + // time.Sleep(time.Second) + //} + println("main 1") go sub() time.Sleep(1 * time.Millisecond) println("main 2") time.Sleep(2 * time.Millisecond) println("main 3") + //time.Sleep(2 * time.Millisecond) + //println("main 4") // Await a blocking call. println("wait:") @@ -101,6 +109,8 @@ func sub() { println("sub 1") time.Sleep(2 * time.Millisecond) println("sub 2") + //time.Sleep(2 * time.Millisecond) + //println("sub 3") } func wait() {