Skip to content

Commit

Permalink
pad the queue buffer a little more
Browse files Browse the repository at this point in the history
  • Loading branch information
deeleeramone committed Dec 22, 2024
1 parent 9030261 commit 8cd6396
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def enqueue(self, message):
while retries < self.max_retries:

if self.queue.qsize() / self.queue.maxsize > 0.3:
await sleep(0.000005)
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.5:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.55:
Expand All @@ -52,8 +52,12 @@ async def enqueue(self, message):
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.7:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.99:
if self.queue.qsize() / self.queue.maxsize > 0.75:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.8:
await sleep(0.00005)
if self.queue.qsize() / self.queue.maxsize > 0.98:
await sleep(0.0005)

if self.queue.full():
retries += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async def connect_and_stream():
queue.process_queue(lambda message: process_message(message))
)
tasks.add(handler_task)
for i in range(0, 64):
for i in range(0, 128):
new_task = asyncio.create_task(
queue.process_queue(lambda message: process_message(message))
)
Expand Down

0 comments on commit 8cd6396

Please sign in to comment.