Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@

version: 2
updates:
- package-ecosystem: "uv"
- package-ecosystem: "pip"
# Enable version updates for development dependencies
directory: "/"
schedule:
interval: "monthly"
versioning-strategy: "increase-if-necessary"
groups:
dev-deps:
patterns:
Expand All @@ -23,4 +24,4 @@ updates:
groups:
github-actions:
patterns:
- "*"
- "*"
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ jobs:
matrix:
os: [ubuntu-latest, macos-latest, windows-latest]
env: [
"py39",
"py310",
"py311",
"py312",
Expand Down
1 change: 1 addition & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Library Version Python
4.0+ 3.7+
4.3+ 3.8+
4.6+ 3.9+
main branch 3.10+
============================== ===========


Expand Down
21 changes: 4 additions & 17 deletions can/_entry_points.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import importlib
import sys
from dataclasses import dataclass
from importlib.metadata import entry_points
from typing import Any
Expand All @@ -16,19 +15,7 @@ def load(self) -> Any:
return getattr(module, self.class_name)


# See https://docs.python.org/3/library/importlib.metadata.html#entry-points,
# "Compatibility Note".
if sys.version_info >= (3, 10):

def read_entry_points(group: str) -> list[_EntryPoint]:
return [
_EntryPoint(ep.name, ep.module, ep.attr) for ep in entry_points(group=group)
]

else:

def read_entry_points(group: str) -> list[_EntryPoint]:
return [
_EntryPoint(ep.name, *ep.value.split(":", maxsplit=1))
for ep in entry_points().get(group, []) # pylint: disable=no-member
]
def read_entry_points(group: str) -> list[_EntryPoint]:
return [
_EntryPoint(ep.name, ep.module, ep.attr) for ep in entry_points(group=group)
]
37 changes: 16 additions & 21 deletions can/broadcastmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@
import threading
import time
import warnings
from collections.abc import Sequence
from collections.abc import Callable, Sequence
from typing import (
TYPE_CHECKING,
Callable,
Final,
Optional,
Union,
cast,
)

Expand Down Expand Up @@ -78,7 +75,7 @@ def wait_inf(self, event: _Pywin32Event) -> None:
)


PYWIN32: Optional[_Pywin32] = None
PYWIN32: _Pywin32 | None = None
if sys.platform == "win32" and sys.version_info < (3, 11):
try:
PYWIN32 = _Pywin32()
Expand All @@ -105,9 +102,7 @@ class CyclicSendTaskABC(CyclicTask, abc.ABC):
Message send task with defined period
"""

def __init__(
self, messages: Union[Sequence[Message], Message], period: float
) -> None:
def __init__(self, messages: Sequence[Message] | Message, period: float) -> None:
"""
:param messages:
The messages to be sent periodically.
Expand All @@ -125,7 +120,7 @@ def __init__(

@staticmethod
def _check_and_convert_messages(
messages: Union[Sequence[Message], Message],
messages: Sequence[Message] | Message,
) -> tuple[Message, ...]:
"""Helper function to convert a Message or Sequence of messages into a
tuple, and raises an error when the given value is invalid.
Expand Down Expand Up @@ -164,9 +159,9 @@ def _check_and_convert_messages(
class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
def __init__(
self,
messages: Union[Sequence[Message], Message],
messages: Sequence[Message] | Message,
period: float,
duration: Optional[float],
duration: float | None,
) -> None:
"""Message send task with a defined duration and period.

Expand All @@ -181,7 +176,7 @@ def __init__(
"""
super().__init__(messages, period)
self.duration = duration
self.end_time: Optional[float] = None
self.end_time: float | None = None


class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
Expand Down Expand Up @@ -215,7 +210,7 @@ def _check_modified_messages(self, messages: tuple[Message, ...]) -> None:
"from when the task was created"
)

def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
def modify_data(self, messages: Sequence[Message] | Message) -> None:
"""Update the contents of the periodically sent messages, without
altering the timing.

Expand All @@ -242,7 +237,7 @@ class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
def __init__(
self,
channel: typechecking.Channel,
messages: Union[Sequence[Message], Message],
messages: Sequence[Message] | Message,
count: int, # pylint: disable=unused-argument
initial_period: float, # pylint: disable=unused-argument
subsequent_period: float,
Expand Down Expand Up @@ -272,12 +267,12 @@ def __init__(
self,
bus: "BusABC",
lock: threading.Lock,
messages: Union[Sequence[Message], Message],
messages: Sequence[Message] | Message,
period: float,
duration: Optional[float] = None,
on_error: Optional[Callable[[Exception], bool]] = None,
duration: float | None = None,
on_error: Callable[[Exception], bool] | None = None,
autostart: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None,
modifier_callback: Callable[[Message], None] | None = None,
) -> None:
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.

Expand All @@ -298,13 +293,13 @@ def __init__(
self.bus = bus
self.send_lock = lock
self.stopped = True
self.thread: Optional[threading.Thread] = None
self.thread: threading.Thread | None = None
self.on_error = on_error
self.modifier_callback = modifier_callback

self.period_ms = int(round(period * 1000, 0))

self.event: Optional[_Pywin32Event] = None
self.event: _Pywin32Event | None = None
if PYWIN32:
if self.period_ms == 0:
# A period of 0 would mean that the timer is signaled only once
Expand Down Expand Up @@ -338,7 +333,7 @@ def start(self) -> None:
self.thread = threading.Thread(target=self._run, name=name)
self.thread.daemon = True

self.end_time: Optional[float] = (
self.end_time: float | None = (
time.perf_counter() + self.duration if self.duration else None
)

Expand Down
43 changes: 18 additions & 25 deletions can/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@
import logging
import threading
from abc import ABC, abstractmethod
from collections.abc import Iterator, Sequence
from collections.abc import Callable, Iterator, Sequence
from enum import Enum, auto
from time import time
from types import TracebackType
from typing import (
Callable,
Optional,
Union,
cast,
)

Expand Down Expand Up @@ -68,7 +65,7 @@ class BusABC(ABC):
def __init__(
self,
channel: can.typechecking.Channel,
can_filters: Optional[can.typechecking.CanFilters] = None,
can_filters: can.typechecking.CanFilters | None = None,
**kwargs: object,
):
"""Construct and open a CAN bus instance of the specified type.
Expand Down Expand Up @@ -101,7 +98,7 @@ def __init__(
def __str__(self) -> str:
return self.channel_info

def recv(self, timeout: Optional[float] = None) -> Optional[Message]:
def recv(self, timeout: float | None = None) -> Message | None:
"""Block waiting for a message from the Bus.

:param timeout:
Expand Down Expand Up @@ -139,9 +136,7 @@ def recv(self, timeout: Optional[float] = None) -> Optional[Message]:

return None

def _recv_internal(
self, timeout: Optional[float]
) -> tuple[Optional[Message], bool]:
def _recv_internal(self, timeout: float | None) -> tuple[Message | None, bool]:
"""
Read a message from the bus and tell whether it was filtered.
This methods may be called by :meth:`~can.BusABC.recv`
Expand Down Expand Up @@ -184,7 +179,7 @@ def _recv_internal(
raise NotImplementedError("Trying to read from a write only bus?")

@abstractmethod
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
def send(self, msg: Message, timeout: float | None = None) -> None:
"""Transmit a message to the CAN bus.

Override this method to enable the transmit path.
Expand All @@ -205,12 +200,12 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:

def send_periodic(
self,
msgs: Union[Message, Sequence[Message]],
msgs: Message | Sequence[Message],
period: float,
duration: Optional[float] = None,
duration: float | None = None,
store_task: bool = True,
autostart: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None,
modifier_callback: Callable[[Message], None] | None = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Start sending messages at a given period on this bus.

Expand Down Expand Up @@ -297,11 +292,11 @@ def wrapped_stop_method(remove_task: bool = True) -> None:

def _send_periodic_internal(
self,
msgs: Union[Sequence[Message], Message],
msgs: Sequence[Message] | Message,
period: float,
duration: Optional[float] = None,
duration: float | None = None,
autostart: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None,
modifier_callback: Callable[[Message], None] | None = None,
) -> can.broadcastmanager.CyclicSendTaskABC:
"""Default implementation of periodic message sending using threading.

Expand Down Expand Up @@ -378,20 +373,18 @@ def __iter__(self) -> Iterator[Message]:
yield msg

@property
def filters(self) -> Optional[can.typechecking.CanFilters]:
def filters(self) -> can.typechecking.CanFilters | None:
"""
Modify the filters of this bus. See :meth:`~can.BusABC.set_filters`
for details.
"""
return self._filters

@filters.setter
def filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
def filters(self, filters: can.typechecking.CanFilters | None) -> None:
self.set_filters(filters)

def set_filters(
self, filters: Optional[can.typechecking.CanFilters] = None
) -> None:
def set_filters(self, filters: can.typechecking.CanFilters | None = None) -> None:
"""Apply filtering to all messages received by this Bus.

All messages that match at least one filter are returned.
Expand All @@ -417,7 +410,7 @@ def set_filters(
with contextlib.suppress(NotImplementedError):
self._apply_filters(self._filters)

def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
def _apply_filters(self, filters: can.typechecking.CanFilters | None) -> None:
"""
Hook for applying the filters to the underlying kernel or
hardware if supported/implemented by the interface.
Expand Down Expand Up @@ -484,9 +477,9 @@ def __enter__(self) -> Self:

def __exit__(
self,
exc_type: Optional[type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.shutdown()

Expand Down
Loading
Loading