Skip to content

Commit

Permalink
Register queue on streaming handle before sending the subscribe request
Browse files Browse the repository at this point in the history
This commit creates the subscription queue befor sending out the
subscribe request. This ensures that if the data server sends an
update event before the subscribe request returns, the data is still
processed.
  • Loading branch information
tobiasah committed Dec 9, 2024
1 parent abde7fb commit e5b31d8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Labone Python API Changelog

## Version 3.2.1
* Fix bug that caused subscriptions to potentially miss value updates after the subscription was registered but before the subscribe functions returned.

## Version 3.2.0
* `subscribe` accepts keyword arguments, which are forwarded to the data-server.
This allows to configure the subscription to the data-server.
Expand Down
19 changes: 12 additions & 7 deletions src/labone/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,23 +853,28 @@ async def subscribe(
},
}
if get_initial_value:
_, initial_value = await asyncio.gather(
self._session.subscribe(subscription=subscription),
self.get(path),
)
new_queue_type = queue_type or DataQueue
queue = new_queue_type(
path=path,
handle=streaming_handle,
)
queue.put_nowait(initial_value)
_, initial_value = await asyncio.gather(
self._session.subscribe(subscription=subscription),
self.get(path),
)
# If the queue already has received a update event we do not
# need to put the initial value in the queue. As it may break the
# order.
if queue.empty():
queue.put_nowait(initial_value)
return queue
await self._session.subscribe(subscription=subscription)
new_queue_type = queue_type or DataQueue
return new_queue_type(
queue = new_queue_type(
path=path,
handle=streaming_handle,
)
await self._session.subscribe(subscription=subscription)
return queue

async def wait_for_state_change(
self,
Expand Down

0 comments on commit e5b31d8

Please sign in to comment.