Skip to content

feature: Detect board port change after upload #2253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Simplified port detection using a Future-style abstraction
  • Loading branch information
cmaglie committed Aug 9, 2023
commit f7eef3b4d80308a3d31517d2199996165fd75f13
26 changes: 10 additions & 16 deletions commands/upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"io"
"path/filepath"
"strings"
"sync"
"time"

"github.com/arduino/arduino-cli/arduino"
Expand Down Expand Up @@ -219,17 +218,11 @@ func runProgramAction(pme *packagemanager.Explorer,
if watch != nil {
// Run port detector
uploadCompletedCtx, cancel := context.WithCancel(context.Background())
var newUploadPort *rpc.Port
var wg sync.WaitGroup
wg.Add(1)
go func() {
newUploadPort = detectUploadPort(port, watch, uploadCompletedCtx)
wg.Done()
}()
newUploadPort := f.NewFuture[*rpc.Port]()
go detectUploadPort(port, watch, uploadCompletedCtx, newUploadPort)
uploadCompleted = func() *rpc.Port {
cancel()
wg.Wait()
return newUploadPort
return newUploadPort.Await()
}
defer uploadCompleted() // defer in case of exit on error (ensures goroutine completion)
}
Expand Down Expand Up @@ -522,13 +515,15 @@ func runProgramAction(pme *packagemanager.Explorer,
return uploadCompleted(), nil
}

func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResponse, uploadCtx context.Context) *rpc.Port {
func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResponse, uploadCtx context.Context, result f.Future[*rpc.Port]) {
log := logrus.WithField("task", "port_detection")
log.Tracef("Detecting new board port after upload")

var candidate *rpc.Port
defer func() {
// On exit, discard all events until the watcher is closed
go f.DiscardCh(watch)
result.Send(candidate)
}()

// Ignore all events during the upload
Expand All @@ -537,7 +532,7 @@ func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResp
case ev, ok := <-watch:
if !ok {
log.Error("Upload port detection failed, watcher closed")
return nil
return
}
log.WithField("event", ev).Trace("Ignored watcher event before upload")
continue
Expand All @@ -550,13 +545,12 @@ func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResp
// Pick the first port that is detected after the upload
desiredHwID := uploadPort.HardwareId
timeout := time.After(5 * time.Second)
var candidate *rpc.Port
for {
select {
case ev, ok := <-watch:
if !ok {
log.Error("Upload port detection failed, watcher closed")
return candidate
return
}
if ev.EventType == "remove" && candidate != nil {
if candidate.Equals(ev.Port.GetPort()) {
Expand All @@ -580,10 +574,10 @@ func detectUploadPort(uploadPort *rpc.Port, watch <-chan *rpc.BoardListWatchResp
}

log.Trace("Found new upload port!")
return candidate
return
case <-timeout:
log.Trace("Timeout waiting for candidate port")
return candidate
return
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions internal/algorithms/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,41 @@

package f

import "sync"

// DiscardCh consume all incoming messages from the given channel until its closed.
func DiscardCh[T any](ch <-chan T) {
for range ch {
}
}

// Future is an object that holds a result value. The value may be read and
// written asynchronously.
type Future[T any] interface {
Send(T)
Await() T
}

type future[T any] struct {
wg sync.WaitGroup
value T
}

// NewFuture creates a new Future[T]
func NewFuture[T any]() Future[T] {
res := &future[T]{}
res.wg.Add(1)
return res
}

// Send a result in the Future. Threads waiting for result will be unlocked.
func (f *future[T]) Send(value T) {
f.value = value
f.wg.Done()
}

// Await for a result from the Future, blocks until a result is available.
func (f *future[T]) Await() T {
f.wg.Wait()
return f.value
}