Skip to content

Fix some slcan issues #447

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 31 additions & 17 deletions can/interfaces/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ class slcanBus(BusABC):

_SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds

LINE_TERMINATOR = b'\r'

def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
sleep_after_open=_SLEEP_AFTER_SERIAL_OPEN,
rtscts=False, **kwargs):
"""
:param str channel:
Expand All @@ -59,6 +62,8 @@ def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
Bitrate in bit/s
:param float poll_interval:
Poll interval in seconds when reading messages
:param float sleep_after_open:
Time to wait in seconds after opening serial connection
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
Expand All @@ -72,7 +77,9 @@ def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, rtscts=rtscts)

time.sleep(self._SLEEP_AFTER_SERIAL_OPEN)
self._buffer = bytearray()

time.sleep(sleep_after_open)

if bitrate is not None:
self.close()
Expand All @@ -87,9 +94,7 @@ def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
bitrate=None, rtscts=False, **kwargs)

def write(self, string):
if not string.endswith('\r'):
string += '\r'
self.serialPortOrig.write(string.encode())
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
self.serialPortOrig.flush()

def open(self):
Expand All @@ -107,12 +112,20 @@ def _recv_internal(self, timeout):
extended = False
frame = []

readStr = self.serialPortOrig.read_until(b'\r')
# Read everything that is already available
waiting = self.serialPortOrig.read(self.serialPortOrig.in_waiting)
self._buffer += waiting

if not readStr:
return None, False
else:
readStr = readStr.decode()
# Check if a complete message has been received
pos = self._buffer.find(self.LINE_TERMINATOR)
if pos == -1:
# Keep reading...
self._buffer += self.serialPortOrig.read_until(self.LINE_TERMINATOR)
pos = self._buffer.find(self.LINE_TERMINATOR)

if pos != -1:
readStr = self._buffer[0:pos].decode()
del self._buffer[0:pos+1]
if readStr[0] == 'T':
# extended frame
canId = int(readStr[1:9], 16)
Expand All @@ -129,45 +142,46 @@ def _recv_internal(self, timeout):
elif readStr[0] == 'r':
# remote frame
canId = int(readStr[1:4], 16)
dlc = int(readStr[4])
remote = True
elif readStr[0] == 'R':
# remote extended frame
canId = int(readStr[1:9], 16)
dlc = int(readStr[9])
extended = True
remote = True

if canId is not None:
msg = Message(arbitration_id=canId,
extended_id=extended,
is_extended_id=extended,
timestamp=time.time(), # Better than nothing...
is_remote_frame=remote,
dlc=dlc,
data=frame)
return msg, False
else:
return None, False
return None, False

def send(self, msg, timeout=0):
def send(self, msg, timeout=None):
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout

if msg.is_remote_frame:
if msg.is_extended_id:
sendStr = "R%08X0" % (msg.arbitration_id)
sendStr = "R%08X%d" % (msg.arbitration_id, msg.dlc)
else:
sendStr = "r%03X0" % (msg.arbitration_id)
sendStr = "r%03X%d" % (msg.arbitration_id, msg.dlc)
else:
if msg.is_extended_id:
sendStr = "T%08X%d" % (msg.arbitration_id, msg.dlc)
else:
sendStr = "t%03X%d" % (msg.arbitration_id, msg.dlc)

for i in range(0, msg.dlc):
sendStr += "%02X" % msg.data[i]
sendStr += "".join(["%02X" % b for b in msg.data])
self.write(sendStr)

def shutdown(self):
self.close()
self.serialPortOrig.close()

def fileno(self):
if hasattr(self.serialPortOrig, 'fileno'):
Expand Down
112 changes: 112 additions & 0 deletions test/test_slcan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!/usr/bin/env python
# coding: utf-8
import unittest
import can


class slcanTestCase(unittest.TestCase):

def setUp(self):
self.bus = can.Bus('loop://', bustype='slcan', sleep_after_open=0)
self.serial = self.bus.serialPortOrig
self.serial.read(self.serial.in_waiting)

def tearDown(self):
self.bus.shutdown()

def test_recv_extended(self):
self.serial.write(b'T12ABCDEF2AA55\r')
msg = self.bus.recv(0)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x12ABCDEF)
self.assertEqual(msg.is_extended_id, True)
self.assertEqual(msg.is_remote_frame, False)
self.assertEqual(msg.dlc, 2)
self.assertSequenceEqual(msg.data, [0xAA, 0x55])

def test_send_extended(self):
msg = can.Message(arbitration_id=0x12ABCDEF,
is_extended_id=True,
data=[0xAA, 0x55])
self.bus.send(msg)
data = self.serial.read(self.serial.in_waiting)
self.assertEqual(data, b'T12ABCDEF2AA55\r')

def test_recv_standard(self):
self.serial.write(b't4563112233\r')
msg = self.bus.recv(0)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x456)
self.assertEqual(msg.is_extended_id, False)
self.assertEqual(msg.is_remote_frame, False)
self.assertEqual(msg.dlc, 3)
self.assertSequenceEqual(msg.data, [0x11, 0x22, 0x33])

def test_send_standard(self):
msg = can.Message(arbitration_id=0x456,
is_extended_id=False,
data=[0x11, 0x22, 0x33])
self.bus.send(msg)
data = self.serial.read(self.serial.in_waiting)
self.assertEqual(data, b't4563112233\r')

def test_recv_standard_remote(self):
self.serial.write(b'r1238\r')
msg = self.bus.recv(0)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x123)
self.assertEqual(msg.is_extended_id, False)
self.assertEqual(msg.is_remote_frame, True)
self.assertEqual(msg.dlc, 8)

def test_send_standard_remote(self):
msg = can.Message(arbitration_id=0x123,
is_extended_id=False,
is_remote_frame=True,
dlc=8)
self.bus.send(msg)
data = self.serial.read(self.serial.in_waiting)
self.assertEqual(data, b'r1238\r')

def test_recv_extended_remote(self):
self.serial.write(b'R12ABCDEF6\r')
msg = self.bus.recv(0)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x12ABCDEF)
self.assertEqual(msg.is_extended_id, True)
self.assertEqual(msg.is_remote_frame, True)
self.assertEqual(msg.dlc, 6)

def test_send_extended_remote(self):
msg = can.Message(arbitration_id=0x12ABCDEF,
is_extended_id=True,
is_remote_frame=True,
dlc=6)
self.bus.send(msg)
data = self.serial.read(self.serial.in_waiting)
self.assertEqual(data, b'R12ABCDEF6\r')

def test_partial_recv(self):
self.serial.write(b'T12ABCDEF')
msg = self.bus.recv(0)
self.assertIsNone(msg)

self.serial.write(b'2AA55\rT12')
msg = self.bus.recv(0)
self.assertIsNotNone(msg)
self.assertEqual(msg.arbitration_id, 0x12ABCDEF)
self.assertEqual(msg.is_extended_id, True)
self.assertEqual(msg.is_remote_frame, False)
self.assertEqual(msg.dlc, 2)
self.assertSequenceEqual(msg.data, [0xAA, 0x55])

msg = self.bus.recv(0)
self.assertIsNone(msg)

self.serial.write(b'ABCDEF2AA55\r')
msg = self.bus.recv(0)
self.assertIsNotNone(msg)


if __name__ == '__main__':
unittest.main()