forked from commaai/panda
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgps_stability_test.py
executable file
·165 lines (140 loc) · 5.56 KB
/
gps_stability_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#!/usr/bin/env python3
# flake8: noqa
import os
import sys
import time
import random
import threading
sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".."))
from panda import Panda, PandaSerial # noqa: E402
INIT_GPS_BAUD = 9600
GPS_BAUD = 460800
def connect():
pandas = Panda.list()
print(pandas)
# make sure two pandas are connected
if len(pandas) != 2:
print("Connect white and grey/black panda to run this test!")
assert False
# connect
pandas[0] = Panda(pandas[0])
pandas[1] = Panda(pandas[1])
white_panda = None
gps_panda = None
# find out which one is white (for spamming the CAN buses)
if pandas[0].get_type() == Panda.HW_TYPE_WHITE_PANDA and pandas[1].get_type() != Panda.HW_TYPE_WHITE_PANDA:
white_panda = pandas[0]
gps_panda = pandas[1]
elif pandas[0].get_type() != Panda.HW_TYPE_WHITE_PANDA and pandas[1].get_type() == Panda.HW_TYPE_WHITE_PANDA:
white_panda = pandas[1]
gps_panda = pandas[0]
else:
print("Connect white and grey/black panda to run this test!")
assert False
return white_panda, gps_panda
def spam_buses_thread(panda):
try:
panda.set_safety_mode(Panda.SAFETY_ALLOUTPUT)
while True:
at = random.randint(1, 2000)
st = (b"test" + os.urandom(10))[0:8]
bus = random.randint(0, 2)
panda.can_send(at, st, bus)
except Exception as e:
print(e)
def read_can_thread(panda):
try:
while True:
panda.can_recv()
except Exception as e:
print(e)
def init_gps(panda):
def add_nmea_checksum(msg):
d = msg[1:]
cs = 0
for i in d:
cs ^= ord(i)
return msg + "*%02X" % cs
ser = PandaSerial(panda, 1, INIT_GPS_BAUD)
# Power cycle the gps by toggling reset
print("Resetting GPS")
panda.set_esp_power(0)
time.sleep(0.5)
panda.set_esp_power(1)
time.sleep(0.5)
# Upping baud rate
print("Upping GPS baud rate")
msg = str.encode(add_nmea_checksum("$PUBX,41,1,0007,0003,%d,0" % GPS_BAUD) + "\r\n")
ser.write(msg)
time.sleep(1) # needs a wait for it to actually send
# Reconnecting with the correct baud
ser = PandaSerial(panda, 1, GPS_BAUD)
# Sending all config messages boardd sends
print("Sending config")
ser.write(b"\xB5\x62\x06\x00\x14\x00\x03\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\x00\x00\x1E\x7F")
ser.write(b"\xB5\x62\x06\x3E\x00\x00\x44\xD2")
ser.write(b"\xB5\x62\x06\x00\x14\x00\x00\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19\x35")
ser.write(b"\xB5\x62\x06\x00\x14\x00\x01\x00\x00\x00\xC0\x08\x00\x00\x00\x08\x07\x00\x01\x00\x01\x00\x00\x00\x00\x00\xF4\x80")
ser.write(b"\xB5\x62\x06\x00\x14\x00\x04\xFF\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x1D\x85")
ser.write(b"\xB5\x62\x06\x00\x00\x00\x06\x18")
ser.write(b"\xB5\x62\x06\x00\x01\x00\x01\x08\x22")
ser.write(b"\xB5\x62\x06\x00\x01\x00\x02\x09\x23")
ser.write(b"\xB5\x62\x06\x00\x01\x00\x03\x0A\x24")
ser.write(b"\xB5\x62\x06\x08\x06\x00\x64\x00\x01\x00\x00\x00\x79\x10")
ser.write(b"\xB5\x62\x06\x24\x24\x00\x05\x00\x04\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x5A\x63")
ser.write(b"\xB5\x62\x06\x1E\x14\x00\x00\x00\x00\x00\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x3C\x37")
ser.write(b"\xB5\x62\x06\x24\x00\x00\x2A\x84")
ser.write(b"\xB5\x62\x06\x23\x00\x00\x29\x81")
ser.write(b"\xB5\x62\x06\x1E\x00\x00\x24\x72")
ser.write(b"\xB5\x62\x06\x01\x03\x00\x01\x07\x01\x13\x51")
ser.write(b"\xB5\x62\x06\x01\x03\x00\x02\x15\x01\x22\x70")
ser.write(b"\xB5\x62\x06\x01\x03\x00\x02\x13\x01\x20\x6C")
print("Initialized GPS")
received_messages = 0
received_bytes = 0
send_something = False
def gps_read_thread(panda):
global received_messages, received_bytes, send_something
ser = PandaSerial(panda, 1, GPS_BAUD)
while True:
ret = ser.read(1024)
time.sleep(0.001)
if len(ret):
received_messages += 1
received_bytes += len(ret)
if send_something:
ser.write("test")
send_something = False
CHECK_PERIOD = 5
MIN_BYTES = 10000
MAX_BYTES = 50000
min_failures = 0
max_failures = 0
if __name__ == "__main__":
white_panda, gps_panda = connect()
# Start spamming the CAN buses with the white panda. Also read the messages to add load on the GPS panda
threading.Thread(target=spam_buses_thread, args=(white_panda,)).start()
threading.Thread(target=read_can_thread, args=(gps_panda,)).start()
# Start GPS checking
init_gps(gps_panda)
read_thread = threading.Thread(target=gps_read_thread, args=(gps_panda,))
read_thread.start()
while True:
time.sleep(CHECK_PERIOD)
if(received_bytes < MIN_BYTES):
print("Panda is not sending out enough data! Got " + str(received_messages) + " (" + str(received_bytes) + "B) in the last " + str(CHECK_PERIOD) + " seconds")
send_something = True
min_failures += 1
elif(received_bytes > MAX_BYTES):
print("Panda is not sending out too much data! Got " + str(received_messages) + " (" + str(received_bytes) + "B) in the last " + str(CHECK_PERIOD) + " seconds")
print("Probably not on the right baud rate, got reset somehow? Resetting...")
max_failures += 1
init_gps(gps_panda)
else:
print("Got " + str(received_messages) + " (" + str(received_bytes) + "B) messages in the last " + str(CHECK_PERIOD) + " seconds.")
if(min_failures > 0):
print("Total min failures: ", min_failures)
if(max_failures > 0):
print("Total max failures: ", max_failures)
received_messages = 0
received_bytes = 0