Обновление вызова функции в Python Coroutine

У меня есть простой класс для получения URL-адресов при периодическом обновлении токена доступа с использованием нового цикла в отдельном потоке:

import asyncio
import aiohttp
from threading import Thread
from random import randint

LOOP = asyncio.get_event_loop()

NEW_LOOP = asyncio.new_event_loop()
t = Thread(target=NEW_LOOP.run_forever)
t.start()

class SOME_CLASS(object):
    def __init__(self, loop=LOOP, new_loop=NEW_LOOP):
        self.token = None
        self.loop = loop
        self.new_loop = new_loop
        self.semaphore = asyncio.Semaphore(3)
        self.session = aiohttp.ClientSession(loop=self.loop)

        asyncio.run_coroutine_threadsafe(self.get_token(), self.new_loop)

    def _get_headers(self):
        headers = {
                    'x-access-token': str(self.token),
                    'content-type': 'application/json',
                  }

        return headers

    async def get_token(self):
        while True:
            self.token = randint(1, 100)
            await asyncio.sleep(2)        

    async def fetch(self, url):
        async with self.semaphore:
            headers = self._get_headers()
            async with self.session.get(url, headers=headers) as response:
                await response.read()
                return headers

    async def fetch_all(self):
        urls = ['https://httpbin.org/get?x = {i}'for i in range(1000)]
        futures = asyncio.as_completed([self.fetch(url) for url in urls])
        for future in futures:
            await asyncio.sleep(1)
            headers = await future
            print(f'headers: {headers}  token: {self.token}')

    def run(self):
        self.loop.run_until_complete(self.fetch_all())

if __name__ == '__main__':
    sc = SOME_CLASS()
    sc.run()

Однако я заметил, что, хотя self.token действительно обновляется каждые две секунды, токен, хранящийся в headers, остается неизменным. Кажется, что self._get_headers() вызывается заранее, а не после того, как он получает семафор:

headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 8
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 8
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 78
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 78
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 41
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 56
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 56
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 74
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 74
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 4
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 4
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 10
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 10
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 44
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 44
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 28
headers: {'x-access-token': '8', 'content-type': 'application/json'}  token: 28

Как мне убедиться, что токен действительно обновляется в headers прямо перед отправкой HTTP-запроса?

Не понимаю, зачем второй цикл. Я думаю, вам следует полностью его опустить и заменить asyncio.run_coroutine_threadsafe(self.get_token()) на простой loop.create_task(self.get_token()). Один из главных преимущества asyncio - это отсутствие необходимости создавать потоки (и иметь дело с их весом для ОС и различными проблемами синхронизации) для подобных вещей.

user4815162342 27.05.2018 08:21

Кажется, вы хотите, чтобы локальная переменная headers автоматически отражала обновленный токен - почему? Вещи выглядят так, как будто они работают отлично, так как они есть: 1) headers извлекаются перед выборкой с использованием текущего токена, 2) токен обновляется каждые две секунды, влияя на новые загрузки. Поскольку вы возвращаете headers, вы получаете «старое» значение (фактически значение, используемое для этой загрузки, что мне кажется правильным) - если вы хотите вернуть новое текущее значение, вы можете вместо этого вернуть self._get_headers().

user4815162342 27.05.2018 08:45

На самом деле, я хочу, чтобы токен обновлялся каждые 2 секунды и чтобы этот новый токен использовался и отправлялся в HTTP-запросе с небольшой задержкой или без нее. Прямо сейчас я не вижу, чтобы токен обновлялся в заголовке. Цель не столько в том, чтобы вернуть заголовок, сколько в том, чтобы убедиться, что обновленный токен используется в HTTP-запросе. Возврат заголовка позволяет мне распечатать то, что отправлено в HTTP-запросе.

slaw 27.05.2018 09:21
Прямо сейчас я не вижу, чтобы токен обновлялся в заголовке. Откуда ты знаешь? Вы печатаете значение header только после загрузки, когда оно совершенно неактуально (потому что следующая загрузка снова вызовет _get_headers). Вы должны распечатать header прямо перед вызовом session.get - если вы тогда видите устаревшее значение, у вас есть причина для тревоги.
user4815162342 27.05.2018 09:43

Например, посмотрите на этот код, который похож на ваш, за исключением того, что материал aiohttp заменен на сон. Вы можете запустить его и увидеть, что изменение токена вступает в силу немедленно. Также я бы посоветовал избавиться от ненужных потоков, как показано здесь.

user4815162342 27.05.2018 14:57

Спасибо! Я был неправ, и это помогло

slaw 28.05.2018 02:15

Что касается цикла событий в отдельном потоке, возможно, мое предложение из одного из ваших предыдущих вопросов было неверным. Вы должны никогда запускать два цикла событий параллельно, это только вносит путаницу. Если у вас уже есть глобальный цикл событий, работающий в фоновом режиме, вы всегда можете использовать его, отправляя ему данные с помощью asyncio.run_coroutine_threadsafe из других потоков. См. Пример эта модификация вашего кода.

user4815162342 28.05.2018 08:22

Хороший совет. Я удалил другой поток и добавил get_token обратно в основной поток через create_task. Раньше я не понимал, что create_task будет выполняться немедленно, и думал, что для этого по-прежнему требуется вызов run_until_complete (что предотвратило бы возврат цикла событий до его завершения). Однако теперь я понимаю, что create_task будет выполняться и немедленно возвращаться, пока вы ждете выполнения задачи. Затем я могу запускать другие задачи блокировки петель с run_until_complete. Ваши комментарии были очень полезными, и я благодарю вас за время и терпение! Я многому научился!

slaw 28.05.2018 22:39
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
8
114
2

Ответы 2

asyncio.sleep(2) должен установить высокий уровень 0,2, поместить отладочную печать в выборку и посмотреть, как она обновляется.

Нет, это не помогает. На выходе значение x-access-token должно соответствовать значению token. Обратите внимание, что тогда значение x-access-token никогда не изменится даже по истечении нескольких минут прошедшего времени. Проблема в том, что self._get_headers не вызывается при получении семафора. Вместо этого кажется, что он вызывается один раз, когда возвращается будущее.

slaw 27.05.2018 03:48

может я не понимаю. Внутри fetch запрос на получение использует правильный токен, теперь проходит время, вы получаете ответ, пока токен меняется, вы ждете fetch_all, еще время проходит, токен все еще изменяется repl.it/repls/UnfitAptGuiltware

tiennes 27.05.2018 04:09

Сон для токена должен быть дольше, чем выборка (в моем реальном случае сон составляет 5 минут, а выборка возвращает результаты в микросекундах). x-access-token слишком долго обновляет свое значение, иногда даже более 10 секунд, в то время как self.token обновляется каждые 2 секунды. Не должно быть 10+ секундной задержки. Если есть, как мне обойти это, не меняя время сна?

slaw 27.05.2018 04:20

У вас есть сложная оболочка вокруг базовой проблемы Python. Переменная self.token - это простое целое число. Значение, хранящееся в словаре headers, представляет собой строковое представление этого простого целого числа. Когда вы переназначаете self.token на другое целое число, программа не может узнать, какие еще объекты вы хотели бы изменить одновременно.

Одно из решений - сделать self.token объектом с внутренним состоянием. На этот объект можно ссылаться в разных местах вашей программы. Когда вы меняете его внутреннее состояние, это, конечно же, немедленно вступает в силу. Вот небольшая программа, иллюстрирующая это, с class Token. Надеюсь, вы сможете адаптировать его к своей более сложной ситуации. Не думаю, что использование сопрограмм влияет на это. Я думаю, что если вы присвоите значение x-access-token параметру self.token, который теперь является экземпляром Token, вы почти всегда сможете найти решение.

import random

# This way doesn't work
class MyClass:
    def __init__(self):
        self.token = random.randint(1, 100)

    def change_token(self):
        self.token = random.randint(1, 100)

c = MyClass()
a = c.token
for _ in range(10):
    c.change_token()
    print(a, c.token)  # a does not change when c.token does

# This works
class Token:
    def __init__(self, n):
        self.token = n

    def __repr__(self):
        return str(self.token)

    def set_token(self, n):
        self.token = n

class MyClass2:
    def __init__(self):
        self.token = Token(random.randint(1, 100))

    def change_token(self):
        self.token.set_token(random.randint(1, 100))

print()
c = MyClass2()
a = c.token
for _ in range(10):
    c.change_token()
    print(a, c.token)  # a and c.token are the same object

Другие вопросы по теме