У меня есть следующий код, который выполняется каждый раз для разного количества потоков:
class Worker<T> {
public void process() {
System.out.pritnln("Test");
}
}
class Processor {
private void doStuff(final Collection<Worker<V>> col) {
final int size = col.size();
if (size > 0) {
final ExecutorService threads = Executors.newFixedThreadPool(col.size());
for (Worker<V> w : col) {
threads.submit(() -> w.process());
}
threads.shutdown();
}
}
}
Который печатается каждый раз в новом идентификаторе опросов:
(pool-66-thread-1) Test
(pool-66-thread-2) Test
(pool-67-thread-1) Test
(pool-68-thread-1) Test
(pool-68-thread-3) Test
(pool-68-thread-2) Test
Интересно, это обычное поведение, или в какой-то момент происходит утечка памяти, и он взорвется. Разве он не должен повторно использовать предыдущий pools
?
Как я вижу, те предыдущие пулы уже shutdown
благодаря звонку threads.shutdown()
I wonder if this is the common behaviour, or at some point there is memory leak and will explode. Shouldn't it reuse previous pools?
Ну, вы постоянно создаете новые пулы.
// this creates a new pool
final ExecutorService threads = Executors.newFixedThreadPool(col.size());
Что касается утечек памяти, так как вы закрываете свои пулы, это должно быть нормально (но вы должны делать это finally
, чтобы быть в безопасности, если есть исключения).
Если вы хотите повторно использовать пул (что имеет смысл), вы должны сделать threads
переменной экземпляра вашего Processor
(и убедиться, что Processor implements AutoCloseable
и вы отключили threads
в методе закрытия).
Спасибо за ваш ответ @Thilo, проблема с повторным использованием пулов заключается в том, что каждый раз, когда я его выполняю, мне может понадобиться разное количество потоков, иногда достаточно 1 потока, может быть, позже мне понадобится 5, а в какой-то момент стресса мне может понадобиться иметь 3 тысячи потоков. Я не нашел хорошей идеи держать в памяти огромный пул потоков в 3 тысячи потоков, а то и больше, так как не могу толком определить количество потоков. (Это исходит от Кафки, получившей события с нескольких устройств)
Привет @Mayday. Похоже, вы можете захотеть заглянуть в ThreadPoolExecutor, где вы можете указать размер ядра (минимальное количество активных потоков) и время поддержания активности (как долго поток простаивает до завершения). docs.oracle.com/javase/7/docs/api/java/util/concurrent/…
Также маловероятно, что вы действительно сможете получить что-то от 3000 потоков. Определите разумную сумму на основе рабочей нагрузки и мощности компьютера, а затем соответствующим образом определите размер пула потоков.
Вы вызываете Executors.newFixedThreadPool()
в своей функции, которая создает новый пул потоков.
Это не неправильно как таковой, но это редкость и противоречит многим мотивам использования пулов потоков. Более стандартным подходом было бы создание единого пула потоков в вашем классе Processor
и отправка ему заданий как таковых:
class Processor {
private final ExecutorService service = Executors.newFixedThreadExecutor(count);
private void doStuff() {
...
...
service.submit(() -> w.process());
}
}
Спасибо за ваш ответ @cameron1024. Пожалуйста, не стесняйтесь проверить комментарий, который я разместил в другом ответе, поэтому я не повторяю его здесь :)
Я прочитал ваш комментарий, вы должны создать его с максимальным числом, которое вы ожидаете. Они будут потреблять ограниченные ресурсы, когда они не работают, особенно в современных системах. Помните, что преждевременная оптимизация — это зло. Сначала напишите разумный код, а затем оптимизируйте только в случае очевидных проблем с производительностью.
Идея наличия пула потоков состоит в том, чтобы ограничить количество создаваемых потоков и повторно использовать уже созданные потоки, чтобы избежать ненужной перегрузки, обрабатывающей тонны потоков в данный момент времени, и сделать его более эффективным. Теперь проблема с вашим кодом заключается в том, что он не ограничивает количество потоков и не использует их повторно. Поскольку вы выполняете только некоторую печать, вы можете не увидеть перегрузку, но если вы добавите больше обработки в свой рабочий процесс и продолжите вызывать его параллельно, вы увидите резкую задержку в выполнении обработки.
Почему вы удивляетесь, когда функция с именем "new...ThreadPool" создает новый пул потоков?