Я использую aiohttp с методом limited_as_completed для ускорения удаления (около 100 миллионов статических страниц веб-сайта). Однако код останавливается через несколько минут и возвращает ошибку TimeoutError. Я пробовал несколько вещей, но все равно не смог предотвратить поднять asyncio.TimeoutError. Могу я спросить, как я могу проигнорировать ошибку и продолжить?
Код, который я использую:
N=123
import html
from lxml import etree
import requests
import asyncio
import aiohttp
from aiohttp import ClientSession, TCPConnector
import pandas as pd
import re
import csv
import time
from itertools import islice
import sys
from contextlib import suppress
start = time.time()
data = {}
data['name'] = []
filename = "C:\\Users\\xxxx"+ str(N) + ".csv"
def limited_as_completed(coros, limit):
futures = [
asyncio.ensure_future(c)
for c in islice(coros, 0, limit)
]
async def first_to_finish():
while True:
await asyncio.sleep(0)
for f in futures:
if f.done():
futures.remove(f)
try:
newf = next(coros)
futures.append(
asyncio.ensure_future(newf))
except StopIteration as e:
pass
return f.result()
while len(futures) > 0:
yield first_to_finish()
async def get_info_byid(i, url, session):
async with session.get(url,timeout=20) as resp:
print(url)
with suppress(asyncio.TimeoutError):
r = await resp.text()
name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
data['name'].append(name)
dataframe = pd.DataFrame(data)
dataframe.to_csv(filename, index=False, sep='|')
limit = 1000
async def print_when_done(tasks):
for res in limited_as_completed(tasks, limit):
await res
url = "http://xxx.{}.html"
loop = asyncio.get_event_loop()
async def main():
connector = TCPConnector(limit=10)
async with ClientSession(connector=connector,headers=headers,raise_for_status=False) as session:
coros = (get_info_byid(i, url.format(i), session) for i in range(N,N+1000000))
await print_when_done(coros)
loop.run_until_complete(main())
loop.close()
print("took", time.time() - start, "seconds.")
Журнал ошибок:
Traceback (most recent call last):
File "C:\Users\xxx.py", line 111, in <module>
loop.run_until_complete(main())
File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 573, in run_until_complete
return future.result()
File "C:\Users\xxx.py", line 109, in main
await print_when_done(coros)
File "C:\Users\xxx.py", line 98, in print_when_done
await res
File "C:\Users\xxx.py", line 60, in first_to_finish
return f.result()
File "C:\Users\xxx.py", line 65, in get_info_byid
async with session.get(url,timeout=20) as resp:
File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client.py", line 855, in __aenter__
self._resp = await self._coro
File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client.py", line 391, in _request
await resp.start(conn)
File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\client_reqrep.py", line 770, in start
self._continue = None
File "C:\Users\xx\AppData\Local\Programs\Python\Python37-32\lib\site-packages\aiohttp\helpers.py", line 673, in __exit__
raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError
я пытался 1) добавить expect asyncio.TimeoutError: pass. Не работает
async def get_info_byid(i, url, session):
async with session.get(url,timeout=20) as resp:
print(url)
try:
r = await resp.text()
name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
data['name'].append(name)
dataframe = pd.DataFrame(data)
dataframe.to_csv(filename, index=False, sep='|')
except asyncio.TimeoutError:
pass
2) подавить (asyncio.TimeoutError), как показано выше. Не работает
Я только вчера изучил aiohttp, так что, может быть, в моем коде есть другие ошибки, которые вызывают ошибку тайм-аута только через несколько минут работы? Большое спасибо, если кто знает, как с этим бороться!
@YuriiKramarenko Большое спасибо! Это сработало для некоторых ошибок, но через некоторое время у asyncio продолжают появляться другие новые ошибки, такие как проблемы с памятью. Мне может потребоваться либо вернуться к запросам, либо использовать пакет для вызова py для небольших диапазонов, чтобы я мог передать память или случайные ошибки
Я думаю, что лучшее решение - это написать простую оболочку для session.get()
, которая позволит вам делать запросы без диспетчера контекста и использовать его со сборкой.
Добавить небольшой пример к ответам
Простой пример (не очень хорошо, но отлично работает):
import asyncio
from aiohttp.client import ClientSession
class Wrapper:
def __init__(self, session):
self._session = session
async def get(self, url):
try:
async with self._session.get(url, timeout=20) as resp:
return await resp.text()
except Exception as e:
print(e)
loop = asyncio.get_event_loop()
wrapper = Wrapper(ClientSession())
responses = loop.run_until_complete(
asyncio.gather(
wrapper.get('http://google.com'),
wrapper.get('http://google.com'),
wrapper.get('http://google.com'),
wrapper.get('http://google.com'),
wrapper.get('http://google.com')
)
)
print(responses)
Еще раз спасибо за вашу помощь. Мне потребовалось некоторое время, чтобы понять, как включить оболочку в свой фреймворк. (Возможно, я ошибаюсь, но) Кажется, что оболочка запрашивает у Python запись в файл csv только после того, как будет выполнена сборка парсинга. Однако время написания здесь не ограничивает. Кажется, что сервер веб-сайта может обрабатывать только ограниченный запрос каждую минуту, и если я разрешаю одновременно более 2 запросов (лимиты> 2), часто будет появляться ошибка 503.
то, что сделал @Yurii Kramarenko, наверняка поднимет Исключение незакрытой клиентской сессии, поскольку сессия никогда не закрывалась должным образом. Я рекомендую вот что:
import asyncio
import aiohttp
async def main(urls):
async with aiohttp.ClientSession(timeout=self.timeout) as session:
tasks=[self.do_something(session,url) for url in urls]
await asyncio.gather(*tasks)
Мне нравится ответ @jbxiaoyu, но для тайм-аута kwarg, похоже, требуется специальный объект, поэтому я подумал, что добавлю, что вам нужно создать объект ClientTimeout, а затем передать его сеансу, например:
from aiohttp import ClientSession, ClientTimeout
timeout = ClientTimeout(total=600)
async with ClientSession(timeout=timeout) as session:
tasks=[self.do_something(session,url) for url in urls]
await asyncio.gather(*tasks)
Попробуйте обернуть
async with session.get(url,timeout=20) as resp:
вtry except
.