Skip to content

Commit 401358a

Browse files
authored
Merge pull request #197 from plugwise/bugfix/arnout/event_and_msg_overflow
Bugfix/arnout/event_and_msg_overflow
2 parents f7287bf + 7f2c05e commit 401358a

File tree

9 files changed

+57
-26
lines changed

9 files changed

+57
-26
lines changed

plugwise_usb/connection/manager.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ def __init__(self) -> None:
3636
] = {}
3737
self._unsubscribe_stick_events: Callable[[], None] | None = None
3838

39+
@property
40+
def queue_depth(self) -> int:
41+
return self._sender.processed_messages - self._receiver.processed_messages
42+
43+
def correct_received_messages(self, correction: int) -> None:
44+
self._receiver.correct_processed_messages(correction)
45+
3946
@property
4047
def serial_path(self) -> str:
4148
"""Return current port."""

plugwise_usb/connection/queue.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,15 @@ async def stop(self) -> None:
7474
self._stick = None
7575
_LOGGER.debug("queue stopped")
7676

77-
async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
77+
async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse | None:
7878
"""Add request to queue and return the response of node. Raises an error when something fails."""
7979
if request.waiting_for_response:
8080
raise MessageError(
8181
f"Cannot send message {request} which is currently waiting for response."
8282
)
8383

8484
while request.resend and not request.waiting_for_response:
85-
_LOGGER.warning("submit | start (%s) %s", request.retries_left, request)
85+
_LOGGER.debug("submit | start (%s) %s", request.retries_left, request)
8686
if not self._running or self._stick is None:
8787
raise StickError(
8888
f"Cannot send message {request.__class__.__name__} for"
@@ -91,6 +91,7 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
9191
await self._add_request_to_queue(request)
9292
try:
9393
response: PlugwiseResponse = await request.response_future()
94+
return response
9495
except (NodeTimeout, StickTimeout) as e:
9596
if isinstance(request, NodePingRequest):
9697
# For ping requests it is expected to receive timeouts, so lower log level
@@ -103,17 +104,19 @@ async def submit(self, request: PlugwiseRequest) -> PlugwiseResponse:
103104
_LOGGER.warning("%s, cancel request", e) # type: ignore[unreachable]
104105
except StickError as exception:
105106
_LOGGER.error(exception)
107+
self._stick.correct_received_messages(1)
106108
raise StickError(
107109
f"No response received for {request.__class__.__name__} "
108110
+ f"to {request.mac_decoded}"
109111
) from exception
110112
except BaseException as exception:
113+
self._stick.correct_received_messages(1)
111114
raise StickError(
112115
f"No response received for {request.__class__.__name__} "
113116
+ f"to {request.mac_decoded}"
114117
) from exception
115118

116-
return response
119+
return None
117120

118121
async def _add_request_to_queue(self, request: PlugwiseRequest) -> None:
119122
"""Add request to send queue."""
@@ -133,8 +136,13 @@ async def _send_queue_worker(self) -> None:
133136
if request.priority == Priority.CANCEL:
134137
self._submit_queue.task_done()
135138
return
139+
140+
while self._stick.queue_depth > 3:
141+
_LOGGER.info("Awaiting plugwise responses %d", self._stick.queue_depth)
142+
await sleep(0.125)
143+
136144
await self._stick.write_to_stick(request)
137145
self._submit_queue.task_done()
138-
await sleep(0.001)
146+
139147
_LOGGER.debug("Sent from queue %s", request)
140148
_LOGGER.debug("Send_queue_worker stopped")

plugwise_usb/connection/receiver.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def __init__(
9999
self._data_worker_task: Task[None] | None = None
100100

101101
# Message processing
102+
self._processed_msgs = 0
102103
self._message_queue: PriorityQueue[PlugwiseResponse] = PriorityQueue()
103104
self._last_processed_messages: list[bytes] = []
104105
self._current_seq_id: bytes | None = None
@@ -137,11 +138,20 @@ def connection_lost(self, exc: Exception | None = None) -> None:
137138
self._transport = None
138139
self._connection_state = False
139140

141+
@property
142+
def processed_messages(self) -> int:
143+
"""Return the number of processed messages."""
144+
return self._processed_msgs
145+
140146
@property
141147
def is_connected(self) -> bool:
142148
"""Return current connection state of the USB-Stick."""
143149
return self._connection_state
144150

151+
def correct_processed_messages(self, correction: int) -> None:
152+
"""Return the number of processed messages."""
153+
self._processed_msgs += correction
154+
145155
def connection_made(self, transport: SerialTransport) -> None:
146156
"""Call when the serial connection to USB-Stick is established."""
147157
_LOGGER.info("Connection made")
@@ -278,6 +288,7 @@ async def _message_queue_worker(self) -> None:
278288
await self._notify_stick_subscribers(response)
279289
else:
280290
await self._notify_node_response_subscribers(response)
291+
self._processed_msgs += 1
281292
self._message_queue.task_done()
282293
await sleep(0)
283294
_LOGGER.debug("Message queue worker stopped")
@@ -457,7 +468,7 @@ async def _notify_node_response_subscribers(
457468

458469
self._node_subscription_lock.release()
459470
if len(notify_tasks) > 0:
460-
_LOGGER.debug("Received %s", node_response)
471+
_LOGGER.debug("Received %s %s", node_response, node_response.seq_id)
461472
if node_response.seq_id not in BROADCAST_IDS:
462473
self._last_processed_messages.append(node_response.seq_id)
463474
# Limit tracking to only the last appended request (FIFO)

plugwise_usb/connection/sender.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,17 @@ def __init__(self, stick_receiver: StickReceiver, transport: Transport) -> None:
3838
self._loop = get_running_loop()
3939
self._receiver = stick_receiver
4040
self._transport = transport
41+
self._processed_msgs = 0
4142
self._stick_response: Future[StickResponse] | None = None
4243
self._stick_lock = Lock()
4344
self._current_request: None | PlugwiseRequest = None
4445
self._unsubscribe_stick_response: Callable[[], None] | None = None
4546

47+
@property
48+
def processed_messages(self) -> int:
49+
"""Return the number of processed messages."""
50+
return self._processed_msgs
51+
4652
async def start(self) -> None:
4753
"""Start the sender."""
4854
# Subscribe to ACCEPT stick responses, which contain the seq_id we need.
@@ -133,6 +139,7 @@ async def write_request_to_port(self, request: PlugwiseRequest) -> None:
133139
finally:
134140
self._stick_response.cancel()
135141
self._stick_lock.release()
142+
self._processed_msgs += 1
136143

137144
async def _process_stick_response(self, response: StickResponse) -> None:
138145
"""Process stick response."""

plugwise_usb/helpers/cache.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,8 @@ async def initialize_cache(self, create_root_folder: bool = False) -> None:
5959
cache_dir = self._get_writable_os_dir()
6060
await makedirs(cache_dir, exist_ok=True)
6161
self._cache_path = cache_dir
62-
if os_name == "nt":
63-
self._cache_file = f"{cache_dir}\\{self._file_name}"
64-
else:
65-
self._cache_file = f"{cache_dir}/{self._file_name}"
62+
63+
self._cache_file = os_path_join(self._cache_path, self._file_name)
6664
self._cache_file_exists = await ospath.exists(self._cache_file)
6765
self._initialized = True
6866
_LOGGER.debug("Start using network cache file: %s", self._cache_file)

plugwise_usb/nodes/circle.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from __future__ import annotations
44

5-
from asyncio import Task, create_task, gather
5+
from asyncio import Task, create_task
66
from collections.abc import Awaitable, Callable
77
from dataclasses import replace
88
from datetime import UTC, datetime
@@ -453,11 +453,8 @@ async def get_missing_energy_logs(self) -> None:
453453
log_address, _ = calc_log_address(log_address, 1, -4)
454454
total_addresses -= 1
455455

456-
if not all(await gather(*log_update_tasks)):
457-
_LOGGER.info(
458-
"Failed to request one or more update energy log for %s",
459-
self._mac_in_str,
460-
)
456+
for task in log_update_tasks:
457+
await task
461458

462459
if self._cache_enabled:
463460
await self._energy_log_records_save_to_cache()
@@ -475,9 +472,8 @@ async def get_missing_energy_logs(self) -> None:
475472
)
476473

477474
missing_addresses = sorted(missing_addresses, reverse=True)
478-
await gather(
479-
*[self.energy_log_update(address) for address in missing_addresses]
480-
)
475+
for address in missing_addresses:
476+
await self.energy_log_update(address)
481477

482478
if self._cache_enabled:
483479
await self._energy_log_records_save_to_cache()
@@ -528,7 +524,7 @@ async def _energy_log_records_load_from_cache(self) -> bool:
528524
"""Load energy_log_record from cache."""
529525
cache_data = self._get_cache(CACHE_ENERGY_COLLECTION)
530526
if (cache_data := self._get_cache(CACHE_ENERGY_COLLECTION)) is None:
531-
_LOGGER.debug(
527+
_LOGGER.warning(
532528
"Failed to restore energy log records from cache for node %s", self.name
533529
)
534530
return False
@@ -811,7 +807,7 @@ async def _load_from_cache(self) -> bool:
811807
return False
812808
# Energy collection
813809
if await self._energy_log_records_load_from_cache():
814-
_LOGGER.debug(
810+
_LOGGER.warning(
815811
"Node %s failed to load energy_log_records from cache",
816812
self._mac_in_str,
817813
)

plugwise_usb/nodes/helpers/subscription.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dataclasses import dataclass
88
from typing import Any
99

10+
1011
from ...api import NodeFeature
1112

1213

@@ -20,11 +21,12 @@ class NodeFeatureSubscription:
2021

2122
class FeaturePublisher:
2223
"""Base Class to call awaitable of subscription when event happens."""
24+
def __init__(self) -> None:
25+
self._feature_update_subscribers: dict[
26+
Callable[[], None],
27+
NodeFeatureSubscription,
28+
] = {}
2329

24-
_feature_update_subscribers: dict[
25-
Callable[[], None],
26-
NodeFeatureSubscription,
27-
] = {}
2830

2931
def subscribe_to_feature_update(
3032
self,

plugwise_usb/nodes/node.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ def __init__(
6363
loaded_callback: Callable[[NodeEvent, str], Awaitable[None]],
6464
):
6565
"""Initialize Plugwise base node class."""
66+
super().__init__()
6667
self._loaded_callback = loaded_callback
6768
self._message_subscribe = controller.subscribe_to_messages
6869
self._features: tuple[NodeFeature, ...] = NODE_FEATURES
@@ -415,7 +416,8 @@ async def _available_update_state(
415416
if (
416417
self._last_seen is not None
417418
and timestamp is not None
418-
and self._last_seen < timestamp
419+
and (timestamp - self._last_seen).seconds > 5
420+
419421
):
420422
self._last_seen = timestamp
421423
await self.publish_feature_update_to_subscribers(

tests/test_usb.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -838,7 +838,7 @@ async def test_node_relay_and_power(self, monkeypatch: pytest.MonkeyPatch) -> No
838838
assert await stick.nodes["2222222222222222"].load()
839839
self.test_init_relay_state_on = asyncio.Future()
840840
self.test_init_relay_state_off = asyncio.Future()
841-
unsub_inti_relay = stick.nodes["0098765432101234"].subscribe_to_feature_update(
841+
unsub_inti_relay = stick.nodes["2222222222222222"].subscribe_to_feature_update(
842842
node_feature_callback=self.node_init_relay_state,
843843
features=(pw_api.NodeFeature.RELAY_INIT,),
844844
)

0 commit comments

Comments
 (0)