Эффективный способ реализации потоков в Java для многих записей БД

Нахожусь в стадии разработки экспериментальной системы на Java с тяжелой БД MySQL, содержащей тысячи записей, для каждой из которых требуется выполнить ряд операций, причем параллельно.

Я хорошо знаю об использовании потоков Java, но я не знаю, как лучше всего/эффективнее использовать его для множества записей из БД.

Предположим, мы смотрим на следующую таблицу БД:

Table technicians
    ID          NUMBER
    DISTRICT_CODE       NUMBER NOT NULL
    EVENT_START_DATE    DATE NOT NULL
    EVENT_END_DATE      DATE NOT NULL
    INCHARGE        NUMBER NOT NULL
    EFFECTIVE_FROM      DATE DEFAULT SYSDATE NOT NULL
    EFFECTIVE_TO        DATE
    STATUS          NUMBER NOT NULL

Затем мы выполним следующее извлечение:

SELECT * FROM technicians WHERE INCHARGE = 23;

Теперь я серьезно обсуждаю, помещать ли извлеченную информацию в список (например, ArrayList) или другую структуру данных (обратите внимание, что каждое извлечение содержит около 4000 записей, и это происходит каждые 3 секунды снова и снова) и как реализовать Thread для каждой записи отдельно.

Наивная идея, которая возникла, заключается в том, что после запроса к БД и получения информации просматривайте каждую запись записи в цикле (например, sql.hasNext()) и запускайте объект ThreadPoolExecutor для каждой записи, но я склонен полагать, что есть более эффективные и быстрые способы.

Любое предложение приветствуется

Обновлено: я вижу, что были подняты вопросы о действиях, которые необходимо предпринять для каждой из записей, поэтому я постараюсь ответить на них.

Для каждой строки мы будем запускать несколько разных API для каждого поля, чтобы убедиться в правильности типа ответа (например, правильный, неправильный, правильный, но значение короткое и т. д.) и т. д.

Для меня важно отметить, что каждое из действий происходит по отношению к API, который является внешним по отношению к системе (находится на другом удаленном сервере), поэтому иногда для одного поля будет сделано несколько вызовов разных API, поэтому высокая мощность и параллельная работа важны.

Например:

Для поля INCHARGE - мы отправим значение во внешний источник API, который проверит данные, и, если информация верна, мы снова отправим поле в другой API, и мы получим информацию, связанную с ним.

Без предоставления более подробной информации о том, какие операции выполняются, трудно ответить. Также вы не указали, существуют ли какие-либо требования к прогонам наборов данных, такие как максимальное время выполнения или может ли одновременно выполняться только одна работающая операция и так далее.

SpaceTrucker 11.05.2022 15:36

может быть неправильным, но мне кажется, что это идеальный сценарий для Spring Batch. Может стоит подумать.

Pavel 11.05.2022 16:28

Тип рабочей нагрузки очень важен при многопоточности для задач базы данных. Если он в основном читает, то это нормально, но если у вас больше записей, вы можете столкнуться с конфликтующими записями, работающими с одними и теми же данными. Если вы попытаетесь решить эту проблему с помощью блокировки записей, вы вызовете тупиковую ситуацию с блокировкой. Если каждый поток имеет свой собственный набор данных, который не взаимодействует с данными других потоков, тогда все потокобезопасно.

Cheshiremoe 11.05.2022 20:01

Что именно вы имеете в виду под «выполнить ряд операций»? Обработка нескольких тысяч записей каждые три секунды не представляет большой нагрузки, если вы просто выполняете какие-то вычисления или создаете отчет и не будете создавать потоки. Я голосую за закрытие как неясное, требующее подробностей.

Basil Bourque 11.05.2022 20:15

Пожалуйста, уточните вашу конкретную проблему или предоставьте дополнительную информацию, чтобы выделить именно то, что вам нужно. Как сейчас написано, трудно точно сказать, о чем вы спрашиваете.

Community 11.05.2022 20:24

@SpaceTrucker Я только что отредактировал свой пост, надеюсь, теперь все немного прояснилось. В любом случае действия относятся не к простым математическим операциям или использованию локальных функций, а к использованию внешнего по отношению к системе API.

Zar Tel 12.05.2022 09:28
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
6
47
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Кажется, вы хотите обрабатывать некоторые строки в базе данных каждые три секунды. Для каждого раза вы хотите запросить около четырех тысяч строк. Каждая из этих строк должна обрабатываться отдельно, независимо от других строк в этой таблице. Похоже, вы не обновляете строки, а вместо этого отправляете данные строки через вызовы других служб, например вызовы веб-служб.

Да, пользоваться услугами исполнителя

Поэтому загрузите данные в память, так как объем кажется низким. Определите класс для хранения данных для каждой строки. Поскольку мы используем этот класс в первую очередь для прозрачной и неизменной передачи данных, определите класс как записывать.

record Technician ( int id , LocalDate eventStart , … ) {}

Создавайте экземпляры этих объектов Technician по мере того, как вы зацикливаете набор результатов из своего запроса.

Для каждого объекта Technician перейдите к конструктору класса, реализующего Callable. Метод run этого класса определяет работу, которую необходимо выполнить при обработке данных этой строки, переходе к вызовам веб-службы и т. д.

Callable возвращает значение. Давайте определим еще одну запись, чтобы сигнализировать об успехе/неуспехе и идентификаторе записи.

record TechnicianProcessingResult ( int id , boolean succeeded ) {}

Сделайте эту запись похожей на нашу Callable.

class ProcessTechnicianTask implements Callable< TechnicianProcessingResult > {

    private final Technician technician ;

    ProcessTechnicianTask( Technician t ) { // Constructor.
        this.technician = t ;
    }
 
    public TechnicianProcessingResult call() {
        System.out.println( "Processing technician Id " + this.technician.id );
        …
        return new TechnicianProcessingResult( this.technician.id , true ) ;
        … or …
        return new TechnicianProcessingResult( this.technician.id , false ) ;
    }
}

Создайте задачу для каждого объекта Technician, который вы создали для каждой строки, извлеченной из базы данных. Собери задания.

List< ProcessTechnicianTask > tasks = new ArrayList<>() ;
…
tasks.add( new ProcessTechnicianTask( nthTechnician ) ) ;

Отправьте этот набор задач службе-исполнителю, которую вы уже установили. Как правило, указывайте почти столько потоков, сколько доступно ядер процессора.

ExecutorService executorService = Executors.newFixedThreadPool( 5 ) ;
…
List< Future< TechnicianProcessingResult > > futures = executorService.invokeAll( tasks , 3 , TimeUnit.SECONDS ) ;

Обратите внимание на аргументы тайм-аута, которые срабатывают в случае, если что-то пойдет не так и выполнение ваших задач займет слишком много времени.

Проверьте список фьючерсов, чтобы увидеть, являются ли они сделано, и были ли они отменен, и проверьте их объект результата.

Вы хотите повторять это каждые три секунды. Так что также создайте однопоточный ScheduledExecutorService. Запланируйте повторяющуюся задачу, Runnable или Callable, которая выполняет вышеуказанную работу по запросу к базе данных, создает экземпляры Technician объектов, назначает каждый объект ProcessTechnicianTask, и все это передается нашей другой службе-исполнителю.

Обязательно корректно отключите объекты службы исполнителя. В противном случае их резервные пулы потоков могут продолжать работать бесконечно, как зомби ?‍♂️. См. шаблонный код в Javadoc.

Все это уже много раз обсуждалось на Stack Overflow. Так что ищите, чтобы узнать больше.

Вы, кажется, имели в виду этот подход. Но вы задавались вопросом, есть ли «более эффективные и быстрые способы». Нет, лучшего выхода я не вижу. Узким местом здесь являются сетевые вызовы к вашему API, предположительно вызовы веб-сервисов. Создание объектов записи, их сбор и отправка службам-исполнителям будут выполняться очень быстро по сравнению с потоками, ожидающими ответа от сетевых вызовов.

Проект Ткацкий станок

Единственное, что может значительно повысить производительность в вашем сценарии, — это виртуальные потоки и структурированный параллелизм, обещанные Проект Ткацкий станок.

В текущей версии Java каждый поток Java сопоставляется непосредственно с потоком хост-ОС. Каждый из ваших нескольких потоков будет бездействовать во время вызовов веб-служб, останавливая выполнение до тех пор, пока эти вызовы не вернутся. Эти потоки имеют большой вес, поэтому их не может быть очень много.

В Project Loom многие виртуальные потоки сопоставляются с каждым потоком хост-ОС. Эти виртуальные потоки легковесны, поэтому у нас могут быть тысячи и даже миллионы. Когда виртуальный поток блокируется, например, в ожидании возврата вызова веб-службы, этот виртуальный поток «припаркован»/«отключен» от потока хост-ОС, чтобы другой виртуальный поток мог использовать хост-поток. Таким образом, другие виртуальные потоки могут выполнять свою работу, пока предыдущий виртуальный поток ожидает возврата вызова своего веб-сервиса.

В вашей ситуации одновременно может выполняться намного больше вызовов веб-служб, а не только несколько. Ядра ЦП на вашем компьютере остаются гораздо более загруженными, поэтому большее количество строк обрабатывается за меньшее время. Вы можете увидеть многократное улучшение времени обработки.

Проект Loom незакончен, но находится в активной разработке. Экспериментальные сборки доступен сейчас основаны на Java 19 раннего доступа. См. статьи, интервью и презентации с членами команды, включая Рона Пресслера и Алана Бейтмана.

Другие вопросы по теме