Skip to content

Commit d908967

Browse files
committed
tests: Verify waveform sessions
1 parent 699914e commit d908967

File tree

1 file changed

+54
-0
lines changed

1 file changed

+54
-0
lines changed

tests/test_signals.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,57 @@ def test_singlepoint_loopback(nixnet_in_interface, nixnet_out_interface):
8686
actual_signals = list(input_session.signals.read())
8787
for expected, (_, actual) in zip(expected_signals, actual_signals):
8888
assert pytest.approx(expected, rel=1) == actual
89+
90+
91+
def generate_ramp(min, max, rate, length):
92+
"""Generate ramp for test data
93+
94+
Args:
95+
min(float): minimum value
96+
max(float): maximum value
97+
rate(float): rate data is transmitted (Hz)
98+
length(float): Duration of ramp (secs)
99+
"""
100+
samples = int(rate * length)
101+
step = (max - min) / samples
102+
return [
103+
min + step * i
104+
for i in range(samples)
105+
]
106+
107+
108+
@pytest.mark.integration
109+
def test_waveform_loopback(nixnet_in_interface, nixnet_out_interface):
110+
database_name = 'NIXNET_example'
111+
cluster_name = 'CAN_Cluster'
112+
signal_names = 'CANEventSignal1'
113+
114+
with nixnet.SignalInWaveformSession(
115+
nixnet_in_interface,
116+
database_name,
117+
cluster_name,
118+
signal_names) as input_session:
119+
with nixnet.SignalOutWaveformSession(
120+
nixnet_out_interface,
121+
database_name,
122+
cluster_name,
123+
signal_names) as output_session:
124+
output_session.signals.resamp_rate = 1000
125+
input_session.signals.resamp_rate = output_session.signals.resamp_rate / 2
126+
# Start the input session manually to make sure that the first
127+
# frame value sent before the initial read will be received.
128+
input_session.start()
129+
130+
signal_ramp = generate_ramp(4, 11, output_session.signal_ramp.resamp_rate, 1)
131+
output_session.signals.write([signal_ramp])
132+
133+
# Wait 1 s and then read the received values.
134+
# They should be the same as the ones sent.
135+
time.sleep(1)
136+
137+
t0, dt, waveforms = input_session.signals.read()
138+
print(t0)
139+
assert pytest.approx(dt, 1 / input_session.signals.resamp_rate)
140+
assert len(waveforms) == 1
141+
for expected, actual in zip(signal_ramp, waveforms[0]):
142+
assert pytest.approx(expected, rel=0.1) == actual

0 commit comments

Comments
 (0)