Skip to content

Commit 0fe1085

Browse files
committed
Fix issue with Queue and write with wait==True.
1 parent a1c7b61 commit 0fe1085

File tree

2 files changed

+18
-20
lines changed

2 files changed

+18
-20
lines changed

iot/client.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
class ASetByte:
4343
def __init__(self):
4444
self._ba = bytearray(32)
45-
self._eve = asyncio.Event()
46-
self._eve.set() # Empty event initially set
4745
self._evdis = asyncio.Event() # Discard event
4846

4947
def __bool__(self):
@@ -53,17 +51,11 @@ def __contains__(self, i):
5351
return (self._ba[i >> 3] & 1 << (i & 7)) > 0
5452

5553
def add(self, i):
56-
self._eve.clear()
5754
self._ba[i >> 3] |= 1 << (i & 7)
5855

5956
def discard(self, i):
6057
self._ba[i >> 3] &= ~(1 << (i & 7))
6158
self._evdis.set()
62-
if not any(self._ba):
63-
self._eve.set()
64-
65-
async def wait_empty(self): # Pause until empty
66-
await self._eve.wait()
6759

6860
async def has_not(self, i): # Pause until i not in set
6961
while i in self:
@@ -140,6 +132,7 @@ def inner(feed=WDT_CB):
140132
self._evfail = asyncio.Event() # Set by any comms failure
141133
self._evok = asyncio.Event() # Set by 1st successful read
142134
self._s_lock = asyncio.Lock() # For internal send conflict.
135+
self._w_lock = asyncio.Lock() # For .write rate limit
143136
self._last_wr = utime.ticks_ms()
144137
self._lineq = Queue(20) # 20 entries
145138
self.connects = 0 # Connect count for test purposes/app access
@@ -162,15 +155,19 @@ async def readline(self):
162155

163156
async def write(self, buf, qos=True, wait=True):
164157
if qos and wait: # Disallow concurrent writes
165-
await self._acks_pend.wait_empty()
166-
# Prepend message ID to a copy of buf
167-
fstr = '{:02x}{}' if buf.endswith('\n') else '{:02x}{}\n'
168-
mid = next(getmid)
169-
self._acks_pend.add(mid)
170-
buf = fstr.format(mid, buf)
171-
await self._write(buf)
172-
if qos: # Return when an ACK received
173-
await self._do_qos(mid, buf)
158+
await self._w_lock.acquire()
159+
try: # In case of cancellation/timeout
160+
# Prepend message ID to a copy of buf
161+
fstr = '{:02x}{}' if buf.endswith('\n') else '{:02x}{}\n'
162+
mid = next(getmid)
163+
self._acks_pend.add(mid)
164+
buf = fstr.format(mid, buf)
165+
await self._write(buf)
166+
if qos: # Return when an ACK received
167+
await self._do_qos(mid, buf)
168+
finally:
169+
if qos and wait:
170+
self._w_lock.release()
174171

175172
def close(self):
176173
self._close() # Close socket and WDT

iot/primitives/queue.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@ def _get(self):
3131
return self._queue.pop(0)
3232

3333
async def get(self): # Usage: item = await queue.get()
34-
if self.empty():
35-
# Queue is empty, put the calling Task on the waiting queue
34+
while self.empty(): # May be multiple tasks waiting on get()
35+
# Queue is empty, suspend task until a put occurs
36+
# 1st of N tasks gets, the rest loop again
3637
self._evput.clear()
3738
await self._evput.wait()
3839
return self._get()
@@ -48,7 +49,7 @@ def _put(self, val):
4849
self._queue.append(val)
4950

5051
async def put(self, val): # Usage: await queue.put(item)
51-
if self.full():
52+
while self.full():
5253
# Queue full
5354
self._evget.clear()
5455
await self._evget.wait()

0 commit comments

Comments
 (0)