Спать в Spring Boot

У меня запланирована задача (@EnableSceduling) в веб-службе Spring Boot, которая регулярно повторяется. Когда эта задача запускается, она вызывает метод Runnable/run зарегистрированного объекта. В этом методе запуска мне нужно выполнить работу и не выходить из этого метода запуска, пока работа не будет завершена. Проблема в том, что у меня есть другие потоки, выполняющие другую работу, которая необходима этому потоку выполнения для его работы. Итак, в потоке выполнения у меня есть что-то вроде этого:

@Component
public class DoWork implements Runnable {
    
    @override
    public void run() {
    
    // Setup clients.

    // Call services.
    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback()); 
    
    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback()); 

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());
        
    while(callbacksWorkCompletedFlag == false) {
        
            Thread.sleep (1000);
        }

        // Do computation with callback responses.

        // After computation is completed, exit run method.
    }
}

public class MyResponseCallback implements Consumer<String> {
    
    @override
    public void accept (final Sting response) {
    
        // Do work with response.
    }
}

public class MyErrorCallback implements Consumer<Throwable> {
    
    @override
    public void accept (final Throwable error) {
    
        // Log error.
    }
}

Есть ли лучший способ сделать это при загрузке Java/Spring?

Можно ли иметь более полный пример, а не пытаться представить его по описанию? Как создать минимальный воспроизводимый пример. Например, на данный момент я не могу сказать, подойдет ли что-то вроде Future, Semaphore или CyclicBarrier.

scrhartley 18.09.2023 04:41

@scrhartley Видите обновленный код выше? Это помогает?

code4u 18.09.2023 05:28

Вы имеете в виду подписаться , а не onSubscribe? Или я ищу не в том месте?

scrhartley 18.09.2023 05:46

@scrhartley Да, ты прав. Я исправил это.

code4u 18.09.2023 05:48

Сейчас я пытаюсь найти что-то получше, чем CountDownLatch

scrhartley 18.09.2023 05:52

@scrhartley Как передать CountdownLatch в обратные вызовы?

code4u 18.09.2023 06:01

Просто мысль: вам действительно нужны обратные вызовы или вы можете вместо этого использовать Mono.block(), а затем делать все с помощью императивного кода: String response1 = client1.post().block();

scrhartley 19.09.2023 14:45
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
7
109
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вот пример использования CompletableFuture. Он использует третий параметр для Mono.subscribe, чтобы сообщить будущему, когда это будет сделано.

@Override
public void run() {
    Mono<String> response1 = client1.post();
    CompletableFuture<?> future1 = new CompletableFuture<>();
    response1.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future1.complete(null));

    Mono<String> response2 = client2.post();
    CompletableFuture<?> future2 = new CompletableFuture<>();
    response2.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> future2.complete(null));

    Mono<String> responseX = clientX.post();
    CompletableFuture<?> futureX = new CompletableFuture<>();
    responseX.subscribe(
                new MyResponseCallback(), new MyErrorCallback(),
                () -> futureX.complete(null));
    
    CompletableFuture.allOf(future1, future2, futureX).join();
}

Вот пример CountDownLatch:

@Override
public void run() {
    CountDownLatch latch = new CountDownLatch(3);

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
    try {
        latch.await();
    } catch (InterruptedException ex) {}
}

Еще один CompletableFuture пример:

@Override
public void run() {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    Supplier<Runnable> onDone = () -> {
        CompletableFuture<?> future = new CompletableFuture<>();
        futures.add(future);
        return () -> future.complete(null);
    };

    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());

    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
}

Все ли обратные вызовы действительно необходимы?

@Override
public void run() {
    // Make requests
    Mono<String> responseMono1 = client1.post();
    Mono<String> responseMono2 = client2.post();
    Mono<String> responseMonoX = clientX.post();
    try {
        // Wait for requests to complete
        String response1 = responseMono1.block();
        String response2 = responseMono2.block();
        String responseX = responseMonoX.block();

        ...
    }
    catch (RuntimeException e) {
        ...
    }
}

Почему ты глотаешь IterruptedException вместо того, чтобы пометить тред как прерванный?

Maksim Eliseev 18.09.2023 07:00

1. InterruptedException обычно не возникает, и даже если очень постараться, какая-то часть библиотечной цепочки все равно не справится с ним должным образом. 2. Я не хотел никого отпугивать кодом. 3. Не обязательно существует один способ справиться с этим, например. возможно, вы зарегистрируете его здесь и тогда, или, может быть, вы повторно создадите его после сброса флага.

scrhartley 18.09.2023 07:08

Было бы здорово предоставить в вашем ответе дополнительную информацию о прерывании потока. Если этот код выполняется в потоке, созданном пользователем, прерванное исключение может быть проглочено в соответствии с политикой прерывания, специфичной для пользователя. Под «созданным пользователем» я имею в виду new Thread(runnable).start(). Но я думаю, что создание новой темы, в которой будет CountdownLatch или CompletableFuture, — это своего рода накладные расходы. И если этот код выполняется в потоке Spring, вам не следует проглатывать InterruptedException, потому что вы не знаете политику прерываний Spring.

Maksim Eliseev 18.09.2023 07:53

В этом примере делается попытка синхронизации между потоками, и я представил механизмы синхронизации между потоками (в конечном счете, реактивность поддерживается пулом потоков). Я не создаю никаких новых тем, просто использую существующие возможности языка (вероятно, задействован какой-то CAS). Я не утверждаю, что моя обработка InterruptedException оптимальна. Отмечу, что в примере Oracle Javadoc они либо объявляют исключение (чего мы не можем сделать здесь, поскольку это Runnable), либо делают то же самое, что и я.

scrhartley 18.09.2023 14:21

Обновлен ответ, включающий использование Mono.block().

scrhartley 19.09.2023 15:00

Вы используете реактивное программирование, но забудьте об этом и пытаетесь решить проблему императивно. Вам не нужно спать, вместо этого используйте возможности Project Reactor.

Вы можете использовать zip, чтобы объединить результаты разных Mono вместе, а затем map получить то, что вам нужно. Нет необходимости проверять boolean, используйте защелки обратного отсчета и т. д.

Mono<String> response1 = client1.post();   
Mono<String> response2 = client2.post();
Mono<String> responseX = clientX.post();
Mono<String> result = Mono
  .zip(response1, response2, response3)
  .map(//dosomething with the result of the 3 mono's)
  .subscribe();

Что вы делаете в своем Consumer, немного неясно, но вы, вероятно, можете использовать для этого и map для каждого Mono (или, если это просто установка логического значения, вы можете удалить их).

Другие вопросы по теме