У меня проблема, которая мучает меня уже несколько дней
У меня есть FixedThreadPool с фиксированным размером (для простоты предположим 10), которому я назначаю 275 Runnable:
Logging.log("----- Thread Pool creation -----")
val pool = Executors.newFixedThreadPool(num_of_threads)
Logging.log("> submitting tables")
for(t <- jobTablesList) {
val db = t(0)
val tb = t(1)
val st = t(2).toInt
pool.submit(new DataMigration( db, tb, st, retry_times, file_threads))
}
pool.shutdown()
pool.awaitTermination(10000, TimeUnit.HOURS)
Logging.log("----- DATA MIGRATION END! -----")
Я уверен, что это 275, потому что я сначала печатаю длину jobTablesList. Класс DataMigration расширяет Runnable, и это метод запуска:
override def run(): Unit = {
Logging.log("Migration start\n\ttable: " + table + "\n\tdatabase: " + database + "\n\tstatus: " + status)
try {
if (status == 0) createTable()
else readTable()
if (!isTableView) {
if (status == 1) migrateTable()
if (status == 2) validateTable()
}
}
catch {
case e: Throwable => migrationError(e)
}
Logging.log("Migration end\n\ttable:\t" + table + "\n\tdatabase:\t" + database + "\n\tstatus:\t" + status)
}
Как видите, я точно знаю, когда поток начинает и заканчивает свою работу, просто потому, что он печатает эту информацию в журнале.
Проблема в том, что в какой-то момент пул перестает назначать задачи, и пока оставшиеся в очереди продолжают обрабатываться и выполняться, больше не добавляются.
например, при последнем запуске скрипт застрял в этом состоянии (num_of_threads = 15):
почему пул не планирует другие потоки? Загрузка процессора очень низкая, а оперативная память в порядке.
Кто-то может помочь мне понять?
Спасибо!
Это потому, что вы вызываете pool.shutdown()
до того, как все задачи будут обработаны. Вам нужно найти способ вызвать завершение работы только после того, как все задачи были обработаны.
Нет, отключение только гарантирует, что никакой другой поток не будет отправлен после
Через какое-то время я понял...
По сути, у меня был класс DataMigration, определенный в ДВУХ РАЗНЫХ ФАЙЛАХ, которые не взаимодействуют друг с другом, внутри ДВУХ РАЗНЫХ ОБЪЕКТОВ (конечно, с другим именем), и Scala, по-видимому, это не нравится, но без сообщения об этом
Он просто случайно перестал отправлять темы через некоторое время
Я удалил второй файл, и в настоящее время он продолжает работать ожидаемым образом.
Какой вообще смысл использовать Scala, если вы собираетесь использовать его как Java? - В любом случае, вы можете попробовать вместо этого использовать
Futures
? вы можете создать свой собственныйExecutionContext
из фиксированного пула и использоватьFuture.traverse
для составления всех фьючерсов в один.