Проблема производительности весенней партии JPAItemReader

Ниже приведена конфигурация моего весеннего пакетного задания, которое берет записи из БД, выполняет некоторую обработку в процессоре элементов, обновляет столбец состояния и записывает обратно в БД.

Когда я запустил 10 тыс. записей, я мог видеть, что он берет каждую запись одну за другой и обновляет статус таким же образом. Изначально я планировал использовать многопоточность, но это не имеет никакого смысла, так как моя работа выполняется один раз в день с количеством записей от 10 до 100 тыс. (Записи составляют менее 5 тысяч в большинстве дней, а очень немногие дни в году (от 5 до 10 дней) составляют от 50 до 100 тысяч).

Я не хочу добавлять больше процессоров и платить за Kubernetes только 10 дней в году. Теперь проблема в том, что когда я запускал это задание, требуется всего 100 записей, которые он запускает каждый запрос на выборку независимо, а не по 100 за раз. Кроме того, обновление также выполняется по одной записи за раз, и для обработки 10 000 записей требуется 10 минут, что очень медленно.

Как сделать более быстрое чтение, обработку и запись? Я могу избавиться от многопоточности и немного увеличить загрузку процессора время от времени. Более подробная информация представлена ​​в виде комментариев в коде.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration extends DefaultBatchConfigurer{

public final static Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

@Autowired
JobBuilderFactory jobBuilderFactory;

@Autowired
StepBuilderFactory stepBuilderFactory;

@Autowired
MyRepository myRepository;


@Autowired
private EntityManagerFactory entityManagerFactory;

@Value("${chunk-size}")
private int chunkSize;

@Value("${max-threads}")
private int maxThreads;

private final DataSource dataSource;


/**
 * @param dataSource
 * Override to do not set datasource even if a datasource exist during intialization.
 * Initialize will use a Map based JobRepository (instead of database) for Spring batch meta tables
 */
@Override
public void setDataSource(DataSource dataSource) {
}

@Override
public PlatformTransactionManager getTransactionManager() {
    return jpaTransactionManager();
}


@Autowired
public BatchConfiguration(@Qualifier("dataSource") DataSource dataSource) {
    this.dataSource = dataSource;
}

@Bean
public JpaTransactionManager jpaTransactionManager() {
    final JpaTransactionManager transactionManager = new JpaTransactionManager();
    transactionManager.setDataSource(dataSource);
    return transactionManager;
}


@Bean
@StepScope
public JdbcPagingItemReader<ModelEntity> importReader() {  // I tried using RepositoryItemReader but records were skipped by JPA hence I went for JdbcPagingItemReader
    JdbcPagingItemReader<ModelEntity> reader = new JdbcPagingItemReader<ModelEntity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource( dataSource );
    sqlPagingQueryProviderFactoryBean.setSelectClause( "SELECT *" );
    sqlPagingQueryProviderFactoryBean.setFromClause( "FROM mytable" );
    sqlPagingQueryProviderFactoryBean.setWhereClause( "WHERE STATUS = 'myvalue' " );
    sqlPagingQueryProviderFactoryBean.setSortKey( "primarykey" );
    try {
        reader.setQueryProvider( sqlPagingQueryProviderFactoryBean.getObject() );
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource( dataSource );
    reader.setPageSize( chunkSize );
    reader.setSaveState( Boolean.FALSE );
    reader.setRowMapper( new BeanPropertyRowMapper<ModelEntity>(ModelEntity.class ) );
    return reader;
}



@Bean
public ItemWriter<ModelEntity> databaseWriter() {
    RepositoryItemWriter<ModelEntity> repositoryItemWriter=new RepositoryItemWriter<>();
    repositoryItemWriter.setRepository(myRepository);
    repositoryItemWriter.setMethodName("save");
    return repositoryItemWriter;
}

@Bean
public Myprocessor myprocessor() { 
    return new Myprocessor();
}

@Bean
public JobExecutionListener jobExecutionListener() {
    return new JobExecutionListener();
}

@Bean
public StepExecutionListener stepExecutionListener() {
    return new StepExecutionListener();
}

@Bean
public ChunkExecutionListener chunkListener() {
    return new ChunkExecutionListener();
}

@Bean
public TaskExecutor taskExecutor() {
 SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
 taskExecutor.setConcurrencyLimit(maxThreads);
return taskExecutor;
}

@Bean
public Job processJob() {
    return jobBuilderFactory.get("myjob")
            .incrementer(new RunIdIncrementer())
            .start(processStep())
            .listener(jobExecutionListener())
            .build();
}

@Bean
public Step processStep() {
    return stepBuilderFactory.get("processStep")
            .<ModelEntity,ModelEntity>chunk(chunkSize)
            .reader(importReader())
            .processor(myprocessor())
            .writer(databaseWriter())
            .taskExecutor(taskExecutor())
            .listener(stepExecutionListener())
            .listener(chunkListener())
            .transactionManager(getTransactionManager())
            .throttleLimit(maxThreads)
            .build();
    }

}

Я использую репозиторий JpaRepository и код ниже. (Предполагая, что метод сохранения его родительского класса CrudRepository будет сохраняться)

public interface MyRepository extends JpaRepository<ModelEntity, BigInteger> {

}

Процессор, как показано ниже

@Component
public class Myprocessor implements ItemProcessor<Myprocessor,Myprocessor> {

@Override
public ModelEntity process(ModelEntity modelEntity) throws Exception {
    try {
    // This is fast and working fine
       if ((myProcessing)) {
            modelEntity.setStatus(success);
        } else {
            modelEntity.setStatus(failed);
        }
    }
    catch (Exception e){
        logger.info( "Exception occurred while processing"+e );
      }
    return modelEntity;
 }

 // This is fast and working fine
 public Boolean myProcessing(ModelEntity modelEntity){
 //Processor Logic Here
    return processingStatus;
 }

 }

Файл свойств ниже

logging.level.org.hibernate.SQL=DEBUG
logging.level.com.zaxxer.hikari.HikariConfig=DEBUG
logging.level.org.hibernate.type.descriptor.sql.BasicBinder=TRACE 
logging.level.org.springframework.jdbc.core.JdbcTemplate=DEBUG
logging.level.org.springframework.jdbc.core.StatementCreatorUtils=TRACE


spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=url
spring.datasource.username=username
spring.datasource.password=password 
spring.jpa.hibernate.connection.provider_class
=org.hibernate.hikaricp.internal.HikariCPConnectionProvider
spring.jpa.database-platform=org.hibernate.dialect.Oracle10gDialect
spring.jpa.show-sql=false
spring.main.allow-bean-definition-overriding=true
spring.batch.initializer.enabled=false
spring.batch.job.enabled=false
spring.batch.initialize-schema=never 
chunk-size=100
max-threads=5

Вы пытались использовать метод saveAll вместо save в вашем repositoryItemWriter?

Mahmoud Ben Hassine 28.01.2019 09:51

Да я тоже самое пробовал. Но дело было не в этом

Learner 29.01.2019 18:43
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
2
1 865
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы можете включить пакетную обработку JDBC для операторов INSERT, UPDATE и DELETE всего одним свойством конфигурации:

spring.jpa.properties.hibernate.jdbc.batch_size 

Он определяет количество обновлений, которые одновременно отправляются в базу данных для выполнения.

Подробнее см. эта ссылка

Ответ принят как подходящий

Спасибо всем за предложения. Я сам нашел проблему. Я использовал JdbcPagingItemReader и RepositoryItemWriter. Читатель работал, как и ожидалось, но писатель запускал запрос выбора для каждой записи, переданной после процессора. Я считаю, что причина в том, что запись сохраняется в JPA только после процессора, поскольку читатель не является стандартным читателем JPA. Хотя я не уверен в этом. Но изменение средства записи на JdbcBatchItemWriter устранило проблему.

Другие вопросы по теме