JdbcPagingItemReader отсутствует для чтения нескольких записей при использовании с SimpleAsyncTaskExecutor

Я пытаюсь прочитать таблицу и сбросить данные в файл

Моя конфигурация работы выглядит так

Исполнитель задач -->

   @Bean
    public TaskExecutor tvlTaskExecutor() {

        SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

Шаг настройки -->

@Bean
public Step tvlFileCreationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
    return new StepBuilder("tvl-file-creation-step", jobRepository)
            .<String, String>chunk(50, transactionManager)
            .reader(tvlTableDataItemReader())
            .writer(tvlFileWriter)
            .listener(tvlStepExecutionListener)
            .taskExecutor(tvlTaskExecutor())
            .build();
}

конфигурация задания -->

@Bean(name = "tvlRunJob")
public Job tvlRunJob( JobRepository jobRepository,  Step tvlFileCreationStep) {
    return new JobBuilder("tvlFileCreationJob", jobRepository)
            .start(tvlFileCreationStep)
            .build();

}

и, наконец, читатель -->

public JdbcPagingItemReader<String> jdbcPagingItemReader(DataSource dataSource) throws Exception {

    // reading ITAG keys records using JDBC in a paging fashion
    JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<String>();
    reader.setDataSource(dataSource);
    reader.setPageSize(50);
    reader.setSaveState(true);
    reader.setQueryProvider(createQuery(dataSource));
    reader.setRowMapper(new TVLRowMapper());
    return reader;



}

private PagingQueryProvider createQuery(DataSource dataSource) throws Exception {
    final Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("TAG_SERIAL_NUMBER", Order.ASCENDING);

    final SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
    queryProvider.setDataSource(dataSource);
    queryProvider.setSelectClause(" TAG_SERIAL_NUMBER ");
    queryProvider.setFromClause(" ITAGENTITY_10052024 ");
    queryProvider.setWhereClause(" TAG_SERIAL_NUMBER like '09900%' ");
    queryProvider.setSortKeys(sortKeys);
    return queryProvider.getObject();
}

Моя конфигурация писателя выглядит так -->

using XMLOutputFactory2 - from the stax2
and using XMLStreamWriter2 as writer - from stax2 implementaion 
which is thread safe as per documentation 
package path - org.codehaus.stax2.*;

Когда я использую однопоточный подход - он читает все записи, но когда я использую TaskExecutor - он пропускает несколько записей, не хватает 4-5 записей из 700 записей.

(Примечание: фактический объем огромен. Поэтому мне нужно реализовать многопоточный подход.)

Пожалуйста, дайте мне знать, если я сделал что-то не так или неправильно. Любая помощь приветствуется, спасибо

Я сомневаюсь, что это программа чтения, одновременная запись в файл просто не будет работать, могут возникнуть ситуации, когда каждый экземпляр записи указывает на одно и то же место в файле, перезаписывая изменения друг друга (или может даже ухудшиться).

M. Deinum 21.05.2024 07:17
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
1
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это был очень глупый промах, JdbcPagingItemReader — при работе с исполнителем задач Async — мы не можем использовать состояние.

Я использовал SimpleAsyncTask — он просто рандомизирует фрагменты и отправляет их писателю, так что с моим писателем все в порядке. Какое-то время я получал некоторые записи больше, а какие-то меньше.

Поэтому для Async просто убедитесь, что для saveState установлено значение false.

 JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<String>();
    reader.setDataSource(dataSource);
    reader.setPageSize(50);
    // change the below line  
    reader.setSaveState(false);
    reader.setQueryProvider(createQuery(dataSource));
    reader.setRowMapper(new TVLRowMapper());
    return reader;

Другие вопросы по теме