У меня есть простой класс для получения URL-адресов при периодическом обновлении токена доступа с использованием нового цикла в отдельном потоке:
import asyncio
import aiohttp
from threading import Thread
from random import randint
LOOP = asyncio.get_event_loop()
NEW_LOOP = asyncio.new_event_loop()
t = Thread(target=NEW_LOOP.run_forever)
t.start()
class SOME_CLASS(object):
def __init__(self, loop=LOOP, new_loop=NEW_LOOP):
self.token = None
self.loop = loop
self.new_loop = new_loop
self.semaphore = asyncio.Semaphore(3)
self.session = aiohttp.ClientSession(loop=self.loop)
asyncio.run_coroutine_threadsafe(self.get_token(), self.new_loop)
def _get_headers(self):
headers = {
'x-access-token': str(self.token),
'content-type': 'application/json',
}
return headers
async def get_token(self):
while True:
self.token = randint(1, 100)
await asyncio.sleep(2)
async def fetch(self, url):
async with self.semaphore:
headers = self._get_headers()
async with self.session.get(url, headers=headers) as response:
await response.read()
return headers
async def fetch_all(self):
urls = ['https://httpbin.org/get?x = {i}'for i in range(1000)]
futures = asyncio.as_completed([self.fetch(url) for url in urls])
for future in futures:
await asyncio.sleep(1)
headers = await future
print(f'headers: {headers} token: {self.token}')
def run(self):
self.loop.run_until_complete(self.fetch_all())
if __name__ == '__main__':
sc = SOME_CLASS()
sc.run()
Однако я заметил, что, хотя self.token действительно обновляется каждые две секунды, токен, хранящийся в headers, остается неизменным. Кажется, что self._get_headers() вызывается заранее, а не после того, как он получает семафор:
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 8
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 8
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 78
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 78
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 41
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 56
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 56
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 74
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 74
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 4
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 4
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 10
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 10
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 44
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 44
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 28
headers: {'x-access-token': '8', 'content-type': 'application/json'} token: 28
Как мне убедиться, что токен действительно обновляется в headers прямо перед отправкой HTTP-запроса?
Кажется, вы хотите, чтобы локальная переменная headers автоматически отражала обновленный токен - почему? Вещи выглядят так, как будто они работают отлично, так как они есть: 1) headers извлекаются перед выборкой с использованием текущего токена, 2) токен обновляется каждые две секунды, влияя на новые загрузки. Поскольку вы возвращаете headers, вы получаете «старое» значение (фактически значение, используемое для этой загрузки, что мне кажется правильным) - если вы хотите вернуть новое текущее значение, вы можете вместо этого вернуть self._get_headers().
На самом деле, я хочу, чтобы токен обновлялся каждые 2 секунды и чтобы этот новый токен использовался и отправлялся в HTTP-запросе с небольшой задержкой или без нее. Прямо сейчас я не вижу, чтобы токен обновлялся в заголовке. Цель не столько в том, чтобы вернуть заголовок, сколько в том, чтобы убедиться, что обновленный токен используется в HTTP-запросе. Возврат заголовка позволяет мне распечатать то, что отправлено в HTTP-запросе.
header только после загрузки, когда оно совершенно неактуально (потому что следующая загрузка снова вызовет _get_headers). Вы должны распечатать header прямо перед вызовом session.get - если вы тогда видите устаревшее значение, у вас есть причина для тревоги.
Например, посмотрите на этот код, который похож на ваш, за исключением того, что материал aiohttp заменен на сон. Вы можете запустить его и увидеть, что изменение токена вступает в силу немедленно. Также я бы посоветовал избавиться от ненужных потоков, как показано здесь.
Спасибо! Я был неправ, и это помогло
Что касается цикла событий в отдельном потоке, возможно, мое предложение из одного из ваших предыдущих вопросов было неверным. Вы должны никогда запускать два цикла событий параллельно, это только вносит путаницу. Если у вас уже есть глобальный цикл событий, работающий в фоновом режиме, вы всегда можете использовать его, отправляя ему данные с помощью asyncio.run_coroutine_threadsafe из других потоков. См. Пример эта модификация вашего кода.
Хороший совет. Я удалил другой поток и добавил get_token обратно в основной поток через create_task. Раньше я не понимал, что create_task будет выполняться немедленно, и думал, что для этого по-прежнему требуется вызов run_until_complete (что предотвратило бы возврат цикла событий до его завершения). Однако теперь я понимаю, что create_task будет выполняться и немедленно возвращаться, пока вы ждете выполнения задачи. Затем я могу запускать другие задачи блокировки петель с run_until_complete. Ваши комментарии были очень полезными, и я благодарю вас за время и терпение! Я многому научился!






asyncio.sleep(2) должен установить высокий уровень 0,2, поместить отладочную печать в выборку и посмотреть, как она обновляется.
Нет, это не помогает. На выходе значение x-access-token должно соответствовать значению token. Обратите внимание, что тогда значение x-access-token никогда не изменится даже по истечении нескольких минут прошедшего времени. Проблема в том, что self._get_headers не вызывается при получении семафора. Вместо этого кажется, что он вызывается один раз, когда возвращается будущее.
может я не понимаю. Внутри fetch запрос на получение использует правильный токен, теперь проходит время, вы получаете ответ, пока токен меняется, вы ждете fetch_all, еще время проходит, токен все еще изменяется repl.it/repls/UnfitAptGuiltware
Сон для токена должен быть дольше, чем выборка (в моем реальном случае сон составляет 5 минут, а выборка возвращает результаты в микросекундах). x-access-token слишком долго обновляет свое значение, иногда даже более 10 секунд, в то время как self.token обновляется каждые 2 секунды. Не должно быть 10+ секундной задержки. Если есть, как мне обойти это, не меняя время сна?
У вас есть сложная оболочка вокруг базовой проблемы Python. Переменная self.token - это простое целое число. Значение, хранящееся в словаре headers, представляет собой строковое представление этого простого целого числа. Когда вы переназначаете self.token на другое целое число, программа не может узнать, какие еще объекты вы хотели бы изменить одновременно.
Одно из решений - сделать self.token объектом с внутренним состоянием. На этот объект можно ссылаться в разных местах вашей программы. Когда вы меняете его внутреннее состояние, это, конечно же, немедленно вступает в силу. Вот небольшая программа, иллюстрирующая это, с class Token. Надеюсь, вы сможете адаптировать его к своей более сложной ситуации. Не думаю, что использование сопрограмм влияет на это. Я думаю, что если вы присвоите значение x-access-token параметру self.token, который теперь является экземпляром Token, вы почти всегда сможете найти решение.
import random
# This way doesn't work
class MyClass:
def __init__(self):
self.token = random.randint(1, 100)
def change_token(self):
self.token = random.randint(1, 100)
c = MyClass()
a = c.token
for _ in range(10):
c.change_token()
print(a, c.token) # a does not change when c.token does
# This works
class Token:
def __init__(self, n):
self.token = n
def __repr__(self):
return str(self.token)
def set_token(self, n):
self.token = n
class MyClass2:
def __init__(self):
self.token = Token(random.randint(1, 100))
def change_token(self):
self.token.set_token(random.randint(1, 100))
print()
c = MyClass2()
a = c.token
for _ in range(10):
c.change_token()
print(a, c.token) # a and c.token are the same object
Не понимаю, зачем второй цикл. Я думаю, вам следует полностью его опустить и заменить
asyncio.run_coroutine_threadsafe(self.get_token())на простойloop.create_task(self.get_token()). Один из главных преимущества asyncio - это отсутствие необходимости создавать потоки (и иметь дело с их весом для ОС и различными проблемами синхронизации) для подобных вещей.