Как запустить Spring Batch Job асинхронно

Я выполнил весенний пакетный документ и не смог запустить свою работу в асинхронном режиме.

Итак, я запускаю задание из веб-контейнера, и задание будет запускаться через конечную точку REST.

Я хотел получить JobInstance ID, чтобы передать его в ответ перед завершением всего задания. Таким образом, они могут позже проверить статус задания по идентификатору JobInstance, а не ждать. Но я не мог заставить его работать. Ниже приведен пример кода, который я пробовал. Пожалуйста, дайте мне знать, что я упускаю или ошибаюсь.

BatchConfig для создания Async JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;


    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

Контроллер

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value = "/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}

И JobBuilder как компонент

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }


    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }


    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }


    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }


    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}

Основная функция

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

Я использую конфигурации на основе аннотаций и использую gradle с пакетом ниже.

compile('org.springframework.boot:spring-boot-starter-batch')

Пожалуйста, дайте мне знать, если потребуется дополнительная информация. Я не смог найти ни одного примера для запуска этого распространенного варианта использования.

Спасибо за уделенное время.

У вас есть исключение?

TinyOS 09.12.2018 06:30

Нет, я не получаю никаких исключений @TinyOS

rram 09.12.2018 06:34

ваша конфигурация мне кажется правильной. Поскольку вы настроили асинхронный исполнитель задач в своем jobLauncher, он должен немедленно вернуть выполнение задания и запустить задание в отдельном потоке. Не могли бы вы подробнее рассказать о couldn't get my job running Asynchronously?

Mahmoud Ben Hassine 09.12.2018 23:53

Я not сразу же получает идентификатор экземпляра задания в ответе контроллера, чтобы его можно было использовать позже для проверки состояния задания. Но здесь инструкция запускается только после того, как работа полностью завершена. @MahmoudBenHassine

rram 10.12.2018 01:13

В этом случае убедитесь, что правильный JobLauncher (тот, который настроен с асинхронным исполнителем задач) введен в ваш контроллер. Вероятно, у вас есть другой jobLauncher с синхронным исполнителем задачи, который выполняет задание до завершения, прежде чем вернуть выполнение задания.

Mahmoud Ben Hassine 10.12.2018 13:39

@MahmoudBenHassine, спасибо. После вашего предложения я использовал bean-компонент с Qualifier, но все тот же, я получаю идентификатор только после завершения задания. Вот что я получаю в консоли IDE, когда нажимаю на контроллер

rram 10.12.2018 15:36
2 OK RESPONSE 2018-12-10 20:04:58.726 INFO 82585 --- [cTaskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=personProcessor]] launched with the following parameters: [{time=1544452488708}] 2018-12-10 20:04:58.740 INFO 82585 --- [cTaskExecutor-2] o.s.batch.core.job.SimpleStepHandler : Executing step: [personProcessStep] JOB COMPLETED
rram 10.12.2018 15:39
2018-12-10 20:04:58.754 INFO 82585 --- [cTaskExecutor-2] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=personProcessor]] completed with the following parameters: [{time=1544452488708}] and the following status: [COMPLETED]
rram 10.12.2018 15:39
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
8
14 817
7

Ответы 7

JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);. Joblauncher будет ждать после завершения задания, прежде чем что-либо вернуть, поэтому ваша служба, вероятно, долго не отвечает, если это ваша проблема. Если вам нужны асинхронные возможности, вы можете взглянуть на Spring @EnableAsync и @Async.

@EnableAsync

@EnableAsync ставится на уровень приложения уже в моей основной функции. Мне также нужно вернуть Id вакансии в ответ. Как это сделать?

rram 09.12.2018 10:54

Хорошо, приношу свои извинения, я не расслышал. Кажется, у вас уже есть правильная настройка, за исключением аннотации вашего метода workHard() с помощью @Async. SimpleAsyncTaskExecutor() также должен немедленно вернуть JobExecution с ExitStatus=Unknown, как указано здесь

Karl Alexander 09.12.2018 11:42

Если я поставлю @Async в контроллер методом workHard(). Я вообще не получаю идентификатор экземпляра задания, но запрос завершается мгновенно с пустым ответом.

rram 09.12.2018 12:25

Несмотря на то, что у вас есть собственный jobLauncher, вы запускаете задание с использованием стандартного jobLauncher, предоставленного Spring. Не могли бы вы автоматически подключить simpleJobLauncher к своему контроллеру и попробовать?

Я согласен, его единственная проблема заключается в том, что он автоматически подключает по умолчанию jobLauncher вместо simpleJobLauncher по имени, для которого он создал bean-компонент.

LethalLima 12.12.2019 05:03

Попробуйте это, в вашей конфигурации вам нужно создать customJobLauncher с SimpleAsyncTaskExecutor, используя @Bean (name = "myJobLauncher"), и тот же @Qualifier будет использоваться в вашем контроллере.

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

В вашем контроллере

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;

Пожалуйста, не пишите ответы, состоящие только из кода, постарайтесь уточнить, объясняя, что вы делаете.

JJJ 23.03.2019 12:22

Согласно документации Spring, чтобы вернуть ответ HTTP-запроса асинхронным, необходимо использовать org.springframework.core.task.SimpleAsyncTaskExecutor.

Any implementation of the spring TaskExecutor interface can be used to control how jobs are asynchronously executed.

весенняя пакетная документация

<bean id = "jobLauncher"
  class = "org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name = "jobRepository" ref = "jobRepository" />
<property name = "taskExecutor">
    <bean class = "org.springframework.core.task.SimpleAsyncTaskExecutor" />
</property>

Если я посмотрю на ваш код, я вижу пару ошибок. Прежде всего, ваша настраиваемая конфигурация не загружается, потому что в противном случае внедрение не удастся для повторяющегося экземпляра bean-компонента для того же интерфейса.

В весенней загрузке много волшебства, но если вы не скажете ему про сканирование компонентов, ничего не будет загружено должным образом.

Вторая проблема, которую я вижу, - это ваш класс BatchConfig: он не расширяет DefaultBatchConfigure и не переопределяет getJobLauncher (), поэтому, даже если магия загрузки загрузит все, вы получите значение по умолчанию. Вот конфигурация, которая будет работать, и она соответствует документации @EnableBatchProcessing API

BatchConfig

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {

  @Override
  @Bean
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;

    } catch (Exception e) {
      log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
      return super.getJobLauncher();
    }
  }
}

Основная функция

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

@ Slf4j это аннотация Lombok, вы можете игнорировать ее, anche ввести журнал Logger вручную

ElPysCampeador 19.09.2019 18:45

Это создает циклическую зависимость между файлом BatchConfig и ModularBatchConfiguration из пакета Spring. Чтобы он работал, просто удалите modular = true из аннотации в BatchConfig.

Luca Pasini 20.06.2021 17:06

Я знаю, что это старый вопрос, но я все равно отправляю этот ответ для будущих пользователей.

После просмотра вашего кода я не могу сказать, почему у вас возникла эта проблема, но я могу предложить вам использовать аннотацию Qualifier, а также использовать ThreadPoolTaskExecutor, как это, и посмотреть, решит ли это вашу проблему.

Вы также можете проверить это руководство: Асинхронная обработка пакетных заданий Spring для получения дополнительной информации. Это поможет вам настроить асинхронное пакетное задание Spring. Это руководство было написано мной.

@Configuration
public class BatchConfig {

 @Autowired
 private JobRepository jobRepository;

 @Bean
 public TaskExecutor threadPoolTaskExecutor(){

  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);

   return executor;
 }

 @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
 }
}

Если вы используете Lombok, это может вам помочь:

TL; DR: Lombok @AllArgsConstructor не работает с аннотацией @QualifierРЕДАКТИРОВАТЬ: если вы включили аннотации @Qualifier в файле lombok.config, чтобы иметь возможность использовать @Qualifier с @AllArgsConstructor следующим образом:

lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

Я знаю старый вопрос, однако у меня была точно такая же проблема, и ни один из ответов не решил ее.

Я настроил средство запуска асинхронных заданий следующим образом и добавил квалификатор, чтобы убедиться, что этот jobLauncher введен:

 @Bean(name = "asyncJobLauncher")
 public JobLauncher simpleJobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

И ввел его вот так

@Qualifier("asyncJobLauncher")
private final JobLauncher jobLauncher;

Я использовал Lombok @AllArgsConstructor после того, как изменил его на autowire, была введена правильная программа запуска задания, и теперь задание выполняется асинхронно:

@Autowired
@Qualifier("asyncJobLauncher")
private JobLauncher jobLauncher;

Также мне не пришлось расширять свою конфигурацию с DefaultBatchConfigurer

Другие вопросы по теме