Skip to content

Slcan review #611

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 162 additions & 55 deletions can/interfaces/slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class slcanBus(BusABC):

_SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds

_OK = b"\r"
_ERROR = b"\a"

LINE_TERMINATOR = b"\r"

def __init__(
Expand Down Expand Up @@ -81,10 +84,8 @@ def __init__(

if not channel: # if None or empty
raise TypeError("Must specify a serial port.")

if "@" in channel:
(channel, ttyBaudrate) = channel.split("@")

self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, rtscts=rtscts
)
Expand All @@ -95,90 +96,123 @@ def __init__(

if bitrate is not None and btr is not None:
raise ValueError("Bitrate and btr mutually exclusive.")

if bitrate is not None:
self.close()
if bitrate in self._BITRATES:
self.write(self._BITRATES[bitrate])
else:
raise ValueError(
"Invalid bitrate, choose one of "
+ (", ".join(self._BITRATES))
+ "."
)

self.set_bitrate(self, bitrate)
if btr is not None:
self.close()
self.write("s" + btr)

self.set_bitrate_reg(self, btr)
self.open()

super().__init__(
channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs
)

def write(self, string):
def set_bitrate(self, bitrate):
"""
:raise ValueError: if both *bitrate* is not among the possible values

:param int bitrate:
Bitrate in bit/s
"""
self.close()
if bitrate in self._BITRATES:
self._write(self._BITRATES[bitrate])
else:
raise ValueError(
"Invalid bitrate, choose one of " + (", ".join(self._BITRATES)) + "."
)
self.open()

def set_bitrate_reg(self, btr):
"""
:param str btr:
BTR register value to set custom can speed
"""
self.close()
self._write("s" + btr)
self.open()

def _write(self, string):
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
self.serialPortOrig.flush()

def _read(self, timeout):

# first read what is already in receive buffer
while self.serialPortOrig.in_waiting:
self._buffer += self.serialPortOrig.read()
# if we still don't have a complete message, do a blocking read
start = time.time()
time_left = timeout
while not (ord(self._OK) in self._buffer or ord(self._ERROR) in self._buffer):
self.serialPortOrig.timeout = time_left
byte = self.serialPortOrig.read()
if byte:
self._buffer += byte
# if timeout is None, try indefinitely
if timeout is None:
continue
# try next one only if there still is time, and with
# reduced timeout
else:
time_left = timeout - (time.time() - start)
if time_left > 0:
continue
else:
return None
# return first message
for i in range(len(self._buffer)):
if self._buffer[i] == ord(self._OK) or self._buffer[i] == ord(self._ERROR):
string = self._buffer[: i + 1].decode()
del self._buffer[: i + 1]
break
return string

def flush(self):
del self._buffer[:]
while self.serialPortOrig.in_waiting:
self.serialPortOrig.read()

def open(self):
self.write("O")
self._write("O")

def close(self):
self.write("C")
self._write("C")

def _recv_internal(self, timeout):
if timeout != self.serialPortOrig.timeout:
self.serialPortOrig.timeout = timeout

canId = None
remote = False
extended = False
frame = []

# First read what is already in the receive buffer
while (
self.serialPortOrig.in_waiting and self.LINE_TERMINATOR not in self._buffer
):
self._buffer += self.serialPortOrig.read(1)

# If we still don't have a complete message, do a blocking read
if self.LINE_TERMINATOR not in self._buffer:
self._buffer += self.serialPortOrig.read_until(self.LINE_TERMINATOR)
string = self._read(timeout)

if self.LINE_TERMINATOR not in self._buffer:
# Timed out
return None, False

readStr = self._buffer.decode()
del self._buffer[:]
if not readStr:
if not string:
pass
elif readStr[0] == "T":
elif string[0] == "T":
# extended frame
canId = int(readStr[1:9], 16)
dlc = int(readStr[9])
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
for i in range(0, dlc):
frame.append(int(readStr[10 + i * 2 : 12 + i * 2], 16))
elif readStr[0] == "t":
frame.append(int(string[10 + i * 2 : 12 + i * 2], 16))
elif string[0] == "t":
# normal frame
canId = int(readStr[1:4], 16)
dlc = int(readStr[4])
canId = int(string[1:4], 16)
dlc = int(string[4])
for i in range(0, dlc):
frame.append(int(readStr[5 + i * 2 : 7 + i * 2], 16))
elif readStr[0] == "r":
frame.append(int(string[5 + i * 2 : 7 + i * 2], 16))
elif string[0] == "r":
# remote frame
canId = int(readStr[1:4], 16)
dlc = int(readStr[4])
canId = int(string[1:4], 16)
dlc = int(string[4])
remote = True
elif readStr[0] == "R":
elif string[0] == "R":
# remote extended frame
canId = int(readStr[1:9], 16)
dlc = int(readStr[9])
canId = int(string[1:9], 16)
dlc = int(string[9])
extended = True
remote = True

if canId is not None:
msg = Message(
arbitration_id=canId,
Expand All @@ -194,7 +228,6 @@ def _recv_internal(self, timeout):
def send(self, msg, timeout=None):
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout

if msg.is_remote_frame:
if msg.is_extended_id:
sendStr = "R%08X%d" % (msg.arbitration_id, msg.dlc)
Expand All @@ -205,9 +238,8 @@ def send(self, msg, timeout=None):
sendStr = "T%08X%d" % (msg.arbitration_id, msg.dlc)
else:
sendStr = "t%03X%d" % (msg.arbitration_id, msg.dlc)

sendStr += "".join(["%02X" % b for b in msg.data])
self.write(sendStr)
self._write(sendStr)

def shutdown(self):
self.close()
Expand All @@ -218,3 +250,78 @@ def fileno(self):
return self.serialPortOrig.fileno()
# Return an invalid file descriptor on Windows
return -1

def get_version(self, timeout):
"""Get HW and SW version of the slcan interface.

:type timeout: int or None
:param timeout:
seconds to wait for version or None to wait indefinitely

:returns: tuple (hw_version, sw_version)
WHERE
int hw_version is the hardware version or None on timeout
int sw_version is the software version or None on timeout
"""
cmd = "V"
self._write(cmd)

start = time.time()
time_left = timeout
while True:
string = self._read(time_left)

if not string:
pass
elif string[0] == cmd and len(string) == 6:
# convert ASCII coded version
hw_version = int(string[1:3])
sw_version = int(string[3:5])
return hw_version, sw_version
# if timeout is None, try indefinitely
if timeout is None:
continue
# try next one only if there still is time, and with
# reduced timeout
else:
time_left = timeout - (time.time() - start)
if time_left > 0:
continue
else:
return None, None

def get_serial_number(self, timeout):
"""Get serial number of the slcan interface.

:type timeout: int or None
:param timeout:
seconds to wait for serial number or None to wait indefinitely

:rtype str or None
:return:
None on timeout or a str object.
"""
cmd = "N"
self._write(cmd)

start = time.time()
time_left = timeout
while True:
string = self._read(time_left)

if not string:
pass
elif string[0] == cmd and len(string) == 6:
serial_number = string[1:-1]
return serial_number
# if timeout is None, try indefinitely
if timeout is None:
continue
# try next one only if there still is time, and with
# reduced timeout
else:
time_left = timeout - (time.time() - start)
if time_left > 0:
continue
else:
return None
18 changes: 18 additions & 0 deletions test/test_slcan.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,24 @@ def test_partial_recv(self):
msg = self.bus.recv(0)
self.assertIsNotNone(msg)

def test_version(self):
self.serial.write(b"V1013\r")
hw_ver, sw_ver = self.bus.get_version(0)
self.assertEqual(hw_ver, 10)
self.assertEqual(sw_ver, 13)

hw_ver, sw_ver = self.bus.get_version(0)
self.assertIsNone(hw_ver)
self.assertIsNone(sw_ver)

def test_serial_number(self):
self.serial.write(b"NA123\r")
sn = self.bus.get_serial_number(0)
self.assertEqual(sn, "A123")

sn = self.bus.get_serial_number(0)
self.assertIsNone(sn)


if __name__ == "__main__":
unittest.main()