Почему мой код истекает с помощью ThreadPoolExecutor, но не с обычными потоками

Пытаюсь решить https://leetcode.com/problems/web-crawler-multithreaded/

Этот код работает (ну, по крайней мере, для игрушечных тестовых случаев, прежде чем в конечном итоге использовать TLE)

from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
    def __init__(self):
        self.visited = set()
        self.frontier = queue.Queue()
        self.visitLock = Lock()
        
    def threadCrawler(self, htmlParser):
        while True:
            nextUrl = self.frontier.get()
            urls = htmlParser.getUrls(nextUrl)
            with self.visitLock:
                self.visited.add(nextUrl)
            host = urlparse(nextUrl).hostname
            urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
            with self.visitLock:
                urls = list(filter(lambda x: x not in self.visited, urls))
            for url in urls:
                self.frontier.put(url)
            self.frontier.task_done()

    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 10
        for i in range(n):
            Thread(target=self.threadCrawler, args=(htmlParser,), daemon=True).start()
        self.frontier.join()
        return self.visited

Но этот код, использующий ThreadPoolExecutor, не работает — в игрушечных примерах время ожидания истекает даже с одним потоком.

from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
    def __init__(self):
        self.visited = set()
        self.frontier = queue.Queue()
        self.visitLock = Lock()
        
    def threadCrawler(self, htmlParser):
        while True:
            nextUrl = self.frontier.get()
            urls = htmlParser.getUrls(nextUrl)
            with self.visitLock:
                self.visited.add(nextUrl)
            host = urlparse(nextUrl).hostname
            urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
            with self.visitLock:
                urls = list(filter(lambda x: x not in self.visited, urls))
            for url in urls:
                self.frontier.put(url)
            self.frontier.task_done()

    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 1
        executor = ThreadPoolExecutor(max_workers=n)
        for i in range(n):
            executor.submit(self.threadCrawler,htmlParser, daemon=True)
        self.frontier.join()
        return self.visited

Даже когда я удаляю параметр демона, сохраняю фьючерсы и проверяю их результат, все равно получается TLE.

 def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        self.frontier.put(startUrl)
        n = 1
        executor = ThreadPoolExecutor(max_workers=n)
        futures = []
        for i in range(n):
            futures.append(executor.submit(self.threadCrawler,htmlParser))
        self.frontier.join()
        for i in range(n):
            print(futures[i].result())
        return self.visited

Я не смог найти в документации никаких намеков на то, что ThreadPoolExecutor.submit поддерживает параметр daemon. Вызов должен завершиться ошибкой, которая подавляется, поскольку вы не сохраняете и не проверяете объект Future, возвращаемый submit.

Michael Butscher 29.06.2024 21:29

Я попытался удалить параметр демона и сохранить/проверить будущий результат (отредактированный вопрос), но TLE остается

user1114 30.06.2024 20:02
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
68
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы не используете версию вашего кода concurrent.futures с правильным шаблоном: вы запускаете серверную функцию «выполняться вечно», которая берет целевые задачи из очереди, как и должна быть какая-то неуправляемая функция потоковой обработки.

Функции, подобные будущему, должны иметь ограниченное время жизни и возвращать только свой результат, а не шаблон while True:\n target = queue.get(). (Это на самом деле происходит, но очередь, вызываемые параметры и выборка и отправка параметров являются внутренним кодом исполнителя, поэтому не нужно беспокоиться).

Поскольку ваш Future не предназначен для того, чтобы когда-либо возвращаться, вполне естественно, что время его ожидания истекает!

Теперь идем дальше: поскольку рабочая функция отвечает за поиск новых целей задач (сканируемых URL-адресов), возможно, для этого примера более естественно просто использовать threading.Thread и управлять очередью в своем собственном коде: потому что concurrent.Futures не спроектирован таким образом, чтобы одна задача могла напрямую публиковать другие задачи.

В этой модели задача должна запускаться один раз, получать результаты и возвращать их обратно, а основной поток — это та часть, которая должна создавать новые задачи для полученных URL-адресов, если это необходимо. Это можно сделать с помощью некоторого рефакторинга -

Вот переработанный пример с использованием шаблона concurrent.futures. Я попытался немного упростить код, извлекая только один элемент из as_completed за раз. (но я не могу это проверить, так как у меня нет вашего класса HtmlParser, который также выполняет фактический ввод-вывод или образец сайта, чтобы его попробовать)


from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

class Solution:
    def __init__(self):
        self.visited = set()
        
    def threadCrawler(self, htmlParser, next_url):
        urls = htmlParser.getUrls(next_url)
        host = urlparse(nextUrl).hostname
        return set(filter(lambda x: urlparse(x).hostname == host, urls))

    def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
        #self.frontier.put(startUrl)
        n = 1 # or whatever number
        
        visited = {startUrl}
        with ThreadPoolExecutor(max_workers=n) as executor:
            tasks = set()
            next_urls = {startUrl}
            while tasks or next_urls:
                for url in next_urls:
                    # create as many new tasks as the new_urls fetched by the last completed task:
                    tasks.add(executor.submit(self.threadCrawler, htmlParser, url))
                
                # fetch a single, completed task
                completed = next(as_completed(tasks))
                tasks.remove(completed)
                urls = completed.result()  # Do this inside a try/except if the task code may raise any  exceptions
                next_urls = urls - visited # get the new_urls fetched by the task
                visited.update(urls)  # update the found URLs 
                
        self.visited = visited 
        return self.visited
            

Итак, обратите внимание, что интересующая изменяемая структура (self.visited) манипулируется только в основном потоке, а передача сообщений в потоки управляется исполнителем, поэтому код пользователя (наш код) не нуждается ни в чем из этого. . В результате в этом примере метод сканера снизился с 11 до 3 LoC (и ваш исходный код даже несколько неправильно использовал блокировки).

Эй, спасибо за подробный ответ. Два вопроса - откуда исполнитель узнает, что результат хранится в переменной urls? Как мой первоначальный ответ неправильно обрабатывает блокировки?

user1114 02.07.2024 03:07

«Как исполнитель узнает, что результат хранится в переменной urls?» - извините, моя вина: там отсутствует утверждение return - сейчас исправлю.

jsbueno 02.07.2024 14:07

Второй взгляд на замки: извините, с ними все в порядке. Просто я бы устроил тогда по-другому. Наверное. Я беспокоился, что блокировка self.visited.add не работает, и, просматривая сейчас, я понял ваше намерение.

jsbueno 02.07.2024 14:11

Другие вопросы по теме