5
5
from cocotb .clock import Clock
6
6
import random
7
7
8
+ NUM_TESTS = 100
9
+ BAUD_MAX_PERCENT = 2
8
10
9
11
@cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
10
12
async def transmit (dut ):
@@ -16,12 +18,12 @@ async def transmit(dut):
16
18
await check_tx_reset (dut )
17
19
18
20
# carry out 100 transmissions
19
- for count in range (100 ):
21
+ for count in range (NUM_TESTS ):
20
22
# randomized payload
21
23
TEST_BYTE = random .randint (0 ,255 )
22
24
23
25
# randomized baud multiplier (+/- 2%)
24
- baud_mult = 1.0 + (random .random () - 0.5 ) / 50 * 2
26
+ baud_mult = 1.0 + (random .random () - 0.5 ) / 50 * BAUD_MAX_PERCENT
25
27
26
28
await check_tx (dut , TEST_BYTE , 115200 , baud_mult )
27
29
@@ -30,7 +32,7 @@ async def transmit(dut):
30
32
await Timer (random .randint (1 ,2000 ), units = 'ns' )
31
33
32
34
33
- @cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
35
+ @cocotb .test (timeout_time = 10 , timeout_unit = 'ms' )
34
36
async def receive1 (dut ):
35
37
"""RX with randomized payload / clock frequency shift / inter-TX delay."""
36
38
@@ -41,34 +43,34 @@ async def receive1(dut):
41
43
await check_rx_reset (dut )
42
44
43
45
# run 100 randomized tests
44
- for count in range (100 ):
46
+ for count in range (NUM_TESTS ):
45
47
# randomized payload
46
48
TEST_BYTE = random .randint (0 ,255 )
47
49
48
50
# randomized baud multiplier (+/- 2%)
49
- baud_mult = 1.0 + (random .random () - 0.5 ) / 50 * 2
51
+ baud_mult = 1.0 + (random .random () - 0.5 ) / 50 * BAUD_MAX_PERCENT
50
52
51
53
await check_rx (dut , TEST_BYTE , 115200 , baud_mult )
52
54
53
55
# randomized delay
54
56
await Timer (random .randint (500 ,2000 ), units = 'ns' )
55
57
56
58
57
- @cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
59
+ # @cocotb.test(timeout_time=50, timeout_unit='ms')
58
60
async def receive2 (dut ):
59
61
"""Continuous RX with randomized payload."""
60
62
61
63
# 50 Mhz clock
62
64
cocotb .start_soon (Clock (dut .clk , 20 , units = 'ns' ).start ())
63
65
64
66
# reset
65
- await check_tx_reset (dut )
67
+ await check_rx_reset (dut )
66
68
67
69
# run 100 randomized tests
68
- for count in range (100 ):
70
+ for count in range (NUM_TESTS ):
69
71
# randomized payload
70
72
TEST_BYTE = random .randint (0 ,255 )
71
- await check_rx (dut , TEST_BYTE , 115200 , 1.0 )
73
+ await check_rx_concurrent (dut , TEST_BYTE , 115200 , 1.0 )
72
74
73
75
74
76
@cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
@@ -79,28 +81,97 @@ async def receive3(dut):
79
81
cocotb .start_soon (Clock (dut .clk , 20 , units = 'ns' ).start ())
80
82
81
83
# reset
82
- await check_tx_reset (dut )
84
+ await check_rx_reset (dut )
83
85
84
86
# run 100 randomized tests
85
- for count in range (100 ):
87
+ for count in range (NUM_TESTS ):
86
88
# randomized payload
87
89
TEST_BYTE = random .randint (0 ,255 )
88
90
89
91
# randomized baud multiplier (+/- 2%)
90
- baud_mult = 1.0 + (random .random () - 0.5 ) / 50 * 2
92
+ baud_mult = 1.0 + (random .random () - 0.5 ) / 50 * BAUD_MAX_PERCENT
91
93
92
- await check_rx (dut , TEST_BYTE , 115200 , baud_mult )
94
+ await check_rx_concurrent (dut , TEST_BYTE , 115200 , baud_mult )
93
95
94
96
95
97
@cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
96
98
async def receive4 (dut ):
97
- """RX overrun ."""
99
+ """RX frame error (start bit) ."""
98
100
99
101
# 50 Mhz clock
100
102
cocotb .start_soon (Clock (dut .clk , 20 , units = 'ns' ).start ())
101
103
102
104
# reset
103
- await check_tx_reset (dut )
105
+ await check_rx_reset (dut )
106
+
107
+ # keep RX ready high
108
+ dut .rx_ready .value = 1
109
+
110
+ # run 100 randomized tests
111
+ for count in range (NUM_TESTS ):
112
+ # prepare random test data
113
+ TEST_BYTE = random .randint (0 ,255 )
114
+ TEST_BITS_LSB = [(TEST_BYTE >> s ) & 1 for s in range (8 )]
115
+
116
+ # malformed (short) start bit
117
+ dut .uart_rx .value = 0
118
+ await Timer (int (1 / 115200. / 16 * 4 * 1e12 ), units = "ps" )
119
+ dut .uart_rx .value = 1
120
+ await Timer (int (1 / 115200. / 16 * 12 * 1e12 ), units = "ps" )
121
+
122
+ # send start bit (0), 8 data bits, no stop bit
123
+ for tx_bit in TEST_BITS_LSB + [1 ]:
124
+ dut .uart_rx .value = tx_bit
125
+ await Timer (int (1 / 115200. * 1e12 ), units = "ps" )
126
+
127
+ # set RX to 0 (instead of the stop bit)
128
+ dut .uart_rx .value = 0
129
+
130
+ await Edge (dut .rx_error )
131
+ assert dut .rx_error .value == 1
132
+ assert dut .rx_valid .value == 0
133
+
134
+
135
+ @cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
136
+ async def receive5 (dut ):
137
+ """RX frame error (stop bit)."""
138
+
139
+ # 50 Mhz clock
140
+ cocotb .start_soon (Clock (dut .clk , 20 , units = 'ns' ).start ())
141
+
142
+ # reset
143
+ await check_rx_reset (dut )
144
+
145
+ # keep RX ready high
146
+ dut .rx_ready .value = 1
147
+
148
+ # run 100 randomized tests
149
+ for count in range (NUM_TESTS ):
150
+ # prepare random test data
151
+ TEST_BYTE = random .randint (0 ,255 )
152
+ TEST_BITS_LSB = [(TEST_BYTE >> s ) & 1 for s in range (8 )]
153
+
154
+ # send start bit (0), 8 data bits, no stop bit
155
+ for tx_bit in [0 ] + TEST_BITS_LSB :
156
+ dut .uart_rx .value = tx_bit
157
+ await Timer (int (1 / 115200. * 1e12 ), units = "ps" )
158
+
159
+ # set RX to 0 (instead of the stop bit)
160
+ dut .uart_rx .value = 0
161
+
162
+ await Edge (dut .rx_error )
163
+ assert dut .rx_error .value == 1
164
+ assert dut .rx_valid .value == 0
165
+
166
+ @cocotb .test (timeout_time = 50 , timeout_unit = 'ms' )
167
+ async def receive6 (dut ):
168
+ """RX overrun error."""
169
+
170
+ # 50 Mhz clock
171
+ cocotb .start_soon (Clock (dut .clk , 20 , units = 'ns' ).start ())
172
+
173
+ # reset
174
+ await check_rx_reset (dut )
104
175
105
176
# set RX ready low, and keep it low...
106
177
dut .rx_ready .value = 0
@@ -128,15 +199,14 @@ async def receive4(dut):
128
199
assert dut .rx_overrun .value == 1
129
200
130
201
131
-
132
202
# --- HELPER FUNCTIONS ---
133
203
134
204
async def check_tx_reset (dut ):
135
205
# reset
136
206
dut .tx_reset .value = 1
137
- await Timer (1 , units = "ms " )
207
+ await Timer (10 , units = "us " )
138
208
dut .tx_reset .value = 0
139
- await Timer (1 , units = "ms " )
209
+ await Timer (10 , units = "us " )
140
210
141
211
assert dut .tx_ready .value == 1
142
212
@@ -145,11 +215,14 @@ async def check_rx_reset(dut):
145
215
# drive input high
146
216
dut .uart_rx .value = 1
147
217
218
+ # drive RX ready low
219
+ dut .rx_ready .value = 0
220
+
148
221
# reset
149
222
dut .rx_reset .value = 1
150
- await Timer (1 , units = "ms " )
223
+ await Timer (10 , units = "us " )
151
224
dut .rx_reset .value = 0
152
- await Timer (1 , units = "ms " )
225
+ await Timer (10 , units = "us " )
153
226
154
227
assert dut .rx_valid .value == 0
155
228
assert dut .rx_error .value == 0
@@ -166,7 +239,7 @@ async def check_tx(dut, data, baud, baud_mult):
166
239
167
240
# set data value and then start TX
168
241
dut .tx_data .value = data
169
- await Timer (random .randint (100 , 500 ), units = "ns" )
242
+ # await Timer(random.randint(100, 500), units="ns")
170
243
dut .tx_valid .value = 1
171
244
172
245
# wait for TX->0 transition, check TX=0 and ready=0
@@ -190,9 +263,6 @@ async def check_tx(dut, data, baud, baud_mult):
190
263
191
264
async def check_rx (dut , data , baud , baud_mult ):
192
265
dut .rx_ready .value = 0
193
-
194
- # random delay
195
- await Timer (100 + random .randint (100 , 500 ), units = "ns" )
196
266
assert dut .rx_valid .value == 0
197
267
198
268
# prepare random test data
@@ -203,18 +273,44 @@ async def check_rx(dut, data, baud, baud_mult):
203
273
dut .uart_rx .value = tx_bit
204
274
await Timer (int (1 / baud * baud_mult * 1e12 ), units = "ps" )
205
275
206
- # random delay
207
- await Timer (100 + random .randint (100 , 500 ), units = "ns" )
276
+ # did we receive a byte?
208
277
assert dut .rx_valid .value == 1
209
278
279
+ # pulse RX ready to fetch data
210
280
dut .rx_ready .value = 1
211
-
212
281
# wait for valid transition
213
282
await Edge (dut .rx_valid )
214
283
assert dut .rx_valid .value == 0
284
+ dut .rx_ready .value = 0
215
285
216
286
# check payload and valid/error/overflow flags
217
287
assert dut .rx_data .value == data
218
288
assert dut .rx_error .value == 0
219
289
assert dut .rx_overrun .value == 0
220
290
291
+
292
+ async def fetch_data (valid , data ):
293
+ #valid._log.info("HERE")
294
+ await Edge (valid )
295
+ #await Timer(100, units="ns")
296
+ #valid._log.info(f"VALID {valid}")
297
+ return data .value
298
+
299
+ async def check_rx_concurrent (dut , data , baud , baud_mult ):
300
+ dut .rx_ready .value = 1
301
+ assert dut .rx_valid .value == 0
302
+
303
+ f = cocotb .start_soon (fetch_data (dut .rx_valid , dut .rx_data ))
304
+
305
+ # prepare random test data
306
+ TEST_BITS_LSB = [(data >> s ) & 1 for s in range (8 )]
307
+
308
+ # send start bit (0), 8 data bits, stop bit (1)
309
+ for tx_bit in [0 ] + TEST_BITS_LSB + [1 ]:
310
+ dut .uart_rx .value = tx_bit
311
+ await Timer (int (1 / baud * baud_mult * 1e12 ), units = "ps" )
312
+
313
+ # check payload and valid/error/overflow flags
314
+ assert dut .rx_data .value == f .result ()
315
+ assert dut .rx_error .value == 0
316
+ assert dut .rx_overrun .value == 0
0 commit comments