Scala FixedThreadPool перестает запускать потоки, даже если предыдущие завершили свою работу

У меня проблема, которая мучает меня уже несколько дней

У меня есть FixedThreadPool с фиксированным размером (для простоты предположим 10), которому я назначаю 275 Runnable:

Logging.log("----- Thread Pool creation -----")
val pool = Executors.newFixedThreadPool(num_of_threads)
Logging.log("> submitting tables")
for(t <- jobTablesList) {
  val db = t(0)
  val tb = t(1)
  val st = t(2).toInt
  pool.submit(new DataMigration( db, tb, st, retry_times, file_threads))
}

pool.shutdown()
pool.awaitTermination(10000, TimeUnit.HOURS)
Logging.log("----- DATA MIGRATION END! -----")

Я уверен, что это 275, потому что я сначала печатаю длину jobTablesList. Класс DataMigration расширяет Runnable, и это метод запуска:

override def run(): Unit = {
  Logging.log("Migration start\n\ttable: " + table + "\n\tdatabase: " + database + "\n\tstatus: " + status)

  try {
    if (status == 0) createTable()
    else readTable()

    if (!isTableView) {
      if (status == 1) migrateTable()
      if (status == 2) validateTable()
    }
  }
  catch {
    case e: Throwable => migrationError(e)
  }

  Logging.log("Migration end\n\ttable:\t" + table + "\n\tdatabase:\t" + database + "\n\tstatus:\t" + status)

}

Как видите, я точно знаю, когда поток начинает и заканчивает свою работу, просто потому, что он печатает эту информацию в журнале.

Проблема в том, что в какой-то момент пул перестает назначать задачи, и пока оставшиеся в очереди продолжают обрабатываться и выполняться, больше не добавляются.

например, при последнем запуске скрипт застрял в этом состоянии (num_of_threads = 15):

  • Начало таблицы: 188
  • Таблица закончилась: 188

почему пул не планирует другие потоки? Загрузка процессора очень низкая, а оперативная память в порядке.

Кто-то может помочь мне понять?

Спасибо!

Какой вообще смысл использовать Scala, если вы собираетесь использовать его как Java? - В любом случае, вы можете попробовать вместо этого использовать Futures? вы можете создать свой собственный ExecutionContext из фиксированного пула и использовать Future.traverse для составления всех фьючерсов в один.

Luis Miguel Mejía Suárez 11.12.2020 16:21
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
1
109
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Это потому, что вы вызываете pool.shutdown() до того, как все задачи будут обработаны. Вам нужно найти способ вызвать завершение работы только после того, как все задачи были обработаны.

Нет, отключение только гарантирует, что никакой другой поток не будет отправлен после

Asoftware 14.01.2021 11:47
Ответ принят как подходящий

Через какое-то время я понял...

По сути, у меня был класс DataMigration, определенный в ДВУХ РАЗНЫХ ФАЙЛАХ, которые не взаимодействуют друг с другом, внутри ДВУХ РАЗНЫХ ОБЪЕКТОВ (конечно, с другим именем), и Scala, по-видимому, это не нравится, но без сообщения об этом

Он просто случайно перестал отправлять темы через некоторое время

Я удалил второй файл, и в настоящее время он продолжает работать ожидаемым образом.

Другие вопросы по теме