Я пытаюсь сделать простой поисковый робот, используя trio
и asks
. Я использую питомник для одновременного запуска нескольких сканеров и канал памяти для ведения списка URL-адресов для посещения.
Каждый поисковый робот получает клоны обоих концов этого канала, поэтому они могут получить URL-адрес (через Receive_channel), прочитать его, найти и добавить новые URL-адреса для посещения (через send_channel).
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
async def crawler(send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
В этом сценарии потребители являются производителями. Как изящно все остановить?
Условия отключения всего:
Я пробовал с async with send_channel
внутри crawler()
, но не смог найти хороший способ сделать это. Я также пытался найти какой-то другой подход (какой-то рабочий пул, привязанный к каналу памяти и т. д.), но и здесь не повезло.
спасибо, обязательно прочитаю. Но я знаю, как в целом работает веб-скрапинг, я просто хотел сделать очень правильный и элегантный с помощью трио.
Здесь есть как минимум две проблемы.
Во-первых, это ваше предположение об остановке, когда канал пуст. Поскольку вы выделяете канал памяти размером 0, он всегда будет пустым. Вы можете передать URL-адрес только в том случае, если сканер готов его получить.
Это создает проблему номер два. Если вы когда-нибудь найдете больше URL-адресов, чем вы выделили сканерам, ваше приложение заблокируется.
Причина в том, что, поскольку вы не сможете передать все найденные URL-адреса сканеру, сканер никогда не будет готов получить новый URL-адрес для сканирования, потому что он застрял в ожидании, когда другой сканер возьмет один из его URL-адресов.
Это становится еще хуже, потому что, если один из других поисковых роботов найдет новые URL-адреса, они тоже застрянут позади поискового робота, который уже ожидает передачи своих URL-адресов, и они никогда не смогут взять один из URL-адресов, ожидающих обработки. обработанный.
Соответствующая часть документации:
https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels
Предполагая, что мы исправим это, что делать дальше?
Вероятно, вам нужно вести список (набор?) всех посещенных URL-адресов, чтобы убедиться, что вы не посещаете их снова.
Чтобы действительно выяснить, когда остановиться, вместо того, чтобы закрывать каналы, вероятно, намного проще просто отменить питомник.
Допустим, мы модифицируем основной цикл следующим образом:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
active_workers = trio.CapacityLimiter(3) # Number of workers
async with trio.open_nursery() as nursery:
async with send_channel, receive_channel:
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
while True:
await trio.sleep(1) # Give the workers a chance to start up.
if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
nursery.cancel_scope.cancel() # All done!
Теперь нам нужно немного модифицировать краулеры, чтобы забирать токен, когда он активен.
async def crawler(active_workers, send_channel, receive_channel):
async for url in receive_channel: # I'm a consumer!
with active_workers:
content = await ...
urls_found = ...
for u in urls_found:
await send_channel.send(u) # I'm a producer too!
Другие вещи, чтобы рассмотреть -
Вы можете использовать send_channel.send_noblock(u)
в сканере. Поскольку у вас есть неограниченный буфер, вероятность возникновения исключения WillBlock исключена, и поведение, при котором триггер контрольной точки не срабатывает при каждой отправке, может оказаться желательным. Таким образом, вы точно знаете, что конкретный URL-адрес полностью обработан и все новые URL-адреса были добавлены, прежде чем другие задачи получат возможность получить новый URL-адрес, или родительская задача получит возможность проверить, выполнена ли работа.
Извините, это я по памяти написал. Изменено на math.inf. Но помимо неправильного размера канала, как изящно закрыть все, когда больше нечего делать? Или я просто пытаюсь использовать неправильный шаблон здесь?
@PawełLis Я добавил возможное решение. Вероятно, есть много способов реализовать что-то подобное.
Это решение, которое я придумал, когда пытался реорганизовать проблему:
async def main():
send_channel, receive_channel = trio.open_memory_channel(math.inf)
limit = trio.CapacityLimiter(3)
async with send_channel:
await send_channel.send(('https://start-url', send_channel.clone()))
#HERE1
async with trio.open_nursery() as nursery:
async for url, send_channel in receive_channel: #HERE3
nursery.start(consumer, url, send_channel, limit)
async def crawler(url, send_channel, limit, task_status):
async with limit, send_channel:
content = await ...
links = ...
for link in links:
await send_channel.send((link, send_channel.clone()))
#HERE2
(я пропускал посещенные URL-адреса)
Здесь нет 3-х долгоживущих потребителей, но есть не более 3-х потребителей, когда для них достаточно работы.
В #HERE1 send_channel закрыт (поскольку он использовался в качестве диспетчера контекста), единственное, что поддерживает канал, — это его клон внутри этого канала.
В #HERE2 клон тоже закрыт (потому что менеджер контекста). Если канал пуст, то этот клон был последним, что поддерживало существование канала. Канал умирает, для концов цикла (#HERE3).
ЕСЛИ не были найдены URL-адреса, и в этом случае они были добавлены в канал вместе с дополнительными клонами send_channel, которые будут поддерживать работу канала достаточно долго для обработки этих URL-адресов.
И это, и решения Андерса Э. Андерсена кажутся мне хакерскими: одно использует sleep
и statistics()
, другое создает клоны send_channel и помещает их в канал... мне кажется, что это программная реализация бутылки Клейна. Я, вероятно, буду искать какие-то другие подходы.
Хм, в #HERE1
канал отправки закрыт, поэтому цикл приема пропускает закрытый канал?
канал действительно закрыт, когда все send_channels закрыты. В #HERE1 исходный send_channel закрывается, но на одну строку выше #HERE1 мы создаем клон, который не будет закрыт.
Глядя на это прямо сейчас, мне, вероятно, не нужно закрывать оригинал и создавать клон (две строки выше #HERE1), я мог бы просто поместить оригинал в канал и позволить первому сканеру закрыть его. #ЗДЕСЬ2
Здесь есть хорошая статья о веб-сканировании — scrapingbee.com/blog/crawling-python