|
| 1 | +# asyncio |
| 2 | +Тестовые задания для проверки знания библиотеки asyncio. |
| 3 | + |
| 4 | +## Manual |
| 5 | +Есть две вариации первого задания (requests): |
| 6 | +* Interactiveness (более быстрая вариация, которая поможет проверить именно основные моменты). |
| 7 | +* Independence (более сложная вариация, где кандидату нужно будет писать все с нуля). |
| 8 | + |
| 9 | +#### Independence requests |
| 10 | +Задачи: |
| 11 | +Нужно написать код, который шлёт запросы в API, который умеет: |
| 12 | +1. Параллелизировать запросы |
| 13 | +2. Лимитировать запросы для случаев (когда API сервер может обрабатывать только 10 запросов за раз) |
| 14 | +3. Выдавать результаты по мере поступления (если один из конкурентных запросов выполнился быстро, то получить его результат) |
| 15 | + |
| 16 | +Вариант решения в самом низу блока "Interactiveness requests". |
| 17 | + |
| 18 | +#### Interactiveness requests |
| 19 | +Нам нужно делать N запросов в API, все работает медленно. Как ускорить код используя асинхронность? |
| 20 | + |
| 21 | +Вариант решения: |
| 22 | +```python |
| 23 | +async def fetch_example(session) -> int: |
| 24 | + async with session.get('https://www.example.com') as response: |
| 25 | + return response.status |
| 26 | + |
| 27 | + |
| 28 | +async def read_example_async(requests_count: int) -> list[int]: |
| 29 | + async with aiohttp.ClientSession() as session: |
| 30 | + tasks = [fetch_example(session) for _ in range(requests_count)] |
| 31 | + responses = await asyncio.gather(*tasks) |
| 32 | + |
| 33 | + return responses |
| 34 | +``` |
| 35 | +Вопросы: |
| 36 | +1. Что если одна из тасок поднимет исключение? |
| 37 | +2. Как сделать повторные запросы только для тех тасок, которые не завершились? |
| 38 | + |
| 39 | +Теперь мы наш другой сервис (example.com) не выдерживает такую нагрузку. Необходимо уменьшить её. |
| 40 | +Вариант решения с помощью семафоров: |
| 41 | +```python |
| 42 | +async def fetch_example(session: aiohttp.ClientSession, semaphore: asyncio.Semaphore): |
| 43 | + async with semaphore: |
| 44 | + async with session.get('https://www.example.com') as response: |
| 45 | + return response.status |
| 46 | + |
| 47 | + |
| 48 | +async def read_example_async(requests_count: int) -> list[int]: |
| 49 | + semaphore = asyncio.Semaphore(10) |
| 50 | + |
| 51 | + async with aiohttp.ClientSession() as session: |
| 52 | + tasks = [fetch_example(session, semaphore) for _ in range(requests_count)] |
| 53 | + responses = await asyncio.gather(*tasks) |
| 54 | + |
| 55 | + return responses |
| 56 | +``` |
| 57 | +Вопросы: |
| 58 | +1. Чем отличается semaphore от bounded semaphore? |
| 59 | +2. Что если запросы могут длиться слишком долго и мы бы хотели добавить таймауты. Как это реализовать? |
| 60 | + |
| 61 | +Некоторые запросы выполняются слишком долго и нам нужно возвращать те, которые уже получены как можно раньше. Как этого добиться? |
| 62 | +Вариант решения: |
| 63 | +```python |
| 64 | +async def fetch_example(session: aiohttp.ClientSession, semaphore: asyncio.Semaphore) -> int: |
| 65 | + async with semaphore: |
| 66 | + async with session.get('https://www.example.com') as response: |
| 67 | + return response.status |
| 68 | + |
| 69 | + |
| 70 | +async def read_example_async(requests_count: int): |
| 71 | + semaphore = asyncio.Semaphore(10) |
| 72 | + |
| 73 | + async with aiohttp.ClientSession() as session: |
| 74 | + tasks = [fetch_example(session, semaphore) for _ in range(requests_count)] |
| 75 | + |
| 76 | + for coroutine in asyncio.as_completed(tasks): |
| 77 | + response = await coroutine |
| 78 | + some_business_logic(response) |
| 79 | +``` |
| 80 | +Вопросы: |
| 81 | +1. Какие у as_completed есть минусы по сравнению с gather? |
| 82 | + |
| 83 | +* #### hashlib |
| 84 | +Мы узнали что в базе хранятся незахешированные пароли. И нам нужно срочно захешировать их все. |
| 85 | +Пользователей в базе очень много. Даже с нашими мощностями это будет крайне долгая операция. |
| 86 | +Как нам ускорить следующий код? |
| 87 | +Проблема в том, что hashlib не асинхронный. |
| 88 | + |
| 89 | +Вариант решения: |
| 90 | +```python |
| 91 | +from concurrent.futures.thread import ThreadPoolExecutor |
| 92 | + |
| 93 | +async def hash_all_passwords() -> list[str]: |
| 94 | + loop = asyncio.get_running_loop() |
| 95 | + tasks = [] |
| 96 | + |
| 97 | + with ThreadPoolExecutor(max_workers=100) as pool: |
| 98 | + for password in passwords: |
| 99 | + tasks.append(loop.run_in_executor(pool, functools.partial(get_hash, password))) |
| 100 | + |
| 101 | + hashed_passwords = await asyncio.gather(*tasks) |
| 102 | + return hashed_passwords |
| 103 | +``` |
| 104 | +Решение заключается в том, что библиотеки которые написаны на ЯП в которых нет GIL позволяют нам использовать их в тредах. |
| 105 | +Кандитат может предложить решение с использованием мультипроцессинга, это тоже валидно. |
| 106 | + |
| 107 | +Вопросы: |
| 108 | +1. Как выбрать оптимальное количество воркеров? |
| 109 | +2. Что в этом случае будет эффективней мультипроцессинг или мультитрединг? |
| 110 | + |
| 111 | + |
| 112 | +* #### sync_primitives |
| 113 | +Найти проблему в коде и решить её. |
| 114 | +У нас есть параллельная задача, которой нужна копия данных сайта. Сначала она проверит есть ли она в кэше, если есть, |
| 115 | +то получит её из кэша, а если нет, то сделает запрос на API. |
| 116 | +Проблема в том, что чтения данных занимает некоторое время для возврата и обновления кэша, при одновременном выполнении |
| 117 | +нескольких параллельных задач все они предполагают, что этих данных в кэше нет, и делают запросы на API. |
| 118 | +В нашем примере обе таски сделают запрос на API, хотя мы бы хотели, чтобы запрос сделал лишь первый, а второй получил данные из кеша. |
| 119 | + |
| 120 | +Если у кандидата возникнут сложности, то он может попробовать запустить код. |
| 121 | +```python |
| 122 | +async def get_value(key: str, lock: asyncio.Lock): |
| 123 | + async with lock: |
| 124 | + if key not in cache: |
| 125 | + print(f"The value of key {key} is not in cache") |
| 126 | + value = await request_remote() |
| 127 | + cache[key] = value |
| 128 | + else: |
| 129 | + print(f"The value of key {key} is already in cache") |
| 130 | + value = cache[key] |
| 131 | + print(f"The value of {key} is {value}") |
| 132 | + return value |
| 133 | + |
| 134 | + |
| 135 | +async def main(): |
| 136 | + lock = asyncio.Lock() |
| 137 | + task_one = asyncio.create_task(get_value("status", lock)) |
| 138 | + task_two = asyncio.create_task(get_value("status", lock)) |
| 139 | + |
| 140 | + await asyncio.gather(task_one, task_two) |
| 141 | +``` |
| 142 | +Вопросы: |
| 143 | +1. ... |
| 144 | + |
| 145 | + |
| 146 | +## todo: |
| 147 | +- Можно накинуть задачку на Condition/Event/BoundedSemaphore. |
| 148 | +- В 3.11 появился Barrier, можно придумать задачу на него. |
0 commit comments