Пытаюсь решить https://leetcode.com/problems/web-crawler-multithreaded/
Этот код работает (ну, по крайней мере, для игрушечных тестовых случаев, прежде чем в конечном итоге использовать TLE)
from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
def __init__(self):
self.visited = set()
self.frontier = queue.Queue()
self.visitLock = Lock()
def threadCrawler(self, htmlParser):
while True:
nextUrl = self.frontier.get()
urls = htmlParser.getUrls(nextUrl)
with self.visitLock:
self.visited.add(nextUrl)
host = urlparse(nextUrl).hostname
urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
with self.visitLock:
urls = list(filter(lambda x: x not in self.visited, urls))
for url in urls:
self.frontier.put(url)
self.frontier.task_done()
def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
self.frontier.put(startUrl)
n = 10
for i in range(n):
Thread(target=self.threadCrawler, args=(htmlParser,), daemon=True).start()
self.frontier.join()
return self.visited
Но этот код, использующий ThreadPoolExecutor, не работает — в игрушечных примерах время ожидания истекает даже с одним потоком.
from collections import deque
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Thread
import queue
import time
class Solution:
def __init__(self):
self.visited = set()
self.frontier = queue.Queue()
self.visitLock = Lock()
def threadCrawler(self, htmlParser):
while True:
nextUrl = self.frontier.get()
urls = htmlParser.getUrls(nextUrl)
with self.visitLock:
self.visited.add(nextUrl)
host = urlparse(nextUrl).hostname
urls = list(filter(lambda x: urlparse(x).hostname == host, urls))
with self.visitLock:
urls = list(filter(lambda x: x not in self.visited, urls))
for url in urls:
self.frontier.put(url)
self.frontier.task_done()
def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
self.frontier.put(startUrl)
n = 1
executor = ThreadPoolExecutor(max_workers=n)
for i in range(n):
executor.submit(self.threadCrawler,htmlParser, daemon=True)
self.frontier.join()
return self.visited
Даже когда я удаляю параметр демона, сохраняю фьючерсы и проверяю их результат, все равно получается TLE.
def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
self.frontier.put(startUrl)
n = 1
executor = ThreadPoolExecutor(max_workers=n)
futures = []
for i in range(n):
futures.append(executor.submit(self.threadCrawler,htmlParser))
self.frontier.join()
for i in range(n):
print(futures[i].result())
return self.visited
Я попытался удалить параметр демона и сохранить/проверить будущий результат (отредактированный вопрос), но TLE остается
Вы не используете версию вашего кода concurrent.futures с правильным шаблоном: вы запускаете серверную функцию «выполняться вечно», которая берет целевые задачи из очереди, как и должна быть какая-то неуправляемая функция потоковой обработки.
Функции, подобные будущему, должны иметь ограниченное время жизни и возвращать только свой результат, а не шаблон while True:\n target = queue.get()
.
(Это на самом деле происходит, но очередь, вызываемые параметры и выборка и отправка параметров являются внутренним кодом исполнителя, поэтому не нужно беспокоиться).
Поскольку ваш Future
не предназначен для того, чтобы когда-либо возвращаться, вполне естественно, что время его ожидания истекает!
Теперь идем дальше: поскольку рабочая функция отвечает за поиск новых целей задач (сканируемых URL-адресов), возможно, для этого примера более естественно просто использовать threading.Thread и управлять очередью в своем собственном коде: потому что concurrent.Futures
не спроектирован таким образом, чтобы одна задача могла напрямую публиковать другие задачи.
В этой модели задача должна запускаться один раз, получать результаты и возвращать их обратно, а основной поток — это та часть, которая должна создавать новые задачи для полученных URL-адресов, если это необходимо. Это можно сделать с помощью некоторого рефакторинга -
Вот переработанный пример с использованием шаблона concurrent.futures. Я попытался немного упростить код, извлекая только один элемент из as_completed
за раз. (но я не могу это проверить, так как у меня нет вашего класса HtmlParser, который также выполняет фактический ввод-вывод или образец сайта, чтобы его попробовать)
from urllib.parse import urljoin, urlparse
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
class Solution:
def __init__(self):
self.visited = set()
def threadCrawler(self, htmlParser, next_url):
urls = htmlParser.getUrls(next_url)
host = urlparse(nextUrl).hostname
return set(filter(lambda x: urlparse(x).hostname == host, urls))
def crawl(self, startUrl: str, htmlParser: 'HtmlParser') -> List[str]:
#self.frontier.put(startUrl)
n = 1 # or whatever number
visited = {startUrl}
with ThreadPoolExecutor(max_workers=n) as executor:
tasks = set()
next_urls = {startUrl}
while tasks or next_urls:
for url in next_urls:
# create as many new tasks as the new_urls fetched by the last completed task:
tasks.add(executor.submit(self.threadCrawler, htmlParser, url))
# fetch a single, completed task
completed = next(as_completed(tasks))
tasks.remove(completed)
urls = completed.result() # Do this inside a try/except if the task code may raise any exceptions
next_urls = urls - visited # get the new_urls fetched by the task
visited.update(urls) # update the found URLs
self.visited = visited
return self.visited
Итак, обратите внимание, что интересующая изменяемая структура (self.visited) манипулируется только в основном потоке, а передача сообщений в потоки управляется исполнителем, поэтому код пользователя (наш код) не нуждается ни в чем из этого. . В результате в этом примере метод сканера снизился с 11 до 3 LoC (и ваш исходный код даже несколько неправильно использовал блокировки).
Эй, спасибо за подробный ответ. Два вопроса - откуда исполнитель узнает, что результат хранится в переменной urls? Как мой первоначальный ответ неправильно обрабатывает блокировки?
«Как исполнитель узнает, что результат хранится в переменной urls?» - извините, моя вина: там отсутствует утверждение return
- сейчас исправлю.
Второй взгляд на замки: извините, с ними все в порядке. Просто я бы устроил тогда по-другому. Наверное. Я беспокоился, что блокировка self.visited.add
не работает, и, просматривая сейчас, я понял ваше намерение.
Я не смог найти в документации никаких намеков на то, что
ThreadPoolExecutor.submit
поддерживает параметрdaemon
. Вызов должен завершиться ошибкой, которая подавляется, поскольку вы не сохраняете и не проверяете объектFuture
, возвращаемыйsubmit
.