-
Notifications
You must be signed in to change notification settings - Fork 202
Code quality tools #571
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Code quality tools #571
Changes from all commits
2067f82
7612061
7b9f8c8
b7ddf3f
cf06f72
a72c122
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,6 @@ | |
|
||
import canopen.network | ||
|
||
|
||
# Error code, error register, vendor specific data | ||
EMCY_STRUCT = struct.Struct("<HB5s") | ||
|
||
|
@@ -17,9 +16,9 @@ class EmcyConsumer: | |
|
||
def __init__(self): | ||
#: Log of all received EMCYs for this node | ||
self.log: List["EmcyError"] = [] | ||
self.log: List[EmcyError] = [] | ||
#: Only active EMCYs. Will be cleared on Error Reset | ||
self.active: List["EmcyError"] = [] | ||
self.active: List[EmcyError] = [] | ||
self.callbacks = [] | ||
self.emcy_received = threading.Condition() | ||
|
||
|
@@ -53,9 +52,7 @@ def reset(self): | |
self.log = [] | ||
self.active = [] | ||
|
||
def wait( | ||
self, emcy_code: Optional[int] = None, timeout: float = 10 | ||
) -> "EmcyError": | ||
def wait(self, emcy_code: Optional[int] = None, timeout: float = 10) -> "EmcyError": | ||
Comment on lines
-56
to
+55
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keeping in mind the different Python versions supported by this library, this could be improved by using In general, when touching some line of code just for prettifying or reformatting, at least it should modernize things we haven't consistently brought up to date yet. |
||
"""Wait for a new EMCY to arrive. | ||
|
||
:param emcy_code: EMCY code to wait for | ||
|
@@ -113,7 +110,7 @@ class EmcyError(Exception): | |
(0x8000, 0xF000, "Monitoring"), | ||
(0x9000, 0xFF00, "External Error"), | ||
(0xF000, 0xFF00, "Additional Functions"), | ||
(0xFF00, 0xFF00, "Device Specific") | ||
(0xFF00, 0xFF00, "Device Specific"), | ||
Comment on lines
-116
to
+113
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Certainly. |
||
] | ||
|
||
def __init__(self, code: int, register: int, data: bytes, timestamp: float): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,7 +5,6 @@ | |
|
||
import canopen.network | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# Command Specifier (CS) | ||
|
@@ -19,16 +18,16 @@ | |
CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER = 0x42 | ||
CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER = 0x43 | ||
CS_SWITCH_STATE_SELECTIVE_RESPONSE = 0x44 | ||
CS_IDENTIFY_REMOTE_SLAVE_VENDOR_ID = 0x46 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_PRODUCT_CODE = 0x47 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_LOW = 0x48 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_HIGH = 0x49 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_LOW = 0x4A # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_HIGH = 0x4B # m -> s | ||
CS_IDENTIFY_NON_CONFIGURED_REMOTE_SLAVE = 0x4C # m -> s | ||
CS_IDENTIFY_SLAVE = 0x4F # s -> m | ||
CS_IDENTIFY_NON_CONFIGURED_SLAVE = 0x50 # s -> m | ||
CS_FAST_SCAN = 0x51 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_VENDOR_ID = 0x46 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_PRODUCT_CODE = 0x47 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_LOW = 0x48 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_HIGH = 0x49 # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_LOW = 0x4A # m -> s | ||
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_HIGH = 0x4B # m -> s | ||
CS_IDENTIFY_NON_CONFIGURED_REMOTE_SLAVE = 0x4C # m -> s | ||
CS_IDENTIFY_SLAVE = 0x4F # s -> m | ||
CS_IDENTIFY_NON_CONFIGURED_SLAVE = 0x50 # s -> m | ||
CS_FAST_SCAN = 0x51 # m -> s | ||
Comment on lines
-22
to
+30
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you find this more readable? I don't. That's a case where I disagree with Black. In fact, I'd even align the equal signs to have the numbers always on the same column. For example Golang ( So yes, this is not consistent with Black's stringent rules, but it's a case where the "standardization" change is detrimental. And we shouldn't waste time on such changes just because the tools are available. Time can be better spent on more important (functional) changes. |
||
CS_INQUIRE_VENDOR_ID = 0x5A | ||
CS_INQUIRE_PRODUCT_CODE = 0x5B | ||
CS_INQUIRE_REVISION_NUMBER = 0x5C | ||
|
@@ -49,7 +48,7 @@ | |
ERROR_STORE_NOT_SUPPORTED = 1 | ||
ERROR_STORE_ACCESS_PROBLEM = 2 | ||
|
||
ERROR_VENDOR_SPECIFIC = 0xff | ||
ERROR_VENDOR_SPECIFIC = 0xFF | ||
Comment on lines
-52
to
+51
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. |
||
|
||
ListMessageNeedResponse = [ | ||
CS_CONFIGURE_NODE_ID, | ||
|
@@ -107,8 +106,9 @@ def send_switch_mode_global(self, mode): | |
"""obsolete""" | ||
self.send_switch_state_global(mode) | ||
|
||
def send_switch_state_selective(self, | ||
vendorId, productCode, revisionNumber, serialNumber): | ||
def send_switch_state_selective( | ||
self, vendorId, productCode, revisionNumber, serialNumber | ||
): | ||
"""switch mode from WAITING_STATE to CONFIGURATION_STATE | ||
only if 128bits LSS address matches with the arguments. | ||
It sends 4 messages for each argument. | ||
|
@@ -132,14 +132,15 @@ def send_switch_state_selective(self, | |
|
||
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_VENDOR_ID, vendorId) | ||
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_PRODUCT_CODE, productCode) | ||
self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER, revisionNumber) | ||
response = self.__send_lss_address(CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER, serialNumber) | ||
self.__send_lss_address( | ||
CS_SWITCH_STATE_SELECTIVE_REVISION_NUMBER, revisionNumber | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Definitely need to agree on a line length setting for Black. Changes such as this one cause unnecessary churn and I don't think it's really solving a problem. My usual compromise is to set a 96 character limit. Not quite conservative (< 80), but also not as disruptive as allowing 120 chars (+50 %) and it leaves some margin for diff markings. |
||
) | ||
response = self.__send_lss_address( | ||
CS_SWITCH_STATE_SELECTIVE_SERIAL_NUMBER, serialNumber | ||
) | ||
|
||
cs = struct.unpack_from("<B", response)[0] | ||
if cs == CS_SWITCH_STATE_SELECTIVE_RESPONSE: | ||
return True | ||
|
||
return False | ||
return cs == CS_SWITCH_STATE_SELECTIVE_RESPONSE | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice. |
||
|
||
def inquire_node_id(self): | ||
"""Read the node id. | ||
|
@@ -197,19 +198,22 @@ def activate_bit_timing(self, switch_delay_ms): | |
message = bytearray(8) | ||
|
||
message[0] = CS_ACTIVATE_BIT_TIMING | ||
message[1:3] = struct.pack('<H', switch_delay_ms) | ||
message[1:3] = struct.pack("<H", switch_delay_ms) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't get started with this, it has potential for endless discussions and no real benefit. I actually configure Black with |
||
self.__send_command(message) | ||
|
||
def store_configuration(self): | ||
"""Store node id and baud rate. | ||
""" | ||
"""Store node id and baud rate.""" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice. PEP8 if I remember correctly? I use flake8 to help me with such things. |
||
self.__send_configure(CS_STORE_CONFIGURATION) | ||
|
||
def send_identify_remote_slave(self, | ||
vendorId, productCode, | ||
revisionNumberLow, revisionNumberHigh, | ||
serialNumberLow, serialNumberHigh): | ||
|
||
def send_identify_remote_slave( | ||
self, | ||
vendorId, | ||
productCode, | ||
revisionNumberLow, | ||
revisionNumberHigh, | ||
serialNumberLow, | ||
serialNumberHigh, | ||
): | ||
"""This command sends the range of LSS address to find the slave nodes | ||
in the specified range | ||
|
||
|
@@ -230,10 +234,18 @@ def send_identify_remote_slave(self, | |
|
||
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_VENDOR_ID, vendorId) | ||
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_PRODUCT_CODE, productCode) | ||
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_LOW, revisionNumberLow) | ||
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_HIGH, revisionNumberHigh) | ||
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_LOW, serialNumberLow) | ||
self.__send_lss_address(CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_HIGH, serialNumberHigh) | ||
self.__send_lss_address( | ||
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_LOW, revisionNumberLow | ||
) | ||
self.__send_lss_address( | ||
CS_IDENTIFY_REMOTE_SLAVE_REVISION_NUMBER_HIGH, revisionNumberHigh | ||
) | ||
self.__send_lss_address( | ||
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_LOW, serialNumberLow | ||
) | ||
self.__send_lss_address( | ||
CS_IDENTIFY_REMOTE_SLAVE_SERIAL_NUMBER_HIGH, serialNumberHigh | ||
) | ||
|
||
def send_identify_non_configured_remote_slave(self): | ||
# TODO it should handle the multiple respones from slaves | ||
|
@@ -263,13 +275,17 @@ def fast_scan(self): | |
while lss_bit_check > 0: | ||
lss_bit_check -= 1 | ||
|
||
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next): | ||
lss_id[lss_sub] |= 1<<lss_bit_check | ||
if not self.__send_fast_scan_message( | ||
lss_id[lss_sub], lss_bit_check, lss_sub, lss_next | ||
): | ||
lss_id[lss_sub] |= 1 << lss_bit_check | ||
|
||
time.sleep(0.01) | ||
|
||
lss_next = (lss_sub + 1) & 3 | ||
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next): | ||
if not self.__send_fast_scan_message( | ||
lss_id[lss_sub], lss_bit_check, lss_sub, lss_next | ||
): | ||
return False, None | ||
|
||
time.sleep(0.01) | ||
|
@@ -284,23 +300,22 @@ def fast_scan(self): | |
|
||
def __send_fast_scan_message(self, id_number, bit_checker, lss_sub, lss_next): | ||
message = bytearray(8) | ||
message[0:8] = struct.pack('<BIBBB', CS_FAST_SCAN, id_number, bit_checker, lss_sub, lss_next) | ||
message[0:8] = struct.pack( | ||
"<BIBBB", CS_FAST_SCAN, id_number, bit_checker, lss_sub, lss_next | ||
) | ||
try: | ||
recv_msg = self.__send_command(message) | ||
except LssError: | ||
return False | ||
|
||
cs = struct.unpack_from("<B", recv_msg)[0] | ||
if cs == CS_IDENTIFY_SLAVE: | ||
return True | ||
|
||
return False | ||
return cs == CS_IDENTIFY_SLAVE | ||
|
||
def __send_lss_address(self, req_cs, number): | ||
message = bytearray(8) | ||
|
||
message[0] = req_cs | ||
message[1:5] = struct.pack('<I', number) | ||
message[1:5] = struct.pack("<I", number) | ||
response = self.__send_command(message) | ||
# some device needs these delays between messages | ||
# because it can't handle messages arriving with no delay | ||
|
@@ -386,10 +401,9 @@ def __send_command(self, message): | |
# Wait for the slave to respond | ||
# TODO check if the response is LSS response message | ||
try: | ||
response = self.responses.get( | ||
block=True, timeout=self.RESPONSE_TIMEOUT) | ||
except queue.Empty: | ||
raise LssError("No LSS response received") | ||
response = self.responses.get(block=True, timeout=self.RESPONSE_TIMEOUT) | ||
except queue.Empty as err: | ||
raise LssError("No LSS response received") from err | ||
|
||
return response | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
import logging | ||
import threading | ||
from collections.abc import MutableMapping | ||
from typing import Callable, Dict, Final, Iterator, List, Optional, Union | ||
from typing import Callable, Final, Iterator | ||
|
||
import can | ||
from can import Listener | ||
|
@@ -16,7 +16,6 @@ | |
from canopen.sync import SyncProducer | ||
from canopen.timestamp import TimeProducer | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
Callback = Callable[[int, bytearray, float], None] | ||
|
@@ -28,7 +27,7 @@ class Network(MutableMapping): | |
NOTIFIER_CYCLE: float = 1.0 #: Maximum waiting time for one notifier iteration. | ||
NOTIFIER_SHUTDOWN_TIMEOUT: float = 5.0 #: Maximum waiting time to stop notifiers. | ||
|
||
def __init__(self, bus: Optional[can.BusABC] = None): | ||
def __init__(self, bus: can.BusABC | None = None): | ||
""" | ||
:param can.BusABC bus: | ||
A python-can bus instance to re-use. | ||
|
@@ -41,9 +40,9 @@ def __init__(self, bus: Optional[can.BusABC] = None): | |
#: List of :class:`can.Listener` objects. | ||
#: Includes at least MessageListener. | ||
self.listeners = [MessageListener(self)] | ||
self.notifier: Optional[can.Notifier] = None | ||
self.nodes: Dict[int, Union[RemoteNode, LocalNode]] = {} | ||
self.subscribers: Dict[int, List[Callback]] = {} | ||
self.notifier: can.Notifier | None = None | ||
self.nodes: dict[int, RemoteNode | LocalNode] = {} | ||
self.subscribers: dict[int, list[Callback]] = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What Python version is the minimum to support this? We shouldn't break compatibility because of such minor changes. |
||
self.send_lock = threading.Lock() | ||
self.sync = SyncProducer(self) | ||
self.time = TimeProducer(self) | ||
|
@@ -62,7 +61,7 @@ def subscribe(self, can_id: int, callback: Callback) -> None: | |
:param callback: | ||
Function to call when message is received. | ||
""" | ||
self.subscribers.setdefault(can_id, list()) | ||
self.subscribers.setdefault(can_id, []) | ||
if callback not in self.subscribers[can_id]: | ||
self.subscribers[can_id].append(callback) | ||
|
||
|
@@ -134,8 +133,8 @@ def __exit__(self, type, value, traceback): | |
|
||
def add_node( | ||
self, | ||
node: Union[int, RemoteNode, LocalNode], | ||
object_dictionary: Union[str, ObjectDictionary, None] = None, | ||
node: int | RemoteNode | LocalNode, | ||
object_dictionary: str | ObjectDictionary | None = None, | ||
upload_eds: bool = False, | ||
) -> RemoteNode: | ||
"""Add a remote node to the network. | ||
|
@@ -164,7 +163,7 @@ def add_node( | |
def create_node( | ||
self, | ||
node: int, | ||
object_dictionary: Union[str, ObjectDictionary, None] = None, | ||
object_dictionary: str | ObjectDictionary | None = None, | ||
) -> LocalNode: | ||
"""Create a local node in the network. | ||
|
||
|
@@ -202,10 +201,12 @@ def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None: | |
""" | ||
if not self.bus: | ||
raise RuntimeError("Not connected to CAN bus") | ||
msg = can.Message(is_extended_id=can_id > 0x7FF, | ||
arbitration_id=can_id, | ||
data=data, | ||
is_remote_frame=remote) | ||
msg = can.Message( | ||
is_extended_id=can_id > 0x7FF, | ||
arbitration_id=can_id, | ||
data=data, | ||
is_remote_frame=remote, | ||
) | ||
with self.send_lock: | ||
self.bus.send(msg) | ||
self.check() | ||
|
@@ -260,10 +261,10 @@ def check(self) -> None: | |
logger.error("An error has caused receiving of messages to stop") | ||
raise exc | ||
|
||
def __getitem__(self, node_id: int) -> Union[RemoteNode, LocalNode]: | ||
def __getitem__(self, node_id: int) -> RemoteNode | LocalNode: | ||
return self.nodes[node_id] | ||
|
||
def __setitem__(self, node_id: int, node: Union[RemoteNode, LocalNode]): | ||
def __setitem__(self, node_id: int, node: RemoteNode | LocalNode): | ||
assert node_id == node.id | ||
if node_id in self.nodes: | ||
# Remove old callbacks | ||
|
@@ -285,12 +286,14 @@ def __len__(self) -> int: | |
class _UninitializedNetwork(Network): | ||
"""Empty network implementation as a placeholder before actual initialization.""" | ||
|
||
def __init__(self, bus: Optional[can.BusABC] = None): | ||
def __init__(self, bus: can.BusABC | None = None): | ||
"""Do not initialize attributes, by skipping the parent constructor.""" | ||
|
||
def __getattribute__(self, name): | ||
raise RuntimeError("No actual Network object was assigned, " | ||
"try associating to a real network first.") | ||
raise RuntimeError( | ||
"No actual Network object was assigned, " | ||
"try associating to a real network first." | ||
) | ||
|
||
|
||
#: Singleton instance | ||
|
@@ -323,9 +326,12 @@ def __init__( | |
""" | ||
self.bus = bus | ||
self.period = period | ||
self.msg = can.Message(is_extended_id=can_id > 0x7FF, | ||
arbitration_id=can_id, | ||
data=data, is_remote_frame=remote) | ||
self.msg = can.Message( | ||
is_extended_id=can_id > 0x7FF, | ||
arbitration_id=can_id, | ||
data=data, | ||
is_remote_frame=remote, | ||
) | ||
self._start() | ||
|
||
def _start(self): | ||
|
@@ -391,10 +397,10 @@ class NodeScanner: | |
|
||
SERVICES = (0x700, 0x580, 0x180, 0x280, 0x380, 0x480, 0x80) | ||
|
||
def __init__(self, network: Optional[Network] = None): | ||
def __init__(self, network: Network | None = None): | ||
self.network = network | ||
#: A :class:`list` of nodes discovered | ||
self.nodes: List[int] = [] | ||
self.nodes: list[int] = [] | ||
|
||
def on_message_received(self, can_id: int): | ||
service = can_id & 0x780 | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Definitely not. Which tool would suggest that? Separating the whole imports block(s) from the rest of the code is certainly worth an additional newline.