CompletableFuture с тайм-аутом

Я только недавно начал использовать CompletableFuture, и у меня есть проблема, в которой у меня есть N запросов.

Каждый запрос должен быть отправлен на 2 разные конечные точки, а его результаты должны сравниваться в формате JSON. Поскольку у меня есть тонны запросов, которые нужно сделать, и я не знаю, сколько времени может занять каждый запрос, я хочу ограничить время ожидания результата, например, 3 секунды или около того.

Итак, я написал этот тестовый код:

public class MainTest {

   private static final Logger logger = LoggerFactory.getLogger(MainTest.class);
   private Instant start;

   public static void main(String[] args) {

       MainTest main = new MainTest();
       main.start();
   }

   public void start(){
       String req1 = "http://localhost:8080/testing";
       String req2 = "http://127.0.0.1:8095/testing2";

       ExecutorService exec = Executors.newCachedThreadPool();

       start = Instant.now();
       CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
       CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);


       List<CompletableFuture<String>> completables = List.of(comp1,comp2);

       logger.info("Waiting completables");

       CompletableFuture<List<String>> a = allOf(completables);


       List<String> r = new ArrayList<>();
       try {
           r = a.get(3, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (ExecutionException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       }finally {
           Instant end = Instant.now();
           logger.info(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));

           System.out.println(r.size());
           r.forEach(System.out::println);
       }
       exec.shutdown();
   }

   public String doReq(String request){
       AtomicReference<String> response = new AtomicReference<>("default");
       try{
           logger.info("Sending request: {}", request);
           Unirest.get(request).asJson()
                   .ifSuccess(r -> {
                       response.set(r.getBody().toString());
                   })
                   .ifFailure(r -> {
                       logger.error("Oh No! Status" + r.getStatus());
                       r.getParsingError().ifPresent(e -> {
                           logger.error("Parsing Exception: ", e);
                           logger.error("Original body: " + e.getOriginalBody());
                       });
                   });
       } catch (Exception e) {
           logger.error("Error on request! {}", e.getMessage());

       }
      return response.get();
   }


   public <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futuresList) {
       CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[0]));
       return allFuturesResult.thenApply(v ->
                       futuresList.stream().
                       map(CompletableFuture::join).
                       collect(Collectors.<T>toList())
       );
   }
}

Проблема возникает, когда любой из запросов занимает более 3 секунд... Я хочу получить результат тех, которые успели его получить... Я намеренно поставил в своей сети задержку в одном из моих запросов на 7 секунд, и я получаю следующий вывод: у одного из них было время, но его результат отсутствует в списке...

2020-12-09T17:05:03,878 [pool-2-thread-2] INFO (MainTest:85) - Sending request: http://127.0.0.1:8095/testing2
2020-12-09T17:05:03,878 [pool-2-thread-1] INFO (MainTest:85) - Sending request: http://localhost:8080/testing
2020-12-09T17:05:03,878 [main] INFO (MainTest:53) - Waiting completables
java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1886)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
    at me.testing.MainTest.start(MainTest.java:60)
    at me.testing.MainTest.main(MainTest.java:31)
2020-12-09T17:05:06,889 [main] INFO (MainTest:69) -  Took: 00:00:03.009
0
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
11
0
26 759
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Если вы получаете тайм-аут, вы должны получить значения из уже завершенных.

Можно как-то так:

public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
  CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
  try {
    allFuturesResult.get(timeout, unit);
  } catch (Exception e) {
    // you may log it
  }
  return futuresList
    .stream()
    .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
    .map(CompletableFuture::join) // get the value from the completed future
    .collect(Collectors.<T>toList()); // collect as a list
}

Вот полный рабочий пример, я просто заменил doReq на sleep, потому что у меня нет вашего веб-сервиса:

public class MainTest {

    private Instant start;

    public static void main(String[] args) {

        MainTest main = new MainTest();
        main.start();
    }

    public void start() {
        String req1 = "http://localhost:8080/testing";
        String req2 = "http://127.0.0.1:8095/testing2";

        ExecutorService exec = Executors.newCachedThreadPool();

        start = Instant.now();
        CompletableFuture<String> comp1 = CompletableFuture.supplyAsync(() -> doReq(req1), exec);
        CompletableFuture<String> comp2 = CompletableFuture.supplyAsync(() -> doReq(req2), exec);

        List<CompletableFuture<String>> completables = List.of(comp1, comp2);

        System.out.println("Waiting completables");

        List<String> r = getAllCompleted(completables, 3, TimeUnit.SECONDS);
        Instant end = Instant.now();
        System.out.println(" Took: " + DurationFormatUtils.formatDurationHMS(Duration.between(start, end).toMillis()));

        System.out.println(r.size());
        r.forEach(System.out::println);
        exec.shutdown();
    }

    public String doReq(String request) {
        if (request.contains("localhost")) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "response1";
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "response2";
    }

    public <T> List<T> getAllCompleted(List<CompletableFuture<T>> futuresList, long timeout, TimeUnit unit) {
        CompletableFuture<Void> allFuturesResult = CompletableFuture.allOf(futuresList.toArray(new CompletableFuture[futuresList.size()]));
        try {
            allFuturesResult.get(timeout, unit);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return futuresList.stream()
            .filter(future -> future.isDone() && !future.isCompletedExceptionally()) // keep only the ones completed
            .map(CompletableFuture::join) // get the value from the completed future
            .collect(Collectors.<T>toList()); // collect as a list
    }
}

Это решение заставило меня двигаться. Это отличный способ управлять таймаутом в java 8, где completeOnTimeout недоступен.

bluelurker 08.09.2022 10:48

Другие вопросы по теме

Как я могу элегантно синхронизировать доступ к произвольному буферу?
Летучее бывает-до выяснения/непонимания?
Неожиданный вывод многопоточной программы C++
Если поток, заблокированный в std::condition_variable, уведомлен, но блокировка связанного с ним мьютекса не снята, каков будет результат?
Гарантирует ли хранилище memory_order_seq_cst, что загрузка memory_order_relaxed будет считывать записанное значение?
Блок потока данных TPL, который изменяет состояние и отправляет одно сообщение после завершения
Является ли многократная попытка блокировки хорошим решением для предотвращения взаимоблокировок?
Избыточный мьютекс. Является ли атомарный объект заменой мьютекса? Генерирует ли мьютекс атомарность, как атомарные объекты?
Сохраняет ли переключатель контекста (сохраняет состояние переменных) значение переменных при возобновлении задачи?
Шаблон наблюдателя с актером