Я пытаюсь прочитать таблицу и сбросить данные в файл
Моя конфигурация работы выглядит так
Исполнитель задач -->
@Bean
public TaskExecutor tvlTaskExecutor() {
SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor();
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
Шаг настройки -->
@Bean
public Step tvlFileCreationStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) throws Exception {
return new StepBuilder("tvl-file-creation-step", jobRepository)
.<String, String>chunk(50, transactionManager)
.reader(tvlTableDataItemReader())
.writer(tvlFileWriter)
.listener(tvlStepExecutionListener)
.taskExecutor(tvlTaskExecutor())
.build();
}
конфигурация задания -->
@Bean(name = "tvlRunJob")
public Job tvlRunJob( JobRepository jobRepository, Step tvlFileCreationStep) {
return new JobBuilder("tvlFileCreationJob", jobRepository)
.start(tvlFileCreationStep)
.build();
}
и, наконец, читатель -->
public JdbcPagingItemReader<String> jdbcPagingItemReader(DataSource dataSource) throws Exception {
// reading ITAG keys records using JDBC in a paging fashion
JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<String>();
reader.setDataSource(dataSource);
reader.setPageSize(50);
reader.setSaveState(true);
reader.setQueryProvider(createQuery(dataSource));
reader.setRowMapper(new TVLRowMapper());
return reader;
}
private PagingQueryProvider createQuery(DataSource dataSource) throws Exception {
final Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("TAG_SERIAL_NUMBER", Order.ASCENDING);
final SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource);
queryProvider.setSelectClause(" TAG_SERIAL_NUMBER ");
queryProvider.setFromClause(" ITAGENTITY_10052024 ");
queryProvider.setWhereClause(" TAG_SERIAL_NUMBER like '09900%' ");
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
Моя конфигурация писателя выглядит так -->
using XMLOutputFactory2 - from the stax2
and using XMLStreamWriter2 as writer - from stax2 implementaion
which is thread safe as per documentation
package path - org.codehaus.stax2.*;
Когда я использую однопоточный подход - он читает все записи, но когда я использую TaskExecutor - он пропускает несколько записей, не хватает 4-5 записей из 700 записей.
(Примечание: фактический объем огромен. Поэтому мне нужно реализовать многопоточный подход.)
Пожалуйста, дайте мне знать, если я сделал что-то не так или неправильно. Любая помощь приветствуется, спасибо




Это был очень глупый промах, JdbcPagingItemReader — при работе с исполнителем задач Async — мы не можем использовать состояние.
Я использовал SimpleAsyncTask — он просто рандомизирует фрагменты и отправляет их писателю, так что с моим писателем все в порядке. Какое-то время я получал некоторые записи больше, а какие-то меньше.
Поэтому для Async просто убедитесь, что для saveState установлено значение false.
JdbcPagingItemReader<String> reader = new JdbcPagingItemReader<String>();
reader.setDataSource(dataSource);
reader.setPageSize(50);
// change the below line
reader.setSaveState(false);
reader.setQueryProvider(createQuery(dataSource));
reader.setRowMapper(new TVLRowMapper());
return reader;
Я сомневаюсь, что это программа чтения, одновременная запись в файл просто не будет работать, могут возникнуть ситуации, когда каждый экземпляр записи указывает на одно и то же место в файле, перезаписывая изменения друг друга (или может даже ухудшиться).