Нахожусь в стадии разработки экспериментальной системы на Java с тяжелой БД MySQL, содержащей тысячи записей, для каждой из которых требуется выполнить ряд операций, причем параллельно.
Я хорошо знаю об использовании потоков Java, но я не знаю, как лучше всего/эффективнее использовать его для множества записей из БД.
Предположим, мы смотрим на следующую таблицу БД:
Table technicians
ID NUMBER
DISTRICT_CODE NUMBER NOT NULL
EVENT_START_DATE DATE NOT NULL
EVENT_END_DATE DATE NOT NULL
INCHARGE NUMBER NOT NULL
EFFECTIVE_FROM DATE DEFAULT SYSDATE NOT NULL
EFFECTIVE_TO DATE
STATUS NUMBER NOT NULL
Затем мы выполним следующее извлечение:
SELECT * FROM technicians WHERE INCHARGE = 23;
Теперь я серьезно обсуждаю, помещать ли извлеченную информацию в список (например, ArrayList) или другую структуру данных (обратите внимание, что каждое извлечение содержит около 4000 записей, и это происходит каждые 3 секунды снова и снова) и как реализовать Thread для каждой записи отдельно.
Наивная идея, которая возникла, заключается в том, что после запроса к БД и получения информации просматривайте каждую запись записи в цикле (например, sql.hasNext()) и запускайте объект ThreadPoolExecutor для каждой записи, но я склонен полагать, что есть более эффективные и быстрые способы.
Любое предложение приветствуется
Обновлено: я вижу, что были подняты вопросы о действиях, которые необходимо предпринять для каждой из записей, поэтому я постараюсь ответить на них.
Для каждой строки мы будем запускать несколько разных API для каждого поля, чтобы убедиться в правильности типа ответа (например, правильный, неправильный, правильный, но значение короткое и т. д.) и т. д.
Для меня важно отметить, что каждое из действий происходит по отношению к API, который является внешним по отношению к системе (находится на другом удаленном сервере), поэтому иногда для одного поля будет сделано несколько вызовов разных API, поэтому высокая мощность и параллельная работа важны.
Например:
Для поля INCHARGE - мы отправим значение во внешний источник API, который проверит данные, и, если информация верна, мы снова отправим поле в другой API, и мы получим информацию, связанную с ним.
может быть неправильным, но мне кажется, что это идеальный сценарий для Spring Batch. Может стоит подумать.
Тип рабочей нагрузки очень важен при многопоточности для задач базы данных. Если он в основном читает, то это нормально, но если у вас больше записей, вы можете столкнуться с конфликтующими записями, работающими с одними и теми же данными. Если вы попытаетесь решить эту проблему с помощью блокировки записей, вы вызовете тупиковую ситуацию с блокировкой. Если каждый поток имеет свой собственный набор данных, который не взаимодействует с данными других потоков, тогда все потокобезопасно.
Что именно вы имеете в виду под «выполнить ряд операций»? Обработка нескольких тысяч записей каждые три секунды не представляет большой нагрузки, если вы просто выполняете какие-то вычисления или создаете отчет и не будете создавать потоки. Я голосую за закрытие как неясное, требующее подробностей.
Пожалуйста, уточните вашу конкретную проблему или предоставьте дополнительную информацию, чтобы выделить именно то, что вам нужно. Как сейчас написано, трудно точно сказать, о чем вы спрашиваете.
@SpaceTrucker Я только что отредактировал свой пост, надеюсь, теперь все немного прояснилось. В любом случае действия относятся не к простым математическим операциям или использованию локальных функций, а к использованию внешнего по отношению к системе API.




Кажется, вы хотите обрабатывать некоторые строки в базе данных каждые три секунды. Для каждого раза вы хотите запросить около четырех тысяч строк. Каждая из этих строк должна обрабатываться отдельно, независимо от других строк в этой таблице. Похоже, вы не обновляете строки, а вместо этого отправляете данные строки через вызовы других служб, например вызовы веб-служб.
Поэтому загрузите данные в память, так как объем кажется низким. Определите класс для хранения данных для каждой строки. Поскольку мы используем этот класс в первую очередь для прозрачной и неизменной передачи данных, определите класс как записывать.
record Technician ( int id , LocalDate eventStart , … ) {}
Создавайте экземпляры этих объектов Technician по мере того, как вы зацикливаете набор результатов из своего запроса.
Для каждого объекта Technician перейдите к конструктору класса, реализующего Callable. Метод run этого класса определяет работу, которую необходимо выполнить при обработке данных этой строки, переходе к вызовам веб-службы и т. д.
Callable возвращает значение. Давайте определим еще одну запись, чтобы сигнализировать об успехе/неуспехе и идентификаторе записи.
record TechnicianProcessingResult ( int id , boolean succeeded ) {}
Сделайте эту запись похожей на нашу Callable.
class ProcessTechnicianTask implements Callable< TechnicianProcessingResult > {
private final Technician technician ;
ProcessTechnicianTask( Technician t ) { // Constructor.
this.technician = t ;
}
public TechnicianProcessingResult call() {
System.out.println( "Processing technician Id " + this.technician.id );
…
return new TechnicianProcessingResult( this.technician.id , true ) ;
… or …
return new TechnicianProcessingResult( this.technician.id , false ) ;
}
}
Создайте задачу для каждого объекта Technician, который вы создали для каждой строки, извлеченной из базы данных. Собери задания.
List< ProcessTechnicianTask > tasks = new ArrayList<>() ;
…
tasks.add( new ProcessTechnicianTask( nthTechnician ) ) ;
Отправьте этот набор задач службе-исполнителю, которую вы уже установили. Как правило, указывайте почти столько потоков, сколько доступно ядер процессора.
ExecutorService executorService = Executors.newFixedThreadPool( 5 ) ;
…
List< Future< TechnicianProcessingResult > > futures = executorService.invokeAll( tasks , 3 , TimeUnit.SECONDS ) ;
Обратите внимание на аргументы тайм-аута, которые срабатывают в случае, если что-то пойдет не так и выполнение ваших задач займет слишком много времени.
Проверьте список фьючерсов, чтобы увидеть, являются ли они сделано, и были ли они отменен, и проверьте их объект результата.
Вы хотите повторять это каждые три секунды. Так что также создайте однопоточный ScheduledExecutorService. Запланируйте повторяющуюся задачу, Runnable или Callable, которая выполняет вышеуказанную работу по запросу к базе данных, создает экземпляры Technician объектов, назначает каждый объект ProcessTechnicianTask, и все это передается нашей другой службе-исполнителю.
Обязательно корректно отключите объекты службы исполнителя. В противном случае их резервные пулы потоков могут продолжать работать бесконечно, как зомби ?♂️. См. шаблонный код в Javadoc.
Все это уже много раз обсуждалось на Stack Overflow. Так что ищите, чтобы узнать больше.
Вы, кажется, имели в виду этот подход. Но вы задавались вопросом, есть ли «более эффективные и быстрые способы». Нет, лучшего выхода я не вижу. Узким местом здесь являются сетевые вызовы к вашему API, предположительно вызовы веб-сервисов. Создание объектов записи, их сбор и отправка службам-исполнителям будут выполняться очень быстро по сравнению с потоками, ожидающими ответа от сетевых вызовов.
Единственное, что может значительно повысить производительность в вашем сценарии, — это виртуальные потоки и структурированный параллелизм, обещанные Проект Ткацкий станок.
В текущей версии Java каждый поток Java сопоставляется непосредственно с потоком хост-ОС. Каждый из ваших нескольких потоков будет бездействовать во время вызовов веб-служб, останавливая выполнение до тех пор, пока эти вызовы не вернутся. Эти потоки имеют большой вес, поэтому их не может быть очень много.
В Project Loom многие виртуальные потоки сопоставляются с каждым потоком хост-ОС. Эти виртуальные потоки легковесны, поэтому у нас могут быть тысячи и даже миллионы. Когда виртуальный поток блокируется, например, в ожидании возврата вызова веб-службы, этот виртуальный поток «припаркован»/«отключен» от потока хост-ОС, чтобы другой виртуальный поток мог использовать хост-поток. Таким образом, другие виртуальные потоки могут выполнять свою работу, пока предыдущий виртуальный поток ожидает возврата вызова своего веб-сервиса.
В вашей ситуации одновременно может выполняться намного больше вызовов веб-служб, а не только несколько. Ядра ЦП на вашем компьютере остаются гораздо более загруженными, поэтому большее количество строк обрабатывается за меньшее время. Вы можете увидеть многократное улучшение времени обработки.
Проект Loom незакончен, но находится в активной разработке. Экспериментальные сборки доступен сейчас основаны на Java 19 раннего доступа. См. статьи, интервью и презентации с членами команды, включая Рона Пресслера и Алана Бейтмана.
Без предоставления более подробной информации о том, какие операции выполняются, трудно ответить. Также вы не указали, существуют ли какие-либо требования к прогонам наборов данных, такие как максимальное время выполнения или может ли одновременно выполняться только одна работающая операция и так далее.