From e5b31d8bc7edb7d4f13f1f14159f66b323349837 Mon Sep 17 00:00:00 2001
From: Tobias Ahrens
Date: Mon, 9 Dec 2024 15:47:39 +0100
Subject: [PATCH] Register queue on streaming handle before sending the
subscribe request
This commit creates the subscription queue befor sending out the
subscribe request. This ensures that if the data server sends an
update event before the subscribe request returns, the data is still
processed.
---
CHANGELOG.md | 3 +++
src/labone/core/session.py | 19 ++++++++++++-------
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b309afa..184a3a5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
# Labone Python API Changelog
+## Version 3.2.1
+* Fix bug that caused subscriptions to potentially miss value updates after the subscription was registered but before the subscribe functions returned.
+
## Version 3.2.0
* `subscribe` accepts keyword arguments, which are forwarded to the data-server.
This allows to configure the subscription to the data-server.
diff --git a/src/labone/core/session.py b/src/labone/core/session.py
index 0425a7f..df8f5e9 100644
--- a/src/labone/core/session.py
+++ b/src/labone/core/session.py
@@ -853,23 +853,28 @@ async def subscribe(
},
}
if get_initial_value:
- _, initial_value = await asyncio.gather(
- self._session.subscribe(subscription=subscription),
- self.get(path),
- )
new_queue_type = queue_type or DataQueue
queue = new_queue_type(
path=path,
handle=streaming_handle,
)
- queue.put_nowait(initial_value)
+ _, initial_value = await asyncio.gather(
+ self._session.subscribe(subscription=subscription),
+ self.get(path),
+ )
+ # If the queue already has received a update event we do not
+ # need to put the initial value in the queue. As it may break the
+ # order.
+ if queue.empty():
+ queue.put_nowait(initial_value)
return queue
- await self._session.subscribe(subscription=subscription)
new_queue_type = queue_type or DataQueue
- return new_queue_type(
+ queue = new_queue_type(
path=path,
handle=streaming_handle,
)
+ await self._session.subscribe(subscription=subscription)
+ return queue
async def wait_for_state_change(
self,