Как выполнить одиночные синхронные и несколько асинхронных запросов в Python?

Я работаю над сценарием Python, где мне нужно сделать первоначальный запрос для получения идентификатора. Получив идентификатор, мне нужно сделать несколько дополнительных запросов, чтобы получить данные, связанные с этим идентификатором. Я понимаю, что эти последующие запросы могут выполняться асинхронно для повышения производительности. Однако я не уверен, как это эффективно реализовать.

Вот упрощенная версия моего текущего синхронного подхода:

import requests

# Initial request to get the ID
response = requests.get('https://api.example.com/get_id')
id = response.json()['id']

# Subsequent requests to get data related to the ID
data1 = requests.get(f'https://api.example.com/data/{id}/info1').json()
data2 = requests.get(f'https://api.example.com/data/{id}/info2').json()
data3 = requests.get(f'https://api.example.com/data/{id}/info3').json()

# Processing the data
process_data(data1, data2, data3)

Я хотел бы выполнять запросы к info1, info2 и info3 асинхронно. Как я могу добиться этого, используя asyncio или любую другую библиотеку?

Я изучил httpx, но не знаю, как правильно структурировать код. Любая помощь или пример кода будут очень признательны!

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
4
0
103
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

ThreadPool, вероятно, самый простой способ сделать это параллельно:

(поскольку библиотека запросов является добросовестной и освобождает GIL во время ожидания ответа HTTP, они выполняются параллельно)

import requests

# Initial request to get the ID
response = requests.get("https://api.example.com/get_id")
id = response.json()["id"]

# Subsequent requests to get data related to the ID
# do these requests in parallel:

# Option 1: Using threads
from multiprocessing.pool import ThreadPool


def get_data(url: str):
    return requests.get(url).json()


urls = [
    f"https://api.example.com/data/{id}/info1",
    f"https://api.example.com/data/{id}/info2",
    f"https://api.example.com/data/{id}/info3",
]

with ThreadPool(3) as pool:
    data1, data2, data3 = pool.map(get_data, urls)


def process_data(*args):
    pass


# Processing the data
process_data(data1, data2, data3)

Я не уверен, просил ли ОП делать это параллельно. Насколько я понимаю, они просто хотят использовать асинхронную операцию для запросов. ОП должен убедиться, что если это используется в каком-либо производственном приложении, перед этим следует принять во внимание, поскольку при этом одновременно используются три потока вместо одного. По сути, если это конечная точка, то для завершения одного вызова API потребуется три ресурса.

Sina Sarshar Pour 22.06.2024 00:46

3 потока не используют в 3 раза больше системных ресурсов. Потоки легкие, а у Python есть GIL, поэтому вам никогда не придется выполнять в три раза больше работы на земле Python. Единственное исключение из этого правила - это когда вы вызываете некоторый код, который не находится на территории Python, и освобождает GIL - т.е. некоторую библиотеку, которая является оболочкой Python для некоторого кода более низкого уровня - т.е. расширение Python, написанное на C/Rust и т. д. В В этом конкретном примере три вещи могут выполняться параллельно, потому что большая часть времени тратится на ожидание ответа HTTP, который освобождает GIL. Ожидание не требует в 3 раза больше ресурсов.

Alex L 24.06.2024 10:17

По моему опыту написания производственного кода, потоки гораздо проще писать (а также поддерживать и рассуждать), чем асинхронные, и они работают так же хорошо. Как только вы напишете одну асинхронную функцию на Python (и на многих других языках, таких как JavaScript и Rust), вам в конечном итоге придется создать кучу других функций, которые вызывают указанные функции, также асинхронные, и асинхронность распространяется по всему вашему приложению. Это может быть приемлемо, или вы можете использовать asyncio.run, но, по моему опыту, потоки проще и легче и обеспечивают очень сопоставимую производительность. ОП сказал «улучшить производительность» - вот что я сделал.

Alex L 24.06.2024 10:22

Как и почти в любой проблеме, существует множество способов добиться схожих результатов, и я предложил только один из них. Люди могут выбирать, хотят ли они пойти по пути асинхронности или потоков, в зависимости от своих требований и ограничений.

Alex L 24.06.2024 10:27

Спасибо @Alex L за информацию. Я не знал, что могу использовать Threadpool для создания нескольких потоков вместо создания нескольких процессов.

Sina Sarshar Pour 25.06.2024 00:31

Ничто не мешает использовать асинхронные системы синхронно, просто нужно await выполнить любые действия, которые они хотят завершить, до следующего запланированного дела.

Основная трудность заключается в обеспечении асинхронной совместимости приложения, что может оказаться значительно более сложной задачей.

Я реализовал окружающую структуру для задачи, которую вы описали ниже. Если вы хотите реализовать это в долгосрочном проекте, вероятно, будет разумно более широко рассмотреть окружающий код и структуру.

import httpx
import asyncio

async def main():
    async with httpx.AsyncClient() as client:
        # Initial request to get the ID
        response = await client.get('https://api.example.com/get_id')  
        id = response.json()['id']
        print('done with sync task')
        
        # Subsequent requests to get data related to the ID
        task1 = asyncio.create_task(client.get(f'https://api.example.com/data/{id}/info1'))
        task2 = asyncio.create_task(client.get(f'https://api.example.com/data/{id}/info2'))
        task3 = asyncio.create_task(client.get(f'https://api.example.com/data/{id}/info3'))
        
        # Only await the tasks after having scheduled them all. With larger task counts asyncio.gather() should be considered.
        data1 = (await task1).json()
        data2 = (await task2).json()
        data3 = (await task3).json()
        print(data1, data2, data3)

asyncio.run(main())

Обратите внимание, что первоначальный вызов API полностью синхронен и может быть выполнен с помощью response = httpx.get('https://api.example.com/get_id') без необходимости ожидания. Учитывая, что мы уже находимся внутри httpxClient, это было бы немного анти-шаблоном.

Я настоятельно рекомендую вам прочитать документацию httpx , они были отличной отправной точкой, когда я впервые познакомился с асинхронным программированием. документация asyncio для справки

Ссылки в вашем коде не работают, Я использовал другой API.

Посмотрите, поможет ли вам это решение.

import asyncio
import aiohttp

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    base_url = 'https://swapi.dev/api/'
    async with aiohttp.ClientSession() as session:
        # Synchronous request for basic information
        response = await session.get(base_url + 'people/')
        people = (await response.json()).get("results", [])

        # Asynchronous requests to personal pages
        tasks = [fetch_data(session, person["url"]) for person in people]
        data = await asyncio.gather(*tasks)

        for person in data:
            print(person["name"])

if __name__ == '__main__':
    asyncio.run(main())

Ответ принят как подходящий

Аналогично, но с использованием группы задач (python >= 3.11)

import httpx
import asyncio

async def main():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/get_id')  
        id = response.json()['id']
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(client.get(f'https://api.example.com/data/{id}/info1'))
            task2 = tg.create_task(client.get(f'https://api.example.com/data/{id}/info2'))
            task3 = tg.create_task(client.get(f'https://api.example.com/data/{id}/info3'))
    # Processing the data
    process_data(task1.result(), task2.result(), task3.result())
        

Другие вопросы по теме