Skip to content
This repository was archived by the owner on Dec 16, 2025. It is now read-only.

Commit 5576c0f

Browse files
authored
More robust timeout / disconnect management for Requester (#4)
* refactor: improve timeout handling and error messages in endpoint request method * feat: add ResponseTimeout exception and improve request error handling * fix: improve timeout error handling by ensuring proper decoding of error payload
1 parent a8235bd commit 5576c0f

File tree

2 files changed

+24
-19
lines changed

2 files changed

+24
-19
lines changed

make87/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from make87.endpoints import ( # noqa
44
ProviderNotAvailable,
5+
ResponseTimeout,
56
TypedProvider,
67
TypedRequester,
78
get_provider,
@@ -49,6 +50,7 @@
4950
"Metadata",
5051
"MultiSubscriber",
5152
"ProviderNotAvailable",
53+
"ResponseTimeout",
5254
"TypedProvider",
5355
"TypedPublisher",
5456
"TypedRequester",

make87/endpoints.py

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import zenoh
77
from google.protobuf.message import Message
8-
import concurrent.futures
98

109
from make87.session import get_session
1110
from make87.utils import parse_endpoints, REQ, PRV, IS_IN_RELEASE_MODE, RingChannel, FifoChannel
@@ -17,6 +16,9 @@
1716
class ProviderNotAvailable(Exception): ...
1817

1918

19+
class ResponseTimeout(Exception): ...
20+
21+
2022
class Endpoint:
2123
"""Base class for endpoints."""
2224

@@ -128,34 +130,35 @@ def __init__(
128130
self._express = True if express is None else express
129131

130132
def request(self, message: zenoh.ZBytes, timeout: float = 10.0) -> zenoh.ZBytes:
131-
subscriber = self._session.liveliness().declare_subscriber(key_expr=self.name, history=True)
132-
133-
# Use the standard ThreadPoolExecutor to call subscriber.recv() with a timeout.
134-
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
135-
future = executor.submit(subscriber.recv)
136-
try:
137-
sample = future.result(timeout=timeout)
138-
except concurrent.futures.TimeoutError:
139-
raise ProviderNotAvailable(
140-
f"Endpoint '{self.name}' is not available. Timed out waiting for endpoint to become available."
141-
)
142-
143-
if sample.kind != zenoh.SampleKind.PUT:
144-
raise ProviderNotAvailable(f"Endpoint '{self.name}' is not available.")
145-
146133
reply = self._session.get(
147134
selector=self.name,
148135
payload=message,
149136
encoding=zenoh.Encoding.APPLICATION_PROTOBUF,
150137
priority=self._priority,
151138
express=self._express,
152139
congestion_control=self._congestion_control,
153-
).recv()
140+
timeout=timeout,
141+
)
154142

155143
try:
144+
reply = reply.recv()
145+
except zenoh.ZError as e:
146+
if all(token in str(e).upper() for token in ("CHANNEL", "CLOSED")):
147+
raise ProviderNotAvailable(f"Endpoint '{self.name}' is not available.")
148+
else:
149+
raise Exception(f"Error while requesting endpoint '{self.name}': {e}")
150+
151+
if reply.ok is not None:
156152
return reply.ok.payload
157-
except Exception:
158-
raise Exception(f"Failed to request endpoint '{self.name}': {reply.err.payload.to_string()}")
153+
elif reply.err is not None:
154+
if bytes(reply.err.payload).decode("utf-8").strip().upper() == "TIMEOUT":
155+
raise ResponseTimeout(
156+
f"Waited {timeout}s for response until timed out. Consider increasing your timeout or checking with the provider side."
157+
)
158+
else:
159+
raise Exception(
160+
f"Error returned while requesting endpoint '{self.name}': {reply.err.payload.to_string()}"
161+
)
159162

160163

161164
class _EndpointManager:

0 commit comments

Comments
 (0)