Я работаю над сценарием Python, где мне нужно сделать первоначальный запрос для получения идентификатора. Получив идентификатор, мне нужно сделать несколько дополнительных запросов, чтобы получить данные, связанные с этим идентификатором. Я понимаю, что эти последующие запросы могут выполняться асинхронно для повышения производительности. Однако я не уверен, как это эффективно реализовать.
Вот упрощенная версия моего текущего синхронного подхода:
import requests
# Initial request to get the ID
response = requests.get('https://api.example.com/get_id')
id = response.json()['id']
# Subsequent requests to get data related to the ID
data1 = requests.get(f'https://api.example.com/data/{id}/info1').json()
data2 = requests.get(f'https://api.example.com/data/{id}/info2').json()
data3 = requests.get(f'https://api.example.com/data/{id}/info3').json()
# Processing the data
process_data(data1, data2, data3)
Я хотел бы выполнять запросы к info1, info2 и info3 асинхронно. Как я могу добиться этого, используя asyncio или любую другую библиотеку?
Я изучил httpx, но не знаю, как правильно структурировать код. Любая помощь или пример кода будут очень признательны!
ThreadPool, вероятно, самый простой способ сделать это параллельно:
(поскольку библиотека запросов является добросовестной и освобождает GIL во время ожидания ответа HTTP, они выполняются параллельно)
import requests
# Initial request to get the ID
response = requests.get("https://api.example.com/get_id")
id = response.json()["id"]
# Subsequent requests to get data related to the ID
# do these requests in parallel:
# Option 1: Using threads
from multiprocessing.pool import ThreadPool
def get_data(url: str):
return requests.get(url).json()
urls = [
f"https://api.example.com/data/{id}/info1",
f"https://api.example.com/data/{id}/info2",
f"https://api.example.com/data/{id}/info3",
]
with ThreadPool(3) as pool:
data1, data2, data3 = pool.map(get_data, urls)
def process_data(*args):
pass
# Processing the data
process_data(data1, data2, data3)
3 потока не используют в 3 раза больше системных ресурсов. Потоки легкие, а у Python есть GIL, поэтому вам никогда не придется выполнять в три раза больше работы на земле Python. Единственное исключение из этого правила - это когда вы вызываете некоторый код, который не находится на территории Python, и освобождает GIL - т.е. некоторую библиотеку, которая является оболочкой Python для некоторого кода более низкого уровня - т.е. расширение Python, написанное на C/Rust и т. д. В В этом конкретном примере три вещи могут выполняться параллельно, потому что большая часть времени тратится на ожидание ответа HTTP, который освобождает GIL. Ожидание не требует в 3 раза больше ресурсов.
По моему опыту написания производственного кода, потоки гораздо проще писать (а также поддерживать и рассуждать), чем асинхронные, и они работают так же хорошо. Как только вы напишете одну асинхронную функцию на Python (и на многих других языках, таких как JavaScript и Rust), вам в конечном итоге придется создать кучу других функций, которые вызывают указанные функции, также асинхронные, и асинхронность распространяется по всему вашему приложению. Это может быть приемлемо, или вы можете использовать asyncio.run
, но, по моему опыту, потоки проще и легче и обеспечивают очень сопоставимую производительность. ОП сказал «улучшить производительность» - вот что я сделал.
Как и почти в любой проблеме, существует множество способов добиться схожих результатов, и я предложил только один из них. Люди могут выбирать, хотят ли они пойти по пути асинхронности или потоков, в зависимости от своих требований и ограничений.
Спасибо @Alex L за информацию. Я не знал, что могу использовать Threadpool для создания нескольких потоков вместо создания нескольких процессов.
Ничто не мешает использовать асинхронные системы синхронно, просто нужно await
выполнить любые действия, которые они хотят завершить, до следующего запланированного дела.
Основная трудность заключается в обеспечении асинхронной совместимости приложения, что может оказаться значительно более сложной задачей.
Я реализовал окружающую структуру для задачи, которую вы описали ниже. Если вы хотите реализовать это в долгосрочном проекте, вероятно, будет разумно более широко рассмотреть окружающий код и структуру.
import httpx
import asyncio
async def main():
async with httpx.AsyncClient() as client:
# Initial request to get the ID
response = await client.get('https://api.example.com/get_id')
id = response.json()['id']
print('done with sync task')
# Subsequent requests to get data related to the ID
task1 = asyncio.create_task(client.get(f'https://api.example.com/data/{id}/info1'))
task2 = asyncio.create_task(client.get(f'https://api.example.com/data/{id}/info2'))
task3 = asyncio.create_task(client.get(f'https://api.example.com/data/{id}/info3'))
# Only await the tasks after having scheduled them all. With larger task counts asyncio.gather() should be considered.
data1 = (await task1).json()
data2 = (await task2).json()
data3 = (await task3).json()
print(data1, data2, data3)
asyncio.run(main())
Обратите внимание, что первоначальный вызов API полностью синхронен и может быть выполнен с помощью response = httpx.get('https://api.example.com/get_id')
без необходимости ожидания. Учитывая, что мы уже находимся внутри httpx
Client
, это было бы немного анти-шаблоном.
Я настоятельно рекомендую вам прочитать документацию httpx , они были отличной отправной точкой, когда я впервые познакомился с асинхронным программированием. документация asyncio для справки
Ссылки в вашем коде не работают, Я использовал другой API.
Посмотрите, поможет ли вам это решение.
import asyncio
import aiohttp
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.json()
async def main():
base_url = 'https://swapi.dev/api/'
async with aiohttp.ClientSession() as session:
# Synchronous request for basic information
response = await session.get(base_url + 'people/')
people = (await response.json()).get("results", [])
# Asynchronous requests to personal pages
tasks = [fetch_data(session, person["url"]) for person in people]
data = await asyncio.gather(*tasks)
for person in data:
print(person["name"])
if __name__ == '__main__':
asyncio.run(main())
Аналогично, но с использованием группы задач (python >= 3.11)
import httpx
import asyncio
async def main():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/get_id')
id = response.json()['id']
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(client.get(f'https://api.example.com/data/{id}/info1'))
task2 = tg.create_task(client.get(f'https://api.example.com/data/{id}/info2'))
task3 = tg.create_task(client.get(f'https://api.example.com/data/{id}/info3'))
# Processing the data
process_data(task1.result(), task2.result(), task3.result())
Я не уверен, просил ли ОП делать это параллельно. Насколько я понимаю, они просто хотят использовать асинхронную операцию для запросов. ОП должен убедиться, что если это используется в каком-либо производственном приложении, перед этим следует принять во внимание, поскольку при этом одновременно используются три потока вместо одного. По сути, если это конечная точка, то для завершения одного вызова API потребуется три ресурса.