Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redo UF2 discovery for Windows compatibility #2853

Merged
merged 2 commits into from
Mar 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 74 additions & 66 deletions tools/pluggable_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,89 +5,96 @@
import time
import threading


toolspath = os.path.dirname(os.path.realpath(__file__))
try:
sys.path.insert(0, os.path.join(toolspath, ".")) # Add pyserial dir to search path
sys.path.insert(0, os.path.join(toolspath, ".")) # Add uf2conv dir to search path
import uf2conv # If this fails, we can't continue and will bomb below
except ImportError:
sys.stderr.write("uf2conv not found next to this tool.\n")
sys.exit(1)

scannerStop = threading.Event()
dropDead = False

scannerGo = False
class ScannerDarkly(threading.Thread):

loopTime = 0.0 # Set to 0 for 1st pass to get immediate response for arduino-cli, then bumped to 2.0 for ongoing checks

# https://stackoverflow.com/questions/12435211/threading-timer-repeat-function-every-n-seconds
def __init__(self, event):
threading.Thread.__init__(self)
self.stopped = event

def run(self):
global dropDead
boards = False;
while not self.stopped.wait(self.loopTime):
if self.stopped.is_set() or dropDead:
return
self.loopTime = 2.0
l = uf2conv.get_drives()
if (len(l) > 0) and not boards:
boards = True
print ("""{
"eventType": "add",
"port": {
"address": "UF2_Board",
"label": "UF2 Board",
"protocol": "uf2conv",
"protocolLabel": "UF2 Devices",
"properties": {
"mac": "ffffffffffff",
"pid" : "0x2e8a",
"vid" : "0x000a"
}
}
}""", flush=True)
elif (len(l) == 0) and boards:
boards = False
print("""{
"eventType": "remove",
"port": {
"address": "UF2_Board",
"protocol": "uf2conv"
}
}""", flush = True)

def scanner():
global scannerGo
scannerGo = True
boards = False
while scannerGo:
l = uf2conv.get_drives()
if (len(l) > 0) and scannerGo and not boards:
boards = True
print ("""{
"eventType": "add",
"port": {
"address": "UF2_Board",
"label": "UF2 Board",
"protocol": "uf2conv",
"protocolLabel": "UF2 Devices",
"properties": {
"mac": "ffffffffffff",
"pid" : "0x2e8a",
"vid" : "0x000a"
}
}
}""", flush=True)
elif (len(l) == 0) and scannerGo and boards:
boards = False
print("""{
"eventType": "remove",
"port": {
"address": "UF2_Board",
"protocol": "uf2conv"
}
}""", flush = True)
n = time.time() + 2
while scannerGo and (time.time() < n):
time.sleep(.1)
scannerGo = True

def main():
global scannerGo
while True:
cmdline = input()
cmd = cmdline.split()[0]
if cmd == "HELLO":
print(""" {
global scannerStop
global dropDead
try:
while True:
cmdline = input()
cmd = cmdline.split()[0]
if cmd == "HELLO":
print(""" {
"eventType": "hello",
"message": "OK",
"protocolVersion": 1
}""", flush = True)
elif cmd == "START":
print("""{
elif cmd == "START":
print("""{
"eventType": "start",
"message": "OK"
}""", flush = True);
elif cmd == "STOP":
scannerGo = False
while not scannerGo:
time.sleep(.1)
print("""{
elif cmd == "STOP":
scannerStop.set()
print("""{
"eventType": "stop",
"message": "OK"
}""", flush = True)
elif cmd == "QUIT":
scannerGo = False
print("""{
elif cmd == "QUIT":
scannerStop.set()
print("""{
"eventType": "quit",
"message": "OK"
}""", flush = True)
return
elif cmd == "LIST":
l = uf2conv.get_drives()
if len(l) > 0:
print ("""{
return
elif cmd == "LIST":
l = uf2conv.get_drives()
if len(l) > 0:
print ("""{
"eventType": "list",
"ports": [
{
Expand All @@ -103,18 +110,19 @@ def main():
}
]
}""", flush=True)
else:
print ("""{
else:
print ("""{
"eventType": "list",
"ports": [ ]
}""", flush=True)
elif cmd == "START_SYNC":
print("""{
elif cmd == "START_SYNC":
print("""{
"eventType": "start_sync",
"message": "OK"
}""", flush = True)
scannerGo = True
threading.Thread(target = scanner).start()
time.sleep(.5)
thread = ScannerDarkly(scannerStop)
thread.start()
except:
dropDead = True

main()