Python и Trio, где производители являются потребителями, как изящно выйти, когда работа выполнена?

Я пытаюсь сделать простой поисковый робот, используя trio и asks. Я использую питомник для одновременного запуска нескольких сканеров и канал памяти для ведения списка URL-адресов для посещения.

Каждый поисковый робот получает клоны обоих концов этого канала, поэтому они могут получить URL-адрес (через Receive_channel), прочитать его, найти и добавить новые URL-адреса для посещения (через send_channel).

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())
            nursery.start_soon(crawler, send_channel.clone(), receive_channel.clone())


async def crawler(send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        content = await ...
        urls_found = ...
        for u in urls_found:
            await send_channel.send(u)  # I'm a producer too!

В этом сценарии потребители являются производителями. Как изящно все остановить?

Условия отключения всего:

  • канал пуст
  • И
  • все поисковые роботы застряли в первом цикле for, ожидая появления URL-адреса в Receive_channel (что... больше не произойдет)

Я пробовал с async with send_channel внутри crawler(), но не смог найти хороший способ сделать это. Я также пытался найти какой-то другой подход (какой-то рабочий пул, привязанный к каналу памяти и т. д.), но и здесь не повезло.

Здесь есть хорошая статья о веб-сканировании — scrapingbee.com/blog/crawling-python

Anders E. Andersen 15.12.2020 18:42

спасибо, обязательно прочитаю. Но я знаю, как в целом работает веб-скрапинг, я просто хотел сделать очень правильный и элегантный с помощью трио.

Paweł Lis 15.12.2020 18:51
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
2
490
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Здесь есть как минимум две проблемы.

Во-первых, это ваше предположение об остановке, когда канал пуст. Поскольку вы выделяете канал памяти размером 0, он всегда будет пустым. Вы можете передать URL-адрес только в том случае, если сканер готов его получить.

Это создает проблему номер два. Если вы когда-нибудь найдете больше URL-адресов, чем вы выделили сканерам, ваше приложение заблокируется.

Причина в том, что, поскольку вы не сможете передать все найденные URL-адреса сканеру, сканер никогда не будет готов получить новый URL-адрес для сканирования, потому что он застрял в ожидании, когда другой сканер возьмет один из его URL-адресов.

Это становится еще хуже, потому что, если один из других поисковых роботов найдет новые URL-адреса, они тоже застрянут позади поискового робота, который уже ожидает передачи своих URL-адресов, и они никогда не смогут взять один из URL-адресов, ожидающих обработки. обработанный.

Соответствующая часть документации:

https://trio.readthedocs.io/en/stable/reference-core.html#buffering-in-channels

Предполагая, что мы исправим это, что делать дальше?

Вероятно, вам нужно вести список (набор?) всех посещенных URL-адресов, чтобы убедиться, что вы не посещаете их снова.

Чтобы действительно выяснить, когда остановиться, вместо того, чтобы закрывать каналы, вероятно, намного проще просто отменить питомник.

Допустим, мы модифицируем основной цикл следующим образом:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    active_workers = trio.CapacityLimiter(3) # Number of workers
    async with trio.open_nursery() as nursery:
        async with send_channel, receive_channel:
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            nursery.start_soon(crawler, active_workers, send_channel, receive_channel)
            while True:
                await trio.sleep(1) # Give the workers a chance to start up.
                if active_workers.borrowed_tokens == 0 and send_channel.statistics().current_buffer_used == 0:
                    nursery.cancel_scope.cancel() # All done!

Теперь нам нужно немного модифицировать краулеры, чтобы забирать токен, когда он активен.

async def crawler(active_workers, send_channel, receive_channel):
    async for url in receive_channel:  # I'm a consumer!
        with active_workers:
            content = await ...
            urls_found = ...
            for u in urls_found:
                await send_channel.send(u)  # I'm a producer too!

Другие вещи, чтобы рассмотреть -

Вы можете использовать send_channel.send_noblock(u) в сканере. Поскольку у вас есть неограниченный буфер, вероятность возникновения исключения WillBlock исключена, и поведение, при котором триггер контрольной точки не срабатывает при каждой отправке, может оказаться желательным. Таким образом, вы точно знаете, что конкретный URL-адрес полностью обработан и все новые URL-адреса были добавлены, прежде чем другие задачи получат возможность получить новый URL-адрес, или родительская задача получит возможность проверить, выполнена ли работа.

Извините, это я по памяти написал. Изменено на math.inf. Но помимо неправильного размера канала, как изящно закрыть все, когда больше нечего делать? Или я просто пытаюсь использовать неправильный шаблон здесь?

Paweł Lis 15.12.2020 18:56

@PawełLis Я добавил возможное решение. Вероятно, есть много способов реализовать что-то подобное.

Anders E. Andersen 15.12.2020 21:13

Это решение, которое я придумал, когда пытался реорганизовать проблему:

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
 
    limit = trio.CapacityLimiter(3)

    async with send_channel:
        await send_channel.send(('https://start-url', send_channel.clone()))
    #HERE1

    async with trio.open_nursery() as nursery:
        async for url, send_channel in receive_channel:  #HERE3
            nursery.start(consumer, url, send_channel, limit)

async def crawler(url, send_channel, limit, task_status):
    async with limit, send_channel:
        content = await ...
        links = ...
        for link in links:
            await send_channel.send((link, send_channel.clone()))
    #HERE2

(я пропускал посещенные URL-адреса)

Здесь нет 3-х долгоживущих потребителей, но есть не более 3-х потребителей, когда для них достаточно работы.

В #HERE1 send_channel закрыт (поскольку он использовался в качестве диспетчера контекста), единственное, что поддерживает канал, — это его клон внутри этого канала.

В #HERE2 клон тоже закрыт (потому что менеджер контекста). Если канал пуст, то этот клон был последним, что поддерживало существование канала. Канал умирает, для концов цикла (#HERE3).

ЕСЛИ не были найдены URL-адреса, и в этом случае они были добавлены в канал вместе с дополнительными клонами send_channel, которые будут поддерживать работу канала достаточно долго для обработки этих URL-адресов.

И это, и решения Андерса Э. Андерсена кажутся мне хакерскими: одно использует sleep и statistics(), другое создает клоны send_channel и помещает их в канал... мне кажется, что это программная реализация бутылки Клейна. Я, вероятно, буду искать какие-то другие подходы.

Хм, в #HERE1 канал отправки закрыт, поэтому цикл приема пропускает закрытый канал?

Matthias Urlichs 16.12.2020 16:00

канал действительно закрыт, когда все send_channels закрыты. В #HERE1 исходный send_channel закрывается, но на одну строку выше #HERE1 мы создаем клон, который не будет закрыт.

Paweł Lis 16.12.2020 17:01

Глядя на это прямо сейчас, мне, вероятно, не нужно закрывать оригинал и создавать клон (две строки выше #HERE1), я мог бы просто поместить оригинал в канал и позволить первому сканеру закрыть его. #ЗДЕСЬ2

Paweł Lis 16.12.2020 17:03

Другие вопросы по теме