Commit

threading (#17)
* threading - added some threading examples

* threading - more playing around with asyncio.

* threading - flake8 fixes.
Mark-Barbaric authored Sep 1, 2024
1 parent 8f88aff commit 8781890
Showing 8 changed files with 711 additions and 3 deletions.
Empty file added .docker-compose.yml
6 changes: 5 additions & 1 deletion .gitignore
@@ -4,4 +4,8 @@ __pycache__/

.coverage
.coverage.*
coverage.*
coverage.*

# general files
*.json
*.log
561 changes: 560 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pyproject.toml
@@ -18,6 +18,11 @@ httpx = "^0.27.0"
coverage = "^7.6.1"
flask = "^3.0.3"
flask-restful = "^0.3.10"
pydantic-settings = "^2.4.0"
asyncio = "^3.4.3"

[tool.poetry.group.jupyter.dependencies]
matplotlib = "^3.9.2"

[build-system]
requires = ["poetry-core"]
34 changes: 34 additions & 0 deletions src/general_python/threading/asyncio_progress/main.py
@@ -0,0 +1,34 @@
# SuperFastPython.com
# example of showing the progress of tasks using a callback
import random
import asyncio


# callback function to show the progress of tasks
def progress(task):
    # report progress of the task (the finished task is passed in
    # by add_done_callback); flush so each dot appears immediately
    print('.', end='', flush=True)


# coroutine task
async def work():
    # simulate effort
    await asyncio.sleep(random.random() * 10)


# main coroutine
async def main():
    # create and schedule many tasks
    tasks = [asyncio.create_task(work()) for _ in range(20)]
    # add done callback function to all tasks
    for task in tasks:
        task.add_done_callback(progress)
    # wait for all tasks to complete
    _ = await asyncio.wait(tasks)
    # report final message
    print('\nAll done.')


# run the asyncio program
if __name__ == "__main__":
    asyncio.run(main())
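
The add_done_callback hook above fires once per finished task and prints a dot. As a minimal alternative sketch (not part of this commit), the same progress can be reported as a running count with asyncio.as_completed, reusing the shape of the work coroutine from above:

import asyncio
import random


async def work():
    # simulate effort, as in the example above
    await asyncio.sleep(random.random() * 10)


async def main():
    tasks = [asyncio.create_task(work()) for _ in range(20)]
    # as_completed yields awaitables in the order the tasks finish
    for done, coro in enumerate(asyncio.as_completed(tasks), start=1):
        await coro
        print(f'{done}/{len(tasks)} tasks complete')
    print('All done.')


if __name__ == "__main__":
    asyncio.run(main())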
53 changes: 53 additions & 0 deletions src/general_python/threading/asyncio_queues/producer_consumer.py
@@ -0,0 +1,53 @@
# SuperFastPython.com
# example of using an asyncio queue
from random import random
import asyncio


# change this to alter behavior between consumer and producer
MAX_QUEUE_SIZE = 10


# coroutine to generate work
async def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # add to the queue (waits whenever the queue is full)
        print(f"Producer: put {value} at count: {i}")
        await queue.put(value)
    # send an all done signal
    await queue.put(None)
    print('Producer: Done')


# coroutine to consume work
async def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = await queue.get()
        # report
        print(f"Consumer: retrieved {item}")
        # check for stop signal
        if item is None:
            break
    # all done
    print('Consumer: Done')


# entry point coroutine
async def main():
    # create the shared queue
    queue = asyncio.Queue(MAX_QUEUE_SIZE)
    # run the producer and consumers
    await asyncio.gather(producer(queue), consumer(queue))


if __name__ == '__main__':
    # start the asyncio program
    asyncio.run(main())
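
The None sentinel above cleanly stops exactly one consumer. A hedged sketch (not in this commit) of how this pattern is commonly scaled to several consumers: queue.task_done() and queue.join() replace the sentinel, and the idle workers are cancelled at the end:

import asyncio
from random import random


async def producer(queue):
    for i in range(10):
        await queue.put(random())


async def consumer(name, queue):
    while True:
        item = await queue.get()
        print(f'{name}: retrieved {item}')
        # mark the item as processed so queue.join() can return
        queue.task_done()


async def main():
    queue = asyncio.Queue(10)
    consumers = [asyncio.create_task(consumer(f'consumer-{i}', queue))
                 for i in range(3)]
    await producer(queue)
    # wait until every queued item has been marked done
    await queue.join()
    # no sentinel needed: cancel the now-idle consumers instead
    for c in consumers:
        c.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)


if __name__ == '__main__':
    asyncio.run(main())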
54 changes: 54 additions & 0 deletions src/general_python/threading/asyncio_queues/progress.py
@@ -0,0 +1,54 @@
import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


if __name__ == "__main__":
    asyncio.run(main())
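
On Python 3.11+ the manual create_task/cancel/gather bookkeeping in the example above can be folded into asyncio.TaskGroup, which awaits its children when the with-block exits. A minimal sketch of the main coroutine only, assuming the worker coroutine defined above:

import asyncio
import random
import time


async def main():
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue, as above.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # The task group waits for (or absorbs the cancellation of) all
    # workers when the with-block exits.
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(worker(f'worker-{i}', queue))
                 for i in range(3)]

        started_at = time.monotonic()
        await queue.join()
        total_slept_for = time.monotonic() - started_at

        # Cancel the idle workers; no explicit gather is needed.
        for task in tasks:
            task.cancel()

    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')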
1 change: 0 additions & 1 deletion src/system_design/rest/fastapi_logging/README.md

This file was deleted.
