Как я могу дождаться, пока весь предмет будет израсходован

Моя задача проста: мне нужно запросить скользящий индекс elasticsearch реактивным способом.

Поскольку @Document не поддерживает имя индекса с помощью Spring EL, например @Document(index = "indexName-#(new Date().format(yyyy-MM-dd))")

Итак, я пытаюсь вызвать elasticsearch с помощью ReactiveElasticsearchTemplate, который поддерживает изменение имени индекса во время выполнения.

Но поскольку объем данных превышает 10 000, мне нужно использовать прокрутку, чтобы повторять запрос, пока мы не получим все данные.

Я завершил первый запрос и запрос прокрутки, и он может вернуть значение.

Но мне нужно объединить все результаты и затем вернуться.

Как мне это сделать? На данный момент, когда потребитель все еще работает, пустой результат возвращается во внешний интерфейс. Как я могу попросить поток подождать, пока потребитель завершит поиск elasticsearch и вернет все данные? Спасибо.

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
  throws Exception {
List<ELKModel> result = new ArrayList<ELKModel>();
List<Long> total = new ArrayList<>();
List<Long> currentSize = new ArrayList<>();
List<String> scrollId = new ArrayList<>();

NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
    QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
 elasticsearchSupport
    .scrollStart(query, ELKModel.class)
    .map(ELKModelWrapper::valueFrom).
        subscribe(
        wrapper -> {
          total.add(wrapper.getTotal());l
          currentSize.add(wrapper.getCurrentSize());
          result.addAll(wrapper.getResults());
          scrollId.add(wrapper.getScrollId());
        }).dispose();

while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
    elasticsearchSupport
      .scrollContinue(scrollId.get(0), ELKModel.class)
      .map(ELKModelWrapper::valueFrom)
      .subscribe(
          wrapper -> {
            currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
            result.addAll(wrapper.getResults());
            scrollId.add(0, wrapper.getScrollId());
          }).dispose();
          
}

return Flux.fromIterable(result);

}

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
50
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Поскольку вы используете реактивный подход с Flux, вам не следует блокировать с помощью оператора while. Вместо этого вам следует объединить свои реактивные операции в цепочку.

expand используется для выполнения повторных запросов до тех пор, пока мы не соберем все необходимые данные, применяя операцию прокруткиПродолжить только в том случае, если еще остались данные согласно сравнению общего и текущего размера.

Некоторая модификация вашего кода, но не обязательно работоспособная.

public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to) {

    NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
    sourceBuilder.withQuery(
            QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
    sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
    NativeSearchQuery query = sourceBuilder.build();

    return elasticsearchSupport.scrollStart(query, ELKModel.class)
            .expand(wrapper -> {
                if (wrapper.getTotal() > wrapper.getCurrentSize()) {
                    return elasticsearchSupport.scrollContinue(wrapper.getScrollId(), ELKModel.class)
                            .map(ELKModelWrapper::valueFrom);
                } else {
                    return Flux.empty();
                }
            })
            .flatMap(wrapper -> Flux.fromIterable(wrapper.getResults()));
}

Большое спасибо за ваш совет, похоже, мне нужно заняться самостоятельным изучением реактивной реакции. :П

Jason 29.03.2024 11:05
Ответ принят как подходящий

Вы должны использовать довольно устаревшую версию Spring Data Elasticsearch. Поддержка SpEL для индексных имен в аннотации @Document была введена 4 года назад, вы можете увидеть, как это можно использовать, в моем посте https://www.sothawo.com/2020/07/how-to-provide-a -динамическое-имя-индекса-в-весне-данных-elasticsearch-using-spel/.

Что касается реактивности: вы никогда не должны блокировать поток в реактивном коде. И реактивный код выполняет эту прокрутку под капотом, я не понимаю, почему вы хотите сделать это самостоятельно.

очень ценю ваш ответ и ваш блог, на самом деле я уже пробовал SpEL раньше, но, возможно, это не удалось из-за грамматической ошибки SpEL, и это заставило меня выбрать сложный путь.

Jason 29.03.2024 11:03

Другие вопросы по теме

Похожие вопросы

NPE: невозможно распаковать нулевое значение
Как вернуть реальный объект из макетной конструкции с помощью Mockito
Почему невозможно изменить ArrayList при переборе его с помощью расширенного цикла for, тогда как это можно сделать с помощью обычного цикла for?
Задача отмены Java, выполняющая запрос Oracle через JDBC - соединение разорвано из-за SQLSTATE (08006), ErrorCode (17002). Ошибка ввода-вывода: чтение сокета прервано
Как получить метку, указанную в аннотации с возможностью повторной попытки весной, на уровне класса в MethodInvocateRetryListenerSupport?
Как отличить цвет фона от цвета текста?
Время получения кэша Apache Ignite слишком велико
Java-калькулятор не работает. Нужна помощь в устранении неполадок (для начинающих в области информатики)
Стоит ли получать доступ к данным по блокам на современных ОС/оборудовании?
Как избежать цикла while во время ожидания завершения будущего?