У меня запланирована задача (@EnableSceduling) в веб-службе Spring Boot, которая регулярно повторяется. Когда эта задача запускается, она вызывает метод Runnable/run зарегистрированного объекта. В этом методе запуска мне нужно выполнить работу и не выходить из этого метода запуска, пока работа не будет завершена. Проблема в том, что у меня есть другие потоки, выполняющие другую работу, которая необходима этому потоку выполнения для его работы. Итак, в потоке выполнения у меня есть что-то вроде этого:
@Component
public class DoWork implements Runnable {
@override
public void run() {
// Setup clients.
// Call services.
Mono<String> response1 = client1.post();
response1.subscribe(new MyResponseCallback(), new MyErrorCallback());
Mono<String> response2 = client2.post();
response2.subscribe(new MyResponseCallback(), new MyErrorCallback());
Mono<String> responseX = clientX.post();
responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());
while(callbacksWorkCompletedFlag == false) {
Thread.sleep (1000);
}
// Do computation with callback responses.
// After computation is completed, exit run method.
}
}
public class MyResponseCallback implements Consumer<String> {
@override
public void accept (final Sting response) {
// Do work with response.
}
}
public class MyErrorCallback implements Consumer<Throwable> {
@override
public void accept (final Throwable error) {
// Log error.
}
}
Есть ли лучший способ сделать это при загрузке Java/Spring?
@scrhartley Видите обновленный код выше? Это помогает?
Вы имеете в виду подписаться , а не onSubscribe? Или я ищу не в том месте?
@scrhartley Да, ты прав. Я исправил это.
Сейчас я пытаюсь найти что-то получше, чем CountDownLatch
@scrhartley Как передать CountdownLatch в обратные вызовы?
Просто мысль: вам действительно нужны обратные вызовы или вы можете вместо этого использовать Mono.block(), а затем делать все с помощью императивного кода: String response1 = client1.post().block();




Вот пример использования CompletableFuture. Он использует третий параметр для Mono.subscribe, чтобы сообщить будущему, когда это будет сделано.
@Override
public void run() {
Mono<String> response1 = client1.post();
CompletableFuture<?> future1 = new CompletableFuture<>();
response1.subscribe(
new MyResponseCallback(), new MyErrorCallback(),
() -> future1.complete(null));
Mono<String> response2 = client2.post();
CompletableFuture<?> future2 = new CompletableFuture<>();
response2.subscribe(
new MyResponseCallback(), new MyErrorCallback(),
() -> future2.complete(null));
Mono<String> responseX = clientX.post();
CompletableFuture<?> futureX = new CompletableFuture<>();
responseX.subscribe(
new MyResponseCallback(), new MyErrorCallback(),
() -> futureX.complete(null));
CompletableFuture.allOf(future1, future2, futureX).join();
}
Вот пример CountDownLatch:
@Override
public void run() {
CountDownLatch latch = new CountDownLatch(3);
Mono<String> response1 = client1.post();
response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
Mono<String> response2 = client2.post();
response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
Mono<String> responseX = clientX.post();
responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
try {
latch.await();
} catch (InterruptedException ex) {}
}
Еще один CompletableFuture пример:
@Override
public void run() {
List<CompletableFuture<?>> futures = new ArrayList<>();
Supplier<Runnable> onDone = () -> {
CompletableFuture<?> future = new CompletableFuture<>();
futures.add(future);
return () -> future.complete(null);
};
Mono<String> response1 = client1.post();
response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
Mono<String> response2 = client2.post();
response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
Mono<String> responseX = clientX.post();
responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
}
Все ли обратные вызовы действительно необходимы?
@Override
public void run() {
// Make requests
Mono<String> responseMono1 = client1.post();
Mono<String> responseMono2 = client2.post();
Mono<String> responseMonoX = clientX.post();
try {
// Wait for requests to complete
String response1 = responseMono1.block();
String response2 = responseMono2.block();
String responseX = responseMonoX.block();
...
}
catch (RuntimeException e) {
...
}
}
Почему ты глотаешь IterruptedException вместо того, чтобы пометить тред как прерванный?
1. InterruptedException обычно не возникает, и даже если очень постараться, какая-то часть библиотечной цепочки все равно не справится с ним должным образом. 2. Я не хотел никого отпугивать кодом. 3. Не обязательно существует один способ справиться с этим, например. возможно, вы зарегистрируете его здесь и тогда, или, может быть, вы повторно создадите его после сброса флага.
Было бы здорово предоставить в вашем ответе дополнительную информацию о прерывании потока. Если этот код выполняется в потоке, созданном пользователем, прерванное исключение может быть проглочено в соответствии с политикой прерывания, специфичной для пользователя. Под «созданным пользователем» я имею в виду new Thread(runnable).start(). Но я думаю, что создание новой темы, в которой будет CountdownLatch или CompletableFuture, — это своего рода накладные расходы. И если этот код выполняется в потоке Spring, вам не следует проглатывать InterruptedException, потому что вы не знаете политику прерываний Spring.
В этом примере делается попытка синхронизации между потоками, и я представил механизмы синхронизации между потоками (в конечном счете, реактивность поддерживается пулом потоков). Я не создаю никаких новых тем, просто использую существующие возможности языка (вероятно, задействован какой-то CAS). Я не утверждаю, что моя обработка InterruptedException оптимальна. Отмечу, что в примере Oracle Javadoc они либо объявляют исключение (чего мы не можем сделать здесь, поскольку это Runnable), либо делают то же самое, что и я.
Обновлен ответ, включающий использование Mono.block().
Вы используете реактивное программирование, но забудьте об этом и пытаетесь решить проблему императивно. Вам не нужно спать, вместо этого используйте возможности Project Reactor.
Вы можете использовать zip, чтобы объединить результаты разных Mono вместе, а затем map получить то, что вам нужно. Нет необходимости проверять boolean, используйте защелки обратного отсчета и т. д.
Mono<String> response1 = client1.post();
Mono<String> response2 = client2.post();
Mono<String> responseX = clientX.post();
Mono<String> result = Mono
.zip(response1, response2, response3)
.map(//dosomething with the result of the 3 mono's)
.subscribe();
Что вы делаете в своем Consumer, немного неясно, но вы, вероятно, можете использовать для этого и map для каждого Mono (или, если это просто установка логического значения, вы можете удалить их).
Можно ли иметь более полный пример, а не пытаться представить его по описанию? Как создать минимальный воспроизводимый пример. Например, на данный момент я не могу сказать, подойдет ли что-то вроде Future, Semaphore или CyclicBarrier.